Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ public S3AsyncClient createS3AsyncClient(
return configureClientBuilder(S3AsyncClient.builder(), parameters, conf, bucket)
.httpClientBuilder(httpClientBuilder)
.multipartConfiguration(multipartConfiguration)
.multipartEnabled(true)
.multipartEnabled(parameters.isMultipartCopy())
.build();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -440,6 +440,12 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
*/
private boolean isMultipartUploadEnabled = DEFAULT_MULTIPART_UPLOAD_ENABLED;

/**
* Should file copy operations use the S3 transfer manager?
* True unless multipart upload is disabled.
*/
private boolean isMultipartCopyEnabled;

/**
* A cache of files that should be deleted when the FileSystem is closed
* or the JVM is exited.
Expand Down Expand Up @@ -576,6 +582,9 @@ public void initialize(URI name, Configuration originalConf)
intOption(conf, PREFETCH_BLOCK_COUNT_KEY, PREFETCH_BLOCK_DEFAULT_COUNT, 1);
this.isMultipartUploadEnabled = conf.getBoolean(MULTIPART_UPLOADS_ENABLED,
DEFAULT_MULTIPART_UPLOAD_ENABLED);
// multipart copy and upload are the same; this just makes it explicit
this.isMultipartCopyEnabled = isMultipartUploadEnabled;

initThreadPools(conf);

int listVersion = conf.getInt(LIST_VERSION, DEFAULT_LIST_VERSION);
Expand Down Expand Up @@ -982,6 +991,7 @@ private void bindAWSClient(URI name, boolean dtEnabled) throws IOException {
.withRequesterPays(conf.getBoolean(ALLOW_REQUESTER_PAYS, DEFAULT_ALLOW_REQUESTER_PAYS))
.withExecutionInterceptors(auditManager.createExecutionInterceptors())
.withMinimumPartSize(partSize)
.withMultipartCopyEnabled(isMultipartCopyEnabled)
.withMultipartThreshold(multiPartThreshold)
.withTransferManagerExecutor(unboundedThreadPool)
.withRegion(region);
Expand Down Expand Up @@ -1468,6 +1478,11 @@ public AWSCredentialProviderList shareCredentials(final String purpose) {
LOG.debug("Sharing credentials for: {}", purpose);
return credentials.share();
}

@Override
public boolean isMultipartCopyEnabled() {
// Report the copy flag itself: it is the field copyFile() and the
// S3 client creation parameters consult, so this accessor stays
// accurate even if copy and upload settings are ever decoupled.
return S3AFileSystem.this.isMultipartCopyEnabled;
}
}

/**
Expand Down Expand Up @@ -4436,37 +4451,56 @@ private CopyObjectResponse copyFile(String srcKey, String dstKey, long size,
e);
}

return readInvoker.retry(
action, srcKey,
true,
() -> {
CopyObjectRequest.Builder copyObjectRequestBuilder =
getRequestFactory().newCopyObjectRequestBuilder(srcKey, dstKey, srcom);
changeTracker.maybeApplyConstraint(copyObjectRequestBuilder);
incrementStatistic(OBJECT_COPY_REQUESTS);

Copy copy = transferManager.copy(
CopyRequest.builder()
.copyObjectRequest(copyObjectRequestBuilder.build())
.build());
CopyObjectRequest.Builder copyObjectRequestBuilder =
getRequestFactory().newCopyObjectRequestBuilder(srcKey, dstKey, srcom);
changeTracker.maybeApplyConstraint(copyObjectRequestBuilder);
CopyObjectResponse response;

try {
CompletedCopy completedCopy = copy.completionFuture().join();
CopyObjectResponse result = completedCopy.response();
changeTracker.processResponse(result);
incrementWriteOperations();
instrumentation.filesCopied(1, size);
return result;
} catch (CompletionException e) {
Throwable cause = e.getCause();
if (cause instanceof SdkException) {
SdkException awsException = (SdkException)cause;
changeTracker.processException(awsException, "copy");
throw awsException;
// transfer manager is skipped if disabled or the file is too small to worry about
final boolean useTransferManager = isMultipartCopyEnabled && size >= multiPartThreshold;
if (useTransferManager) {
// use transfer manager
response = readInvoker.retry(
action, srcKey,
true,
() -> {
incrementStatistic(OBJECT_COPY_REQUESTS);

Copy copy = transferManager.copy(
CopyRequest.builder()
.copyObjectRequest(copyObjectRequestBuilder.build())
.build());

try {
CompletedCopy completedCopy = copy.completionFuture().join();
return completedCopy.response();
} catch (CompletionException e) {
Throwable cause = e.getCause();
if (cause instanceof SdkException) {
SdkException awsException = (SdkException)cause;
changeTracker.processException(awsException, "copy");
throw awsException;
}
throw extractException(action, srcKey, e);
}
throw extractException(action, srcKey, e);
}
});
});
} else {
// single part copy bypasses transfer manager
// note, this helps with some mock testing, e.g. HBoss. as there is less to mock.
response = readInvoker.retry(
action, srcKey,
true,
() -> {
LOG.debug("copyFile: single part copy {} -> {} of size {}", srcKey, dstKey, size);
incrementStatistic(OBJECT_COPY_REQUESTS);
return s3Client.copyObject(copyObjectRequestBuilder.build());
});
}

changeTracker.processResponse(response);
incrementWriteOperations();
instrumentation.filesCopied(1, size);
return response;
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,4 +115,10 @@ public interface S3AInternals {
@AuditEntryPoint
@Retries.RetryTranslated
HeadBucketResponse getBucketMetadata() throws IOException;

/**
* Is multipart copy enabled?
* @return true if the transfer manager is used to copy files.
*/
boolean isMultipartCopyEnabled();
}
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,11 @@ final class S3ClientCreationParameters {
*/
private long multiPartThreshold;

/**
* Multipart copy enabled; defaults to true.
*/
private boolean multipartCopy = true;

/**
* Executor that the transfer manager will use to execute background tasks.
*/
Expand Down Expand Up @@ -399,5 +404,24 @@ public S3ClientCreationParameters withRegion(
public Region getRegion() {
return region;
}

/**
* Set the multipart copy flag.
*
* @param value true to enable multipart copy, false to disable it
* @return the builder
*/
public S3ClientCreationParameters withMultipartCopyEnabled(final boolean value) {
this.multipartCopy = value;
return this;
}

/**
* Get the multipart copy flag.
* @return true if multipart copy is enabled
*/
public boolean isMultipartCopy() {
return multipartCopy;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -355,10 +355,10 @@ public long getFilesize() {
/**
* Is this expected to be a multipart upload?
* Assertions will change if not.
* @return true by default.
* @return what the filesystem expects.
*/
protected boolean expectMultipartUpload() {
return true;
return getFileSystem().getS3AInternals().isMultipartCopyEnabled();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,10 @@

package org.apache.hadoop.fs.s3a.scale;

import org.assertj.core.api.Assertions;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.s3a.Constants;
import org.apache.hadoop.fs.s3a.S3AFileSystem;
import org.apache.hadoop.fs.s3a.S3ATestUtils;
import org.apache.hadoop.fs.s3a.api.UnsupportedRequestException;

import static org.apache.hadoop.fs.contract.ContractTestUtils.IO_CHUNK_BUFFER_SIZE;
import static org.apache.hadoop.fs.s3a.Constants.MIN_MULTIPART_THRESHOLD;
Expand All @@ -33,19 +30,13 @@
import static org.apache.hadoop.fs.s3a.Constants.MULTIPART_UPLOADS_ENABLED;
import static org.apache.hadoop.fs.s3a.Constants.REQUEST_TIMEOUT;
import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
import static org.apache.hadoop.test.LambdaTestUtils.intercept;

/**
* Use a single PUT for the whole upload/rename/delete workflow; include verification
* that multipart copy is disabled when multipart uploads are disabled.
*/
public class ITestS3AHugeFilesNoMultipart extends AbstractSTestS3AHugeFiles {

/**
* Size to ensure MPUs don't happen in transfer manager.
*/
public static final String S_1T = "1T";

public static final String SINGLE_PUT_REQUEST_TIMEOUT = "1h";

/**
Expand All @@ -56,11 +47,23 @@ protected String getBlockOutputBufferName() {
return Constants.FAST_UPLOAD_BUFFER_DISK;
}

/**
* Multipart upload is always disabled in this test suite.
* @return false
*/
@Override
protected boolean expectMultipartUpload() {
return false;
}

/**
* Ask the test filesystem's S3AInternals whether multipart copy is enabled.
* @return true if the transfer manager is used to copy files.
*/
private boolean isMultipartCopyEnabled() {
return getFileSystem().getS3AInternals().isMultipartCopyEnabled();
}

/**
* Create a configuration without multipart upload,
* and a long request timeout to allow for a very slow
Expand All @@ -77,35 +80,21 @@ protected Configuration createScaleConfiguration() {
MULTIPART_SIZE,
REQUEST_TIMEOUT);
conf.setInt(IO_CHUNK_BUFFER_SIZE, 655360);
conf.set(MIN_MULTIPART_THRESHOLD, S_1T);
conf.set(MULTIPART_SIZE, S_1T);
conf.setInt(MIN_MULTIPART_THRESHOLD, MULTIPART_MIN_SIZE);
conf.setInt(MULTIPART_SIZE, MULTIPART_MIN_SIZE);
conf.setBoolean(MULTIPART_UPLOADS_ENABLED, false);
conf.set(REQUEST_TIMEOUT, SINGLE_PUT_REQUEST_TIMEOUT);
return conf;
}

/**
* After the file is created, attempt a rename with an FS
* instance with a small multipart threshold;
* this MUST be rejected.
* Verify multipart copy is disabled.
*/
@Override
public void test_030_postCreationAssertions() throws Throwable {
assumeHugeFileExists();
final Path hugefile = getHugefile();
final Path hugefileRenamed = getHugefileRenamed();
describe("renaming %s to %s", hugefile, hugefileRenamed);
S3AFileSystem fs = getFileSystem();
fs.delete(hugefileRenamed, false);
// create a new fs with a small multipart threshold; expect rename failure.
final Configuration conf = new Configuration(fs.getConf());
conf.setInt(MIN_MULTIPART_THRESHOLD, MULTIPART_MIN_SIZE);
conf.setInt(MULTIPART_SIZE, MULTIPART_MIN_SIZE);
S3ATestUtils.disableFilesystemCaching(conf);

try (FileSystem fs2 = FileSystem.get(fs.getUri(), conf)) {
intercept(UnsupportedRequestException.class, () ->
fs2.rename(hugefile, hugefileRenamed));
}
super.test_030_postCreationAssertions();
Assertions.assertThat(isMultipartCopyEnabled())
.describedAs("Multipart copy should be disabled in %s", getFileSystem())
.isFalse();
}
}