Skip to content

Commit 000a39b

Browse files
authored
HADOOP-18872: [ABFS] [BugFix] Misreporting Retry Count for Sub-sequential and Parallel Operations (#6019)
Contributed by Anuj Modi
1 parent 342e6ca commit 000a39b

File tree

9 files changed

+502
-193
lines changed

9 files changed

+502
-193
lines changed

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsClient.java

Lines changed: 115 additions & 86 deletions
Large diffs are not rendered by default.

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsRestOperation.java

Lines changed: 31 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,11 @@ public class AbfsRestOperation {
8282
*/
8383
private String failureReason;
8484

85+
/**
86+
* This variable stores the tracing context used for last Rest Operation.
87+
*/
88+
private TracingContext lastUsedTracingContext;
89+
8590
/**
8691
* Checks if there is non-null HTTP response.
8792
* @return true if there is a non-null HTTP response from the ABFS call.
@@ -197,10 +202,13 @@ String getSasToken() {
197202
public void execute(TracingContext tracingContext)
198203
throws AzureBlobFileSystemException {
199204

205+
// Since this might be a sub-sequential or parallel rest operation
206+
// triggered by a single file system call, using a new tracing context.
207+
lastUsedTracingContext = createNewTracingContext(tracingContext);
200208
try {
201209
IOStatisticsBinding.trackDurationOfInvocation(abfsCounters,
202210
AbfsStatistic.getStatNameFromHttpCall(method),
203-
() -> completeExecute(tracingContext));
211+
() -> completeExecute(lastUsedTracingContext));
204212
} catch (AzureBlobFileSystemException aze) {
205213
throw aze;
206214
} catch (IOException e) {
@@ -214,7 +222,7 @@ public void execute(TracingContext tracingContext)
214222
* HTTP operations.
215223
* @param tracingContext TracingContext instance to track correlation IDs
216224
*/
217-
private void completeExecute(TracingContext tracingContext)
225+
void completeExecute(TracingContext tracingContext)
218226
throws AzureBlobFileSystemException {
219227
// see if we have latency reports from the previous requests
220228
String latencyHeader = getClientLatency();
@@ -409,4 +417,25 @@ private void incrementCounter(AbfsStatistic statistic, long value) {
409417
abfsCounters.incrementCounter(statistic, value);
410418
}
411419
}
420+
421+
/**
422+
* Creates a new Tracing context before entering the retry loop of a rest operation.
423+
* This will ensure all rest operations have unique
424+
* tracing context that will be used for all the retries.
425+
* @param tracingContext original tracingContext.
426+
* @return tracingContext new tracingContext object created from original one.
427+
*/
428+
@VisibleForTesting
429+
public TracingContext createNewTracingContext(final TracingContext tracingContext) {
430+
return new TracingContext(tracingContext);
431+
}
432+
433+
/**
434+
* Returns the tracing contest used for last rest operation made.
435+
* @return tracingContext lasUserTracingContext.
436+
*/
437+
@VisibleForTesting
438+
public final TracingContext getLastTracingContext() {
439+
return lastUsedTracingContext;
440+
}
412441
}

hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemDelete.java

Lines changed: 28 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -29,10 +29,14 @@
2929
import org.assertj.core.api.Assertions;
3030
import org.junit.Assume;
3131
import org.junit.Test;
32+
import org.mockito.Mockito;
3233

34+
import org.apache.hadoop.conf.Configuration;
35+
import org.apache.hadoop.fs.FileSystem;
3336
import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
3437
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
3538
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
39+
import org.apache.hadoop.fs.azurebfs.services.AbfsClientTestUtil;
3640
import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
3741
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
3842
import org.apache.hadoop.fs.azurebfs.services.ITestAbfsClient;
@@ -61,7 +65,6 @@
6165
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertPathDoesNotExist;
6266
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
6367

64-
6568
/**
6669
* Test delete operation.
6770
*/
@@ -257,14 +260,15 @@ public void testDeleteIdempotencyTriggerHttp404() throws Exception {
257260

258261
// Case 2: Mimic retried case
259262
// Idempotency check on Delete always returns success
260-
AbfsRestOperation idempotencyRetOp = ITestAbfsClient.getRestOp(
263+
AbfsRestOperation idempotencyRetOp = Mockito.spy(ITestAbfsClient.getRestOp(
261264
DeletePath, mockClient, HTTP_METHOD_DELETE,
262265
ITestAbfsClient.getTestUrl(mockClient, "/NonExistingPath"),
263-
ITestAbfsClient.getTestRequestHeaders(mockClient));
266+
ITestAbfsClient.getTestRequestHeaders(mockClient)));
264267
idempotencyRetOp.hardSetResult(HTTP_OK);
265268

266269
doReturn(idempotencyRetOp).when(mockClient).deleteIdempotencyCheckOp(any());
267270
TracingContext tracingContext = getTestTracingContext(fs, false);
271+
doReturn(tracingContext).when(idempotencyRetOp).createNewTracingContext(any());
268272
when(mockClient.deletePath("/NonExistingPath", false, null, tracingContext))
269273
.thenCallRealMethod();
270274

@@ -283,4 +287,25 @@ public void testDeleteIdempotencyTriggerHttp404() throws Exception {
283287
mockStore.delete(new Path("/NonExistingPath"), false, getTestTracingContext(fs, false));
284288
}
285289

290+
@Test
291+
public void deleteBlobDirParallelThreadToDeleteOnDifferentTracingContext()
292+
throws Exception {
293+
Configuration configuration = getRawConfiguration();
294+
AzureBlobFileSystem fs = Mockito.spy(
295+
(AzureBlobFileSystem) FileSystem.newInstance(configuration));
296+
AzureBlobFileSystemStore spiedStore = Mockito.spy(fs.getAbfsStore());
297+
AbfsClient spiedClient = Mockito.spy(fs.getAbfsClient());
298+
299+
Mockito.doReturn(spiedStore).when(fs).getAbfsStore();
300+
spiedStore.setClient(spiedClient);
301+
302+
fs.mkdirs(new Path("/testDir"));
303+
fs.create(new Path("/testDir/file1"));
304+
fs.create(new Path("/testDir/file2"));
305+
306+
AbfsClientTestUtil.hookOnRestOpsForTracingContextSingularity(spiedClient);
307+
308+
fs.delete(new Path("/testDir"), true);
309+
fs.close();
310+
}
286311
}

hadoop-tools/hadoop-azure/src/test/java/org/apache/hadoop/fs/azurebfs/ITestAzureBlobFileSystemListStatus.java

Lines changed: 115 additions & 26 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020

2121
import java.io.FileNotFoundException;
2222
import java.io.IOException;
23+
import java.net.SocketTimeoutException;
2324
import java.util.ArrayList;
2425
import java.util.List;
2526
import java.util.concurrent.Callable;
@@ -28,6 +29,8 @@
2829
import java.util.concurrent.Future;
2930

3031
import org.junit.Test;
32+
import org.mockito.Mockito;
33+
import org.mockito.stubbing.Stubber;
3134

3235
import org.apache.hadoop.conf.Configuration;
3336
import org.apache.hadoop.fs.FileSystem;
@@ -36,23 +39,38 @@
3639
import org.apache.hadoop.fs.LocatedFileStatus;
3740
import org.apache.hadoop.fs.Path;
3841
import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
42+
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
43+
import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultEntrySchema;
44+
import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema;
45+
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
46+
import org.apache.hadoop.fs.azurebfs.services.AbfsClientTestUtil;
47+
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
48+
import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderFormat;
3949
import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderValidator;
4050
import org.apache.hadoop.fs.contract.ContractTestUtils;
4151

52+
import static java.net.HttpURLConnection.HTTP_OK;
53+
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
4254
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_LIST_MAX_RESULTS;
55+
import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.CONNECTION_TIMEOUT_JDK_MESSAGE;
4356
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertMkdirs;
4457
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
4558
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertPathExists;
4659
import static org.apache.hadoop.fs.contract.ContractTestUtils.rename;
4760

4861
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
62+
import static org.mockito.ArgumentMatchers.any;
63+
import static org.mockito.ArgumentMatchers.nullable;
64+
import static org.mockito.Mockito.times;
65+
import static org.mockito.Mockito.when;
4966

5067
/**
5168
* Test listStatus operation.
5269
*/
5370
public class ITestAzureBlobFileSystemListStatus extends
5471
AbstractAbfsIntegrationTest {
5572
private static final int TEST_FILES_NUMBER = 6000;
73+
private static final String TEST_CONTINUATION_TOKEN = "continuation";
5674

5775
public ITestAzureBlobFileSystemListStatus() throws Exception {
5876
super();
@@ -62,34 +80,105 @@ public ITestAzureBlobFileSystemListStatus() throws Exception {
6280
public void testListPath() throws Exception {
6381
Configuration config = new Configuration(this.getRawConfiguration());
6482
config.set(AZURE_LIST_MAX_RESULTS, "5000");
65-
final AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem
66-
.newInstance(getFileSystem().getUri(), config);
67-
final List<Future<Void>> tasks = new ArrayList<>();
68-
69-
ExecutorService es = Executors.newFixedThreadPool(10);
70-
for (int i = 0; i < TEST_FILES_NUMBER; i++) {
71-
final Path fileName = new Path("/test" + i);
72-
Callable<Void> callable = new Callable<Void>() {
73-
@Override
74-
public Void call() throws Exception {
75-
touch(fileName);
76-
return null;
77-
}
78-
};
79-
80-
tasks.add(es.submit(callable));
81-
}
82-
83-
for (Future<Void> task : tasks) {
84-
task.get();
83+
try (final AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem
84+
.newInstance(getFileSystem().getUri(), config)) {
85+
final List<Future<Void>> tasks = new ArrayList<>();
86+
87+
ExecutorService es = Executors.newFixedThreadPool(10);
88+
for (int i = 0; i < TEST_FILES_NUMBER; i++) {
89+
final Path fileName = new Path("/test" + i);
90+
Callable<Void> callable = new Callable<Void>() {
91+
@Override
92+
public Void call() throws Exception {
93+
touch(fileName);
94+
return null;
95+
}
96+
};
97+
98+
tasks.add(es.submit(callable));
99+
}
100+
101+
for (Future<Void> task : tasks) {
102+
task.get();
103+
}
104+
105+
es.shutdownNow();
106+
fs.registerListener(
107+
new TracingHeaderValidator(getConfiguration().getClientCorrelationId(),
108+
fs.getFileSystemId(), FSOperationType.LISTSTATUS, true, 0));
109+
FileStatus[] files = fs.listStatus(new Path("/"));
110+
assertEquals(TEST_FILES_NUMBER, files.length /* user directory */);
85111
}
112+
}
86113

87-
es.shutdownNow();
88-
fs.registerListener(
89-
new TracingHeaderValidator(getConfiguration().getClientCorrelationId(),
90-
fs.getFileSystemId(), FSOperationType.LISTSTATUS, true, 0));
91-
FileStatus[] files = fs.listStatus(new Path("/"));
92-
assertEquals(TEST_FILES_NUMBER, files.length /* user directory */);
114+
/**
115+
* Test to verify that each paginated call to ListBlobs uses a new tracing context.
116+
* @throws Exception
117+
*/
118+
@Test
119+
public void testListPathTracingContext() throws Exception {
120+
final AzureBlobFileSystem fs = getFileSystem();
121+
final AzureBlobFileSystem spiedFs = Mockito.spy(fs);
122+
final AzureBlobFileSystemStore spiedStore = Mockito.spy(fs.getAbfsStore());
123+
final AbfsClient spiedClient = Mockito.spy(fs.getAbfsClient());
124+
final TracingContext spiedTracingContext = Mockito.spy(
125+
new TracingContext(
126+
fs.getClientCorrelationId(), fs.getFileSystemId(),
127+
FSOperationType.LISTSTATUS, true, TracingHeaderFormat.ALL_ID_FORMAT, null));
128+
129+
Mockito.doReturn(spiedStore).when(spiedFs).getAbfsStore();
130+
spiedStore.setClient(spiedClient);
131+
spiedFs.setWorkingDirectory(new Path("/"));
132+
133+
AbfsClientTestUtil.setMockAbfsRestOperationForListPathOperation(spiedClient,
134+
(httpOperation) -> {
135+
136+
ListResultEntrySchema entry = new ListResultEntrySchema()
137+
.withName("a")
138+
.withIsDirectory(true);
139+
List<ListResultEntrySchema> paths = new ArrayList<>();
140+
paths.add(entry);
141+
paths.clear();
142+
entry = new ListResultEntrySchema()
143+
.withName("abc.txt")
144+
.withIsDirectory(false);
145+
paths.add(entry);
146+
ListResultSchema schema1 = new ListResultSchema().withPaths(paths);
147+
ListResultSchema schema2 = new ListResultSchema().withPaths(paths);
148+
149+
when(httpOperation.getListResultSchema()).thenReturn(schema1)
150+
.thenReturn(schema2);
151+
when(httpOperation.getResponseHeader(
152+
HttpHeaderConfigurations.X_MS_CONTINUATION))
153+
.thenReturn(TEST_CONTINUATION_TOKEN)
154+
.thenReturn(EMPTY_STRING);
155+
156+
Stubber stubber = Mockito.doThrow(
157+
new SocketTimeoutException(CONNECTION_TIMEOUT_JDK_MESSAGE));
158+
stubber.doNothing().when(httpOperation).processResponse(
159+
nullable(byte[].class), nullable(int.class), nullable(int.class));
160+
161+
when(httpOperation.getStatusCode()).thenReturn(-1).thenReturn(HTTP_OK);
162+
return httpOperation;
163+
});
164+
165+
List<FileStatus> fileStatuses = new ArrayList<>();
166+
spiedStore.listStatus(new Path("/"), "", fileStatuses, true, null, spiedTracingContext);
167+
168+
// Assert that there were 2 paginated ListPath calls were made 1 and 2.
169+
// 1. Without continuation token
170+
Mockito.verify(spiedClient, times(1)).listPath(
171+
"/", false,
172+
spiedFs.getAbfsStore().getAbfsConfiguration().getListMaxResults(),
173+
null, spiedTracingContext);
174+
// 2. With continuation token
175+
Mockito.verify(spiedClient, times(1)).listPath(
176+
"/", false,
177+
spiedFs.getAbfsStore().getAbfsConfiguration().getListMaxResults(),
178+
TEST_CONTINUATION_TOKEN, spiedTracingContext);
179+
180+
// Assert that none of the API calls used the same tracing header.
181+
Mockito.verify(spiedTracingContext, times(0)).constructHeader(any(), any());
93182
}
94183

95184
/**

0 commit comments

Comments
 (0)