Skip to content

Commit d227ce5

Browse files
committed
HADOOP-18948. S3A. Add option fs.s3a.directory.operations.purge.uploads to purge on rename/delete
Delete and rename optionally purge pending uploads in a separate thread. * Adds test ITestUploadPurgeOnDirectoryOperations to validate outcome and cost * Fix up instrumentation to distinguish api calls from store operations * Fix up audit spans to ensure abort calls are in spans. * Doc in third_party docs with all existing options added. * list multiparts ensured the path to delete must always end in a / That's needed to avoid deleting uploads in adjacent directories. * fix tests failing by that / append & which were asserting on the file path rather than the parent dir. Change-Id: I3e75c8da9e93be05ab7cb3703e5336026dec1ace
1 parent 3e0fcda commit d227ce5

File tree

17 files changed

+499
-83
lines changed

17 files changed

+499
-83
lines changed

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/statistics/StoreStatisticNames.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -244,6 +244,13 @@ public final class StoreStatisticNames {
244244
public static final String OBJECT_MULTIPART_UPLOAD_ABORTED =
245245
"object_multipart_aborted";
246246

247+
/**
248+
* Object multipart list request.
249+
* Value :{@value}.
250+
*/
251+
public static final String OBJECT_MULTIPART_UPLOAD_LIST =
252+
"object_multipart_list";
253+
247254
/**
248255
* Object put/multipart upload count.
249256
* Value :{@value}.

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Constants.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1318,4 +1318,19 @@ private Constants() {
13181318
* The bucket region header.
13191319
*/
13201320
public static final String BUCKET_REGION_HEADER = "x-amz-bucket-region";
1321+
1322+
/**
1323+
* Should directory operations purge uploads?
1324+
* This adds at least one parallelized list operation to the call,
1325+
* plus the overhead of deletions.
1326+
* Value: {@value}.
1327+
*/
1328+
public static final String DIRECTORY_OPERATIONS_PURGE_UPLOADS =
1329+
"fs.s3a.directory.operations.purge.uploads";
1330+
1331+
/**
1332+
* Default value of {@link #DIRECTORY_OPERATIONS_PURGE_UPLOADS}: {@value}.
1333+
*/
1334+
public static final boolean DIRECTORY_OPERATIONS_PURGE_UPLOADS_DEFAULT = false;
1335+
13211336
}

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/MultipartUtils.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,7 @@
3636
import org.apache.hadoop.fs.s3a.impl.StoreContext;
3737
import org.apache.hadoop.fs.store.audit.AuditSpan;
3838

39-
import static org.apache.hadoop.fs.s3a.Statistic.MULTIPART_UPLOAD_LIST;
39+
import static org.apache.hadoop.fs.s3a.Statistic.OBJECT_MULTIPART_UPLOAD_LIST;
4040
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfOperation;
4141

4242

@@ -66,7 +66,7 @@ private MultipartUtils() { }
6666
* @param maxKeys maximum batch size to request at a time from S3.
6767
* @return an iterator of matching uploads
6868
*/
69-
static MultipartUtils.UploadIterator listMultipartUploads(
69+
static RemoteIterator<MultipartUpload> listMultipartUploads(
7070
final StoreContext storeContext,
7171
S3Client s3,
7272
@Nullable String prefix,
@@ -196,7 +196,7 @@ private void requestNextBatch() throws IOException {
196196

197197
listing = invoker.retry("listMultipartUploads", prefix, true,
198198
trackDurationOfOperation(storeContext.getInstrumentation(),
199-
MULTIPART_UPLOAD_LIST.getSymbol(),
199+
OBJECT_MULTIPART_UPLOAD_LIST.getSymbol(),
200200
() -> s3.listMultipartUploads(requestBuilder.build())));
201201
LOG.debug("Listing found {} upload(s)",
202202
listing.uploads().size());

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AFileSystem.java

Lines changed: 114 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -258,6 +258,7 @@
258258
import static org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDurationOfSupplier;
259259
import static org.apache.hadoop.io.IOUtils.cleanupWithLogger;
260260
import static org.apache.hadoop.util.Preconditions.checkArgument;
261+
import static org.apache.hadoop.util.functional.RemoteIterators.foreach;
261262
import static org.apache.hadoop.util.functional.RemoteIterators.typeCastingRemoteIterator;
262263

263264
/**
@@ -384,6 +385,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
384385
private SignerManager signerManager;
385386
private S3AInternals s3aInternals;
386387

388+
/**
389+
* Do directory operations purge pending uploads?
390+
*/
391+
private boolean dirOperationsPurgeUploads;
392+
387393
/**
388394
* Page size for deletions.
389395
*/
@@ -565,6 +571,9 @@ public void initialize(URI name, Configuration originalConf)
565571
//check but do not store the block size
566572
longBytesOption(conf, FS_S3A_BLOCK_SIZE, DEFAULT_BLOCKSIZE, 1);
567573
enableMultiObjectsDelete = conf.getBoolean(ENABLE_MULTI_DELETE, true);
574+
// should the delete also purge uploads.
575+
dirOperationsPurgeUploads = conf.getBoolean(DIRECTORY_OPERATIONS_PURGE_UPLOADS,
576+
DIRECTORY_OPERATIONS_PURGE_UPLOADS_DEFAULT);
568577

569578
this.prefetchEnabled = conf.getBoolean(PREFETCH_ENABLED_KEY, PREFETCH_ENABLED_DEFAULT);
570579
long prefetchBlockSizeLong =
@@ -1230,7 +1239,7 @@ public void abortOutstandingMultipartUploads(long seconds)
12301239
purgeBefore);
12311240
invoker.retry("Purging multipart uploads", bucket, true,
12321241
() -> {
1233-
MultipartUtils.UploadIterator uploadIterator =
1242+
RemoteIterator<MultipartUpload> uploadIterator =
12341243
MultipartUtils.listMultipartUploads(createStoreContext(), s3Client, null, maxKeys);
12351244

12361245
while (uploadIterator.hasNext()) {
@@ -2283,12 +2292,14 @@ private long innerRename(Path source, Path dest)
22832292

22842293
// Initiate the rename.
22852294
// this will call back into this class via the rename callbacks
2295+
final StoreContext storeContext = createStoreContext();
22862296
RenameOperation renameOperation = new RenameOperation(
2287-
createStoreContext(),
2297+
storeContext,
22882298
src, srcKey, p.getLeft(),
22892299
dst, dstKey, p.getRight(),
2290-
new OperationCallbacksImpl(),
2291-
pageSize);
2300+
new OperationCallbacksImpl(storeContext),
2301+
pageSize,
2302+
dirOperationsPurgeUploads);
22922303
return renameOperation.execute();
22932304
}
22942305

@@ -2309,8 +2320,19 @@ private final class OperationCallbacksImpl implements OperationCallbacks {
23092320
/** Audit Span at time of creation. */
23102321
private final AuditSpan auditSpan;
23112322

2312-
private OperationCallbacksImpl() {
2313-
auditSpan = getActiveAuditSpan();
2323+
private final StoreContext storeContext;
2324+
2325+
private OperationCallbacksImpl(final StoreContext storeContext) {
2326+
this.storeContext = requireNonNull(storeContext);
2327+
this.auditSpan = storeContext.getActiveAuditSpan();
2328+
}
2329+
2330+
/**
2331+
* Get the audit span.
2332+
* @return the span
2333+
*/
2334+
private AuditSpan getAuditSpan() {
2335+
return auditSpan;
23142336
}
23152337

23162338
@Override
@@ -2410,7 +2432,29 @@ public RemoteIterator<S3AFileStatus> listObjects(
24102432
Listing.ACCEPT_ALL_BUT_S3N,
24112433
auditSpan));
24122434
}
2413-
}
2435+
2436+
/**
2437+
* Abort multipart uploads under a path.
2438+
* @param prefix prefix for uploads to abort
2439+
* @return a count of aborts
2440+
* @throws IOException trouble; FileNotFoundExceptions are swallowed.
2441+
*/
2442+
@Override
2443+
@Retries.RetryTranslated
2444+
public long abortMultipartUploadsUnderPrefix(String prefix)
2445+
throws IOException {
2446+
getAuditSpan().activate();
2447+
// this deactivates the audit span somehow
2448+
final RemoteIterator<MultipartUpload> uploads =
2449+
S3AFileSystem.this.listUploadsUnderPrefix(storeContext, prefix);
2450+
// so reactivate it.
2451+
getAuditSpan().activate();
2452+
return foreach(uploads, upload ->
2453+
invoker.retry("Aborting multipart commit", upload.key(), true, () ->
2454+
S3AFileSystem.this.abortMultipartUpload(upload)));
2455+
}
2456+
2457+
} // end OperationCallbacksImpl
24142458

24152459
/**
24162460
* Callbacks from {@link Listing}.
@@ -3371,14 +3415,17 @@ protected boolean deleteWithoutCloseCheck(Path f, boolean recursive) throws IOEx
33713415
// span covers delete, getFileStatus, fake directory operations.
33723416
try (AuditSpan span = createSpan(INVOCATION_DELETE.getSymbol(),
33733417
path.toString(), null)) {
3418+
// SC will include active span
3419+
final StoreContext storeContext = createStoreContext();
33743420
boolean outcome = trackDuration(getDurationTrackerFactory(),
33753421
INVOCATION_DELETE.getSymbol(),
33763422
new DeleteOperation(
3377-
createStoreContext(),
3423+
storeContext,
33783424
innerGetFileStatus(path, true, StatusProbeEnum.ALL),
33793425
recursive,
3380-
new OperationCallbacksImpl(),
3381-
pageSize));
3426+
new OperationCallbacksImpl(storeContext),
3427+
pageSize,
3428+
dirOperationsPurgeUploads));
33823429
if (outcome) {
33833430
try {
33843431
maybeCreateFakeParentDirectory(path);
@@ -5151,13 +5198,39 @@ S3ALocatedFileStatus toLocatedFileStatus(S3AFileStatus status)
51515198
@InterfaceAudience.Private
51525199
@Retries.RetryTranslated
51535200
@AuditEntryPoint
5154-
public MultipartUtils.UploadIterator listUploads(@Nullable String prefix)
5201+
public RemoteIterator<MultipartUpload> listUploads(@Nullable String prefix)
5202+
throws IOException {
5203+
// span is picked up retained in the listing.
5204+
checkNotClosed();
5205+
try (AuditSpan span = createSpan(MULTIPART_UPLOAD_LIST.getSymbol(),
5206+
prefix, null)) {
5207+
return listUploadsUnderPrefix(createStoreContext(), prefix);
5208+
}
5209+
}
5210+
5211+
/**
5212+
* List any pending multipart uploads whose keys begin with prefix, using
5213+
* an iterator that can handle an unlimited number of entries.
5214+
* See {@link #listMultipartUploads(String)} for a non-iterator version of
5215+
* this.
5216+
* @param storeContext store context.
5217+
* @param prefix optional key prefix to search
5218+
* @return Iterator over multipart uploads.
5219+
* @throws IOException on failure
5220+
*/
5221+
@InterfaceAudience.Private
5222+
@Retries.RetryTranslated
5223+
public RemoteIterator<MultipartUpload> listUploadsUnderPrefix(
5224+
final StoreContext storeContext,
5225+
final @Nullable String prefix)
51555226
throws IOException {
51565227
// span is picked up retained in the listing.
5157-
return trackDurationAndSpan(MULTIPART_UPLOAD_LIST, prefix, null, () ->
5158-
MultipartUtils.listMultipartUploads(
5159-
createStoreContext(), s3Client, prefix, maxKeys
5160-
));
5228+
String p = prefix;
5229+
if (prefix != null && !prefix.isEmpty() && !prefix.endsWith("/")) {
5230+
p = prefix + "/";
5231+
}
5232+
// duration tracking is done in iterator.
5233+
return MultipartUtils.listMultipartUploads(storeContext, s3Client, p, maxKeys);
51615234
}
51625235

51635236
/**
@@ -5179,9 +5252,10 @@ public List<MultipartUpload> listMultipartUploads(String prefix)
51795252
}
51805253
String p = prefix;
51815254
return invoker.retry("listMultipartUploads", p, true, () -> {
5182-
ListMultipartUploadsRequest.Builder requestBuilder = getRequestFactory()
5183-
.newListMultipartUploadsRequestBuilder(p);
5184-
return s3Client.listMultipartUploads(requestBuilder.build()).uploads();
5255+
final ListMultipartUploadsRequest request = getRequestFactory()
5256+
.newListMultipartUploadsRequestBuilder(p).build();
5257+
return trackDuration(getInstrumentation(), MULTIPART_UPLOAD_LIST.getSymbol(), () ->
5258+
s3Client.listMultipartUploads(request).uploads());
51855259
});
51865260
}
51875261

@@ -5190,37 +5264,35 @@ public List<MultipartUpload> listMultipartUploads(String prefix)
51905264
* Retry policy: none.
51915265
* @param destKey destination key
51925266
* @param uploadId Upload ID
5267+
* @throws IOException IO failure, including any uprated SdkException
51935268
*/
5194-
@Retries.OnceRaw
5195-
void abortMultipartUpload(String destKey, String uploadId) {
5196-
LOG.info("Aborting multipart upload {} to {}", uploadId, destKey);
5197-
s3Client.abortMultipartUpload(
5198-
getRequestFactory().newAbortMultipartUploadRequestBuilder(
5199-
destKey,
5200-
uploadId).build());
5269+
@Retries.OnceTranslated
5270+
public void abortMultipartUpload(String destKey, String uploadId) throws IOException {
5271+
LOG.debug("Aborting multipart upload {} to {}", uploadId, destKey);
5272+
trackDuration(getInstrumentation(), OBJECT_MULTIPART_UPLOAD_ABORTED.getSymbol(), () ->
5273+
s3Client.abortMultipartUpload(
5274+
getRequestFactory().newAbortMultipartUploadRequestBuilder(
5275+
destKey,
5276+
uploadId).build()));
52015277
}
52025278

52035279
/**
52045280
* Abort a multipart upload.
52055281
* Retry policy: none.
52065282
* @param upload the listed upload to abort.
5283+
* @throws IOException IO failure, including any uprated SdkException
52075284
*/
5208-
@Retries.OnceRaw
5209-
void abortMultipartUpload(MultipartUpload upload) {
5210-
String destKey;
5211-
String uploadId;
5212-
destKey = upload.key();
5213-
uploadId = upload.uploadId();
5214-
if (LOG.isInfoEnabled()) {
5285+
@Retries.OnceTranslated
5286+
public void abortMultipartUpload(MultipartUpload upload) throws IOException {
5287+
String destKey = upload.key();
5288+
String uploadId = upload.uploadId();
5289+
if (LOG.isDebugEnabled()) {
52155290
DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
52165291
LOG.debug("Aborting multipart upload {} to {} initiated by {} on {}",
52175292
uploadId, destKey, upload.initiator(),
52185293
df.format(Date.from(upload.initiated())));
52195294
}
5220-
s3Client.abortMultipartUpload(
5221-
getRequestFactory().newAbortMultipartUploadRequestBuilder(
5222-
destKey,
5223-
uploadId).build());
5295+
abortMultipartUpload(destKey, uploadId);
52245296
}
52255297

52265298
/**
@@ -5266,13 +5338,17 @@ public boolean hasPathCapability(final Path path, final String capability)
52665338
case STORE_CAPABILITY_DIRECTORY_MARKER_AWARE:
52675339
return true;
52685340

5341+
// Do directory operations purge uploads.
5342+
case DIRECTORY_OPERATIONS_PURGE_UPLOADS:
5343+
return dirOperationsPurgeUploads;
5344+
52695345
// etags are available in listings, but they
52705346
// are not consistent across renames.
52715347
// therefore, only availability is declared
52725348
case CommonPathCapabilities.ETAGS_AVAILABLE:
52735349
return true;
52745350

5275-
/*
5351+
/*
52765352
* Marker policy capabilities are handed off.
52775353
*/
52785354
case STORE_CAPABILITY_DIRECTORY_MARKER_POLICY_KEEP:
@@ -5545,7 +5621,7 @@ public MarkerToolOperations createMarkerToolOperations(final String target)
55455621
throws IOException {
55465622
createSpan("marker-tool-scan", target,
55475623
null);
5548-
return new MarkerToolOperationsImpl(new OperationCallbacksImpl());
5624+
return new MarkerToolOperationsImpl(new OperationCallbacksImpl(createStoreContext()));
55495625
}
55505626

55515627
/**

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/Statistic.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -242,7 +242,10 @@ public enum Statistic {
242242
StoreStatisticNames.OBJECT_MULTIPART_UPLOAD_ABORTED,
243243
"Object multipart upload aborted",
244244
TYPE_DURATION),
245-
OBJECT_PUT_REQUESTS(
245+
OBJECT_MULTIPART_UPLOAD_LIST(
246+
StoreStatisticNames.OBJECT_MULTIPART_UPLOAD_LIST,
247+
"Object multipart list request issued",
248+
TYPE_DURATION), OBJECT_PUT_REQUESTS(
246249
StoreStatisticNames.OBJECT_PUT_REQUEST,
247250
"Object put/multipart upload count",
248251
TYPE_DURATION),

0 commit comments

Comments
 (0)