258
258
import static org .apache .hadoop .fs .statistics .impl .IOStatisticsBinding .trackDurationOfSupplier ;
259
259
import static org .apache .hadoop .io .IOUtils .cleanupWithLogger ;
260
260
import static org .apache .hadoop .util .Preconditions .checkArgument ;
261
+ import static org .apache .hadoop .util .functional .RemoteIterators .foreach ;
261
262
import static org .apache .hadoop .util .functional .RemoteIterators .typeCastingRemoteIterator ;
262
263
263
264
/**
@@ -384,6 +385,11 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
384
385
private SignerManager signerManager ;
385
386
private S3AInternals s3aInternals ;
386
387
388
+ /**
389
+ * Do directory operations purge pending uploads?
390
+ */
391
+ private boolean dirOperationsPurgeUploads ;
392
+
387
393
/**
388
394
* Page size for deletions.
389
395
*/
@@ -565,6 +571,9 @@ public void initialize(URI name, Configuration originalConf)
565
571
//check but do not store the block size
566
572
longBytesOption (conf , FS_S3A_BLOCK_SIZE , DEFAULT_BLOCKSIZE , 1 );
567
573
enableMultiObjectsDelete = conf .getBoolean (ENABLE_MULTI_DELETE , true );
574
+ // should the delete also purge uploads.
575
+ dirOperationsPurgeUploads = conf .getBoolean (DIRECTORY_OPERATIONS_PURGE_UPLOADS ,
576
+ DIRECTORY_OPERATIONS_PURGE_UPLOADS_DEFAULT );
568
577
569
578
this .prefetchEnabled = conf .getBoolean (PREFETCH_ENABLED_KEY , PREFETCH_ENABLED_DEFAULT );
570
579
long prefetchBlockSizeLong =
@@ -1230,7 +1239,7 @@ public void abortOutstandingMultipartUploads(long seconds)
1230
1239
purgeBefore );
1231
1240
invoker .retry ("Purging multipart uploads" , bucket , true ,
1232
1241
() -> {
1233
- MultipartUtils . UploadIterator uploadIterator =
1242
+ RemoteIterator < MultipartUpload > uploadIterator =
1234
1243
MultipartUtils .listMultipartUploads (createStoreContext (), s3Client , null , maxKeys );
1235
1244
1236
1245
while (uploadIterator .hasNext ()) {
@@ -2283,12 +2292,14 @@ private long innerRename(Path source, Path dest)
2283
2292
2284
2293
// Initiate the rename.
2285
2294
// this will call back into this class via the rename callbacks
2295
+ final StoreContext storeContext = createStoreContext ();
2286
2296
RenameOperation renameOperation = new RenameOperation (
2287
- createStoreContext () ,
2297
+ storeContext ,
2288
2298
src , srcKey , p .getLeft (),
2289
2299
dst , dstKey , p .getRight (),
2290
- new OperationCallbacksImpl (),
2291
- pageSize );
2300
+ new OperationCallbacksImpl (storeContext ),
2301
+ pageSize ,
2302
+ dirOperationsPurgeUploads );
2292
2303
return renameOperation .execute ();
2293
2304
}
2294
2305
@@ -2309,8 +2320,19 @@ private final class OperationCallbacksImpl implements OperationCallbacks {
2309
2320
/** Audit Span at time of creation. */
2310
2321
private final AuditSpan auditSpan ;
2311
2322
2312
- private OperationCallbacksImpl () {
2313
- auditSpan = getActiveAuditSpan ();
2323
+ private final StoreContext storeContext ;
2324
+
2325
+ private OperationCallbacksImpl (final StoreContext storeContext ) {
2326
+ this .storeContext = requireNonNull (storeContext );
2327
+ this .auditSpan = storeContext .getActiveAuditSpan ();
2328
+ }
2329
+
2330
+ /**
2331
+ * Get the audit span.
2332
+ * @return the span
2333
+ */
2334
+ private AuditSpan getAuditSpan () {
2335
+ return auditSpan ;
2314
2336
}
2315
2337
2316
2338
@ Override
@@ -2410,7 +2432,29 @@ public RemoteIterator<S3AFileStatus> listObjects(
2410
2432
Listing .ACCEPT_ALL_BUT_S3N ,
2411
2433
auditSpan ));
2412
2434
}
2413
- }
2435
+
2436
+ /**
2437
+ * Abort multipart uploads under a path.
2438
+ * @param prefix prefix for uploads to abort
2439
+ * @return a count of aborts
2440
+ * @throws IOException trouble; FileNotFoundExceptions are swallowed.
2441
+ */
2442
+ @ Override
2443
+ @ Retries .RetryTranslated
2444
+ public long abortMultipartUploadsUnderPrefix (String prefix )
2445
+ throws IOException {
2446
+ getAuditSpan ().activate ();
2447
+ // this deactivates the audit span somehow
2448
+ final RemoteIterator <MultipartUpload > uploads =
2449
+ S3AFileSystem .this .listUploadsUnderPrefix (storeContext , prefix );
2450
+ // so reactivate it.
2451
+ getAuditSpan ().activate ();
2452
+ return foreach (uploads , upload ->
2453
+ invoker .retry ("Aborting multipart commit" , upload .key (), true , () ->
2454
+ S3AFileSystem .this .abortMultipartUpload (upload )));
2455
+ }
2456
+
2457
+ } // end OperationCallbacksImpl
2414
2458
2415
2459
/**
2416
2460
* Callbacks from {@link Listing}.
@@ -3371,14 +3415,17 @@ protected boolean deleteWithoutCloseCheck(Path f, boolean recursive) throws IOEx
3371
3415
// span covers delete, getFileStatus, fake directory operations.
3372
3416
try (AuditSpan span = createSpan (INVOCATION_DELETE .getSymbol (),
3373
3417
path .toString (), null )) {
3418
+ // SC will include active span
3419
+ final StoreContext storeContext = createStoreContext ();
3374
3420
boolean outcome = trackDuration (getDurationTrackerFactory (),
3375
3421
INVOCATION_DELETE .getSymbol (),
3376
3422
new DeleteOperation (
3377
- createStoreContext () ,
3423
+ storeContext ,
3378
3424
innerGetFileStatus (path , true , StatusProbeEnum .ALL ),
3379
3425
recursive ,
3380
- new OperationCallbacksImpl (),
3381
- pageSize ));
3426
+ new OperationCallbacksImpl (storeContext ),
3427
+ pageSize ,
3428
+ dirOperationsPurgeUploads ));
3382
3429
if (outcome ) {
3383
3430
try {
3384
3431
maybeCreateFakeParentDirectory (path );
@@ -5151,13 +5198,39 @@ S3ALocatedFileStatus toLocatedFileStatus(S3AFileStatus status)
5151
5198
@ InterfaceAudience .Private
5152
5199
@ Retries .RetryTranslated
5153
5200
@ AuditEntryPoint
5154
- public MultipartUtils .UploadIterator listUploads (@ Nullable String prefix )
5201
+ public RemoteIterator <MultipartUpload > listUploads (@ Nullable String prefix )
5202
+ throws IOException {
5203
+ // span is picked up retained in the listing.
5204
+ checkNotClosed ();
5205
+ try (AuditSpan span = createSpan (MULTIPART_UPLOAD_LIST .getSymbol (),
5206
+ prefix , null )) {
5207
+ return listUploadsUnderPrefix (createStoreContext (), prefix );
5208
+ }
5209
+ }
5210
+
5211
+ /**
5212
+ * List any pending multipart uploads whose keys begin with prefix, using
5213
+ * an iterator that can handle an unlimited number of entries.
5214
+ * See {@link #listMultipartUploads(String)} for a non-iterator version of
5215
+ * this.
5216
+ * @param storeContext store conext.
5217
+ * @param prefix optional key prefix to search
5218
+ * @return Iterator over multipart uploads.
5219
+ * @throws IOException on failure
5220
+ */
5221
+ @ InterfaceAudience .Private
5222
+ @ Retries .RetryTranslated
5223
+ public RemoteIterator <MultipartUpload > listUploadsUnderPrefix (
5224
+ final StoreContext storeContext ,
5225
+ final @ Nullable String prefix )
5155
5226
throws IOException {
5156
5227
// span is picked up retained in the listing.
5157
- return trackDurationAndSpan (MULTIPART_UPLOAD_LIST , prefix , null , () ->
5158
- MultipartUtils .listMultipartUploads (
5159
- createStoreContext (), s3Client , prefix , maxKeys
5160
- ));
5228
+ String p = prefix ;
5229
+ if (prefix != null && !prefix .isEmpty () && !prefix .endsWith ("/" )) {
5230
+ p = prefix + "/" ;
5231
+ }
5232
+ // duration tracking is done in iterator.
5233
+ return MultipartUtils .listMultipartUploads (storeContext , s3Client , p , maxKeys );
5161
5234
}
5162
5235
5163
5236
/**
@@ -5179,9 +5252,10 @@ public List<MultipartUpload> listMultipartUploads(String prefix)
5179
5252
}
5180
5253
String p = prefix ;
5181
5254
return invoker .retry ("listMultipartUploads" , p , true , () -> {
5182
- ListMultipartUploadsRequest .Builder requestBuilder = getRequestFactory ()
5183
- .newListMultipartUploadsRequestBuilder (p );
5184
- return s3Client .listMultipartUploads (requestBuilder .build ()).uploads ();
5255
+ final ListMultipartUploadsRequest request = getRequestFactory ()
5256
+ .newListMultipartUploadsRequestBuilder (p ).build ();
5257
+ return trackDuration (getInstrumentation (), MULTIPART_UPLOAD_LIST .getSymbol (), () ->
5258
+ s3Client .listMultipartUploads (request ).uploads ());
5185
5259
});
5186
5260
}
5187
5261
@@ -5190,37 +5264,35 @@ public List<MultipartUpload> listMultipartUploads(String prefix)
5190
5264
* Retry policy: none.
5191
5265
* @param destKey destination key
5192
5266
* @param uploadId Upload ID
5267
+ * @throws IOException IO failure, including any uprated SdkException
5193
5268
*/
5194
- @ Retries .OnceRaw
5195
- void abortMultipartUpload (String destKey , String uploadId ) {
5196
- LOG .info ("Aborting multipart upload {} to {}" , uploadId , destKey );
5197
- s3Client .abortMultipartUpload (
5198
- getRequestFactory ().newAbortMultipartUploadRequestBuilder (
5199
- destKey ,
5200
- uploadId ).build ());
5269
+ @ Retries .OnceTranslated
5270
+ public void abortMultipartUpload (String destKey , String uploadId ) throws IOException {
5271
+ LOG .debug ("Aborting multipart upload {} to {}" , uploadId , destKey );
5272
+ trackDuration (getInstrumentation (), OBJECT_MULTIPART_UPLOAD_ABORTED .getSymbol (), () ->
5273
+ s3Client .abortMultipartUpload (
5274
+ getRequestFactory ().newAbortMultipartUploadRequestBuilder (
5275
+ destKey ,
5276
+ uploadId ).build ()));
5201
5277
}
5202
5278
5203
5279
/**
5204
5280
* Abort a multipart upload.
5205
5281
* Retry policy: none.
5206
5282
* @param upload the listed upload to abort.
5283
+ * @throws IOException IO failure, including any uprated SdkException
5207
5284
*/
5208
- @ Retries .OnceRaw
5209
- void abortMultipartUpload (MultipartUpload upload ) {
5210
- String destKey ;
5211
- String uploadId ;
5212
- destKey = upload .key ();
5213
- uploadId = upload .uploadId ();
5214
- if (LOG .isInfoEnabled ()) {
5285
+ @ Retries .OnceTranslated
5286
+ public void abortMultipartUpload (MultipartUpload upload ) throws IOException {
5287
+ String destKey = upload .key ();
5288
+ String uploadId = upload .uploadId ();
5289
+ if (LOG .isDebugEnabled ()) {
5215
5290
DateFormat df = new SimpleDateFormat ("yyyy-MM-dd HH:mm:ss" );
5216
5291
LOG .debug ("Aborting multipart upload {} to {} initiated by {} on {}" ,
5217
5292
uploadId , destKey , upload .initiator (),
5218
5293
df .format (Date .from (upload .initiated ())));
5219
5294
}
5220
- s3Client .abortMultipartUpload (
5221
- getRequestFactory ().newAbortMultipartUploadRequestBuilder (
5222
- destKey ,
5223
- uploadId ).build ());
5295
+ abortMultipartUpload (destKey , uploadId );
5224
5296
}
5225
5297
5226
5298
/**
@@ -5266,13 +5338,17 @@ public boolean hasPathCapability(final Path path, final String capability)
5266
5338
case STORE_CAPABILITY_DIRECTORY_MARKER_AWARE :
5267
5339
return true ;
5268
5340
5341
+ // Do directory operations purge uploads.
5342
+ case DIRECTORY_OPERATIONS_PURGE_UPLOADS :
5343
+ return dirOperationsPurgeUploads ;
5344
+
5269
5345
// etags are avaialable in listings, but they
5270
5346
// are not consistent across renames.
5271
5347
// therefore, only availability is declared
5272
5348
case CommonPathCapabilities .ETAGS_AVAILABLE :
5273
5349
return true ;
5274
5350
5275
- /*
5351
+ /*
5276
5352
* Marker policy capabilities are handed off.
5277
5353
*/
5278
5354
case STORE_CAPABILITY_DIRECTORY_MARKER_POLICY_KEEP :
@@ -5545,7 +5621,7 @@ public MarkerToolOperations createMarkerToolOperations(final String target)
5545
5621
throws IOException {
5546
5622
createSpan ("marker-tool-scan" , target ,
5547
5623
null );
5548
- return new MarkerToolOperationsImpl (new OperationCallbacksImpl ());
5624
+ return new MarkerToolOperationsImpl (new OperationCallbacksImpl (createStoreContext () ));
5549
5625
}
5550
5626
5551
5627
/**
0 commit comments