Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,11 @@ public class AbfsRestOperation {
*/
private String failureReason;

/**
* This variable stores the tracing context used for last Rest Operation.
*/
private TracingContext lastUsedTracingContext;

/**
* Checks if there is non-null HTTP response.
* @return true if there is a non-null HTTP response from the ABFS call.
Expand Down Expand Up @@ -197,10 +202,13 @@ String getSasToken() {
public void execute(TracingContext tracingContext)
throws AzureBlobFileSystemException {

// Since this might be a sub-sequential or parallel rest operation
// triggered by a single file system call, using a new tracing context.
lastUsedTracingContext = createNewTracingContext(tracingContext);
try {
IOStatisticsBinding.trackDurationOfInvocation(abfsCounters,
AbfsStatistic.getStatNameFromHttpCall(method),
() -> completeExecute(tracingContext));
() -> completeExecute(lastUsedTracingContext));
} catch (AzureBlobFileSystemException aze) {
throw aze;
} catch (IOException e) {
Expand All @@ -214,7 +222,7 @@ public void execute(TracingContext tracingContext)
* HTTP operations.
* @param tracingContext TracingContext instance to track correlation IDs
*/
private void completeExecute(TracingContext tracingContext)
void completeExecute(TracingContext tracingContext)
throws AzureBlobFileSystemException {
// see if we have latency reports from the previous requests
String latencyHeader = getClientLatency();
Expand Down Expand Up @@ -409,4 +417,25 @@ private void incrementCounter(AbfsStatistic statistic, long value) {
abfsCounters.incrementCounter(statistic, value);
}
}

/**
* Creates a new Tracing context before entering the retry loop of a rest operation.
* This will ensure all rest operations have unique
* tracing context that will be used for all the retries.
* @param tracingContext original tracingContext.
* @return tracingContext new tracingContext object created from original one.
*/
@VisibleForTesting
public TracingContext createNewTracingContext(final TracingContext tracingContext) {
return new TracingContext(tracingContext);
}

/**
* Returns the tracing contest used for last rest operation made.
* @return tracingContext lasUserTracingContext.
*/
@VisibleForTesting
public final TracingContext getLastTracingContext() {
return lastUsedTracingContext;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,14 @@
import org.assertj.core.api.Assertions;
import org.junit.Assume;
import org.junit.Test;
import org.mockito.Mockito;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsClientTestUtil;
import org.apache.hadoop.fs.azurebfs.services.AbfsHttpOperation;
import org.apache.hadoop.fs.azurebfs.services.AbfsRestOperation;
import org.apache.hadoop.fs.azurebfs.services.ITestAbfsClient;
Expand Down Expand Up @@ -61,7 +65,6 @@
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertPathDoesNotExist;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;


/**
* Test delete operation.
*/
Expand Down Expand Up @@ -257,14 +260,15 @@ public void testDeleteIdempotencyTriggerHttp404() throws Exception {

// Case 2: Mimic retried case
// Idempotency check on Delete always returns success
AbfsRestOperation idempotencyRetOp = ITestAbfsClient.getRestOp(
AbfsRestOperation idempotencyRetOp = Mockito.spy(ITestAbfsClient.getRestOp(
DeletePath, mockClient, HTTP_METHOD_DELETE,
ITestAbfsClient.getTestUrl(mockClient, "/NonExistingPath"),
ITestAbfsClient.getTestRequestHeaders(mockClient));
ITestAbfsClient.getTestRequestHeaders(mockClient)));
idempotencyRetOp.hardSetResult(HTTP_OK);

doReturn(idempotencyRetOp).when(mockClient).deleteIdempotencyCheckOp(any());
TracingContext tracingContext = getTestTracingContext(fs, false);
doReturn(tracingContext).when(idempotencyRetOp).createNewTracingContext(any());
when(mockClient.deletePath("/NonExistingPath", false, null, tracingContext))
.thenCallRealMethod();

Expand All @@ -283,4 +287,25 @@ public void testDeleteIdempotencyTriggerHttp404() throws Exception {
mockStore.delete(new Path("/NonExistingPath"), false, getTestTracingContext(fs, false));
}

@Test
public void deleteBlobDirParallelThreadToDeleteOnDifferentTracingContext()
throws Exception {
Configuration configuration = getRawConfiguration();
AzureBlobFileSystem fs = Mockito.spy(
(AzureBlobFileSystem) FileSystem.newInstance(configuration));
AzureBlobFileSystemStore spiedStore = Mockito.spy(fs.getAbfsStore());
AbfsClient spiedClient = Mockito.spy(fs.getAbfsClient());

Mockito.doReturn(spiedStore).when(fs).getAbfsStore();
spiedStore.setClient(spiedClient);

fs.mkdirs(new Path("/testDir"));
fs.create(new Path("/testDir/file1"));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

add .close() or is mockito so involved these are no-ops?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added fs.close()

fs.create(new Path("/testDir/file2"));

AbfsClientTestUtil.hookOnRestOpsForTracingContextSingularity(spiedClient);

fs.delete(new Path("/testDir"), true);
fs.close();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import java.io.FileNotFoundException;
import java.io.IOException;
import java.net.SocketTimeoutException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
Expand All @@ -28,6 +29,8 @@
import java.util.concurrent.Future;

import org.junit.Test;
import org.mockito.Mockito;
import org.mockito.stubbing.Stubber;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
Expand All @@ -36,23 +39,38 @@
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.azurebfs.constants.FSOperationType;
import org.apache.hadoop.fs.azurebfs.constants.HttpHeaderConfigurations;
import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultEntrySchema;
import org.apache.hadoop.fs.azurebfs.contracts.services.ListResultSchema;
import org.apache.hadoop.fs.azurebfs.services.AbfsClient;
import org.apache.hadoop.fs.azurebfs.services.AbfsClientTestUtil;
import org.apache.hadoop.fs.azurebfs.utils.TracingContext;
import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderFormat;
import org.apache.hadoop.fs.azurebfs.utils.TracingHeaderValidator;
import org.apache.hadoop.fs.contract.ContractTestUtils;

import static java.net.HttpURLConnection.HTTP_OK;
import static org.apache.hadoop.fs.azurebfs.constants.AbfsHttpConstants.EMPTY_STRING;
import static org.apache.hadoop.fs.azurebfs.constants.ConfigurationKeys.AZURE_LIST_MAX_RESULTS;
import static org.apache.hadoop.fs.azurebfs.services.RetryReasonConstants.CONNECTION_TIMEOUT_JDK_MESSAGE;
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertMkdirs;
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
import static org.apache.hadoop.fs.contract.ContractTestUtils.assertPathExists;
import static org.apache.hadoop.fs.contract.ContractTestUtils.rename;

import static org.apache.hadoop.test.LambdaTestUtils.intercept;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.nullable;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.when;

/**
* Test listStatus operation.
*/
public class ITestAzureBlobFileSystemListStatus extends
AbstractAbfsIntegrationTest {
private static final int TEST_FILES_NUMBER = 6000;
private static final String TEST_CONTINUATION_TOKEN = "continuation";

public ITestAzureBlobFileSystemListStatus() throws Exception {
super();
Expand All @@ -62,34 +80,105 @@ public ITestAzureBlobFileSystemListStatus() throws Exception {
public void testListPath() throws Exception {
Configuration config = new Configuration(this.getRawConfiguration());
config.set(AZURE_LIST_MAX_RESULTS, "5000");
final AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem
.newInstance(getFileSystem().getUri(), config);
final List<Future<Void>> tasks = new ArrayList<>();

ExecutorService es = Executors.newFixedThreadPool(10);
for (int i = 0; i < TEST_FILES_NUMBER; i++) {
final Path fileName = new Path("/test" + i);
Callable<Void> callable = new Callable<Void>() {
@Override
public Void call() throws Exception {
touch(fileName);
return null;
}
};

tasks.add(es.submit(callable));
}

for (Future<Void> task : tasks) {
task.get();
try (final AzureBlobFileSystem fs = (AzureBlobFileSystem) FileSystem
.newInstance(getFileSystem().getUri(), config)) {
final List<Future<Void>> tasks = new ArrayList<>();

ExecutorService es = Executors.newFixedThreadPool(10);
for (int i = 0; i < TEST_FILES_NUMBER; i++) {
final Path fileName = new Path("/test" + i);
Callable<Void> callable = new Callable<Void>() {
@Override
public Void call() throws Exception {
touch(fileName);
return null;
}
};

tasks.add(es.submit(callable));
}

for (Future<Void> task : tasks) {
task.get();
}

es.shutdownNow();
fs.registerListener(
new TracingHeaderValidator(getConfiguration().getClientCorrelationId(),
fs.getFileSystemId(), FSOperationType.LISTSTATUS, true, 0));
FileStatus[] files = fs.listStatus(new Path("/"));
assertEquals(TEST_FILES_NUMBER, files.length /* user directory */);
}
}

es.shutdownNow();
fs.registerListener(
new TracingHeaderValidator(getConfiguration().getClientCorrelationId(),
fs.getFileSystemId(), FSOperationType.LISTSTATUS, true, 0));
FileStatus[] files = fs.listStatus(new Path("/"));
assertEquals(TEST_FILES_NUMBER, files.length /* user directory */);
/**
* Test to verify that each paginated call to ListBlobs uses a new tracing context.
* @throws Exception
*/
@Test
public void testListPathTracingContext() throws Exception {
final AzureBlobFileSystem fs = getFileSystem();
final AzureBlobFileSystem spiedFs = Mockito.spy(fs);
final AzureBlobFileSystemStore spiedStore = Mockito.spy(fs.getAbfsStore());
final AbfsClient spiedClient = Mockito.spy(fs.getAbfsClient());
final TracingContext spiedTracingContext = Mockito.spy(
new TracingContext(
fs.getClientCorrelationId(), fs.getFileSystemId(),
FSOperationType.LISTSTATUS, true, TracingHeaderFormat.ALL_ID_FORMAT, null));

Mockito.doReturn(spiedStore).when(spiedFs).getAbfsStore();
spiedStore.setClient(spiedClient);
spiedFs.setWorkingDirectory(new Path("/"));

AbfsClientTestUtil.setMockAbfsRestOperationForListPathOperation(spiedClient,
(httpOperation) -> {

ListResultEntrySchema entry = new ListResultEntrySchema()
.withName("a")
.withIsDirectory(true);
List<ListResultEntrySchema> paths = new ArrayList<>();
paths.add(entry);
paths.clear();
entry = new ListResultEntrySchema()
.withName("abc.txt")
.withIsDirectory(false);
paths.add(entry);
ListResultSchema schema1 = new ListResultSchema().withPaths(paths);
ListResultSchema schema2 = new ListResultSchema().withPaths(paths);

when(httpOperation.getListResultSchema()).thenReturn(schema1)
.thenReturn(schema2);
when(httpOperation.getResponseHeader(
HttpHeaderConfigurations.X_MS_CONTINUATION))
.thenReturn(TEST_CONTINUATION_TOKEN)
.thenReturn(EMPTY_STRING);

Stubber stubber = Mockito.doThrow(
new SocketTimeoutException(CONNECTION_TIMEOUT_JDK_MESSAGE));
stubber.doNothing().when(httpOperation).processResponse(
nullable(byte[].class), nullable(int.class), nullable(int.class));

when(httpOperation.getStatusCode()).thenReturn(-1).thenReturn(HTTP_OK);
return httpOperation;
});

List<FileStatus> fileStatuses = new ArrayList<>();
spiedStore.listStatus(new Path("/"), "", fileStatuses, true, null, spiedTracingContext);

// Assert that there were 2 paginated ListPath calls were made 1 and 2.
// 1. Without continuation token
Mockito.verify(spiedClient, times(1)).listPath(
"/", false,
spiedFs.getAbfsStore().getAbfsConfiguration().getListMaxResults(),
null, spiedTracingContext);
// 2. With continuation token
Mockito.verify(spiedClient, times(1)).listPath(
"/", false,
spiedFs.getAbfsStore().getAbfsConfiguration().getListMaxResults(),
TEST_CONTINUATION_TOKEN, spiedTracingContext);

// Assert that none of the API calls used the same tracing header.
Mockito.verify(spiedTracingContext, times(0)).constructHeader(any(), any());
}

/**
Expand Down
Loading