@@ -463,7 +463,7 @@ public class S3AFileSystem extends FileSystem implements StreamCapabilities,
463
463
* Flag to indicate that the higher performance copyFromLocalFile implementation
464
464
* should be used.
465
465
*/
466
- private boolean copyFromLocalPerfomance ;
466
+ private boolean optimizedCopyFromLocal ;
467
467
468
468
/** Add any deprecated keys. */
469
469
@ SuppressWarnings ("deprecation" )
@@ -691,8 +691,9 @@ public void initialize(URI name, Configuration originalConf)
691
691
AWS_S3_VECTOR_ACTIVE_RANGE_READS , DEFAULT_AWS_S3_VECTOR_ACTIVE_RANGE_READS , 1 );
692
692
vectoredIOContext = populateVectoredIOContext (conf );
693
693
scheme = (this .uri != null && this .uri .getScheme () != null ) ? this .uri .getScheme () : FS_S3A ;
694
- copyFromLocalPerfomance = conf .getBoolean (COPY_FROM_LOCAL_ENABLED ,
695
- COPY_FROM_LOCAL_ENABLED_DEFAULT );
694
+ optimizedCopyFromLocal = conf .getBoolean (OPTIMIZED_COPY_FROM_LOCAL ,
695
+ OPTIMIZED_COPY_FROM_LOCAL_DEFAULT );
696
+ LOG .debug ("Using optimized copyFromLocal implementation: {}" , optimizedCopyFromLocal );
696
697
} catch (SdkException e ) {
697
698
// amazon client exception: stop all services then throw the translation
698
699
cleanupWithLogger (LOG , span );
@@ -4026,7 +4027,7 @@ private boolean s3Exists(final Path path, final Set<StatusProbeEnum> probes)
4026
4027
* the given dst name.
4027
4028
*
4028
4029
* This version doesn't need to create a temporary file to calculate the md5.
4029
- * If {@link Constants#COPY_FROM_LOCAL_ENABLED } is set to false,
4030
+ * If {@link Constants#OPTIMIZED_COPY_FROM_LOCAL } is set to false,
4030
4031
* the superclass implementation is used.
4031
4032
*
4032
4033
* @param delSrc whether to delete the src
@@ -4044,17 +4045,20 @@ public void copyFromLocalFile(boolean delSrc, boolean overwrite, Path src,
4044
4045
checkNotClosed ();
4045
4046
LOG .debug ("Copying local file from {} to {} (delSrc={} overwrite={}" ,
4046
4047
src , dst , delSrc , overwrite );
4047
- if (copyFromLocalPerfomance ) {
4048
- trackDurationAndSpan (INVOCATION_COPY_FROM_LOCAL_FILE , dst ,
4049
- () -> new CopyFromLocalOperation (
4048
+ if (optimizedCopyFromLocal ) {
4049
+ trackDurationAndSpan (INVOCATION_COPY_FROM_LOCAL_FILE , dst , () ->
4050
+ new CopyFromLocalOperation (
4050
4051
createStoreContext (),
4051
4052
src ,
4052
4053
dst ,
4053
4054
delSrc ,
4054
4055
overwrite ,
4055
- createCopyFromLocalCallbacks ()).execute ());
4056
+ createCopyFromLocalCallbacks (getActiveAuditSpan ()))
4057
+ .execute ());
4056
4058
} else {
4057
- // call the superclass, but still count statistics
4059
+ // call the superclass, but still count statistics.
4060
+ // there is no overall span here, as each FS API call will
4061
+ // be in its own span.
4058
4062
LOG .debug ("Using base copyFromLocalFile implementation" );
4059
4063
trackDurationAndSpan (INVOCATION_COPY_FROM_LOCAL_FILE , dst , () -> {
4060
4064
super .copyFromLocalFile (delSrc , overwrite , src , dst );
@@ -4063,17 +4067,29 @@ public void copyFromLocalFile(boolean delSrc, boolean overwrite, Path src,
4063
4067
}
4064
4068
}
4065
4069
4070
+ /**
4071
+ * Create the CopyFromLocalCallbacks;
4072
+ * protected to assist in mocking
4073
+ * @param span audit span.
4074
+ * @return the callbacks
4075
+ * @throws IOException failure to get the local fs.
4076
+ */
4066
4077
protected CopyFromLocalOperation .CopyFromLocalOperationCallbacks
4067
- createCopyFromLocalCallbacks () throws IOException {
4078
+ createCopyFromLocalCallbacks (final AuditSpanS3A span ) throws IOException {
4068
4079
LocalFileSystem local = getLocal (getConf ());
4069
- return new CopyFromLocalCallbacksImpl (local );
4080
+ return new CopyFromLocalCallbacksImpl (span , local );
4070
4081
}
4071
4082
4072
4083
protected final class CopyFromLocalCallbacksImpl implements
4073
4084
CopyFromLocalOperation .CopyFromLocalOperationCallbacks {
4085
+
4086
+ /** Span to use for all operations. */
4087
+ private final AuditSpanS3A span ;
4074
4088
private final LocalFileSystem local ;
4075
4089
4076
- private CopyFromLocalCallbacksImpl (LocalFileSystem local ) {
4090
+ private CopyFromLocalCallbacksImpl (final AuditSpanS3A span ,
4091
+ LocalFileSystem local ) {
4092
+ this .span = span ;
4077
4093
this .local = local ;
4078
4094
}
4079
4095
@@ -4095,21 +4111,18 @@ public boolean deleteLocal(Path path, boolean recursive) throws IOException {
4095
4111
4096
4112
@ Override
4097
4113
public void copyLocalFileFromTo (File file , Path from , Path to ) throws IOException {
4098
- trackDurationAndSpan (
4099
- OBJECT_PUT_REQUESTS ,
4100
- to ,
4101
- () -> {
4102
- final String key = pathToKey (to );
4103
- Progressable progress = null ;
4104
- PutObjectRequest .Builder putObjectRequestBuilder =
4105
- newPutObjectRequestBuilder (key , file .length (), false );
4106
- final String d = to .toString ();
4107
- S3AFileSystem .this .invoker .retry ("putObject(" + d + ")" , d , true ,
4108
- () -> executePut (putObjectRequestBuilder .build (), progress , putOptionsForPath (to ),
4109
- file ));
4110
-
4114
+ // the duration of the put is measured, but the active span is the
4115
+ // constructor-supplied one -this ensures all audit log events are grouped correctly
4116
+ span .activate ();
4117
+ trackDuration (getDurationTrackerFactory (), OBJECT_PUT_REQUESTS .getSymbol (), () -> {
4118
+ final String key = pathToKey (to );
4119
+ PutObjectRequest .Builder putObjectRequestBuilder =
4120
+ newPutObjectRequestBuilder (key , file .length (), false );
4121
+ final String dest = to .toString ();
4122
+ S3AFileSystem .this .invoker .retry ("putObject(" + dest + ")" , dest , true , () ->
4123
+ executePut (putObjectRequestBuilder .build (), null , putOptionsForPath (to ), file ));
4111
4124
return null ;
4112
- });
4125
+ });
4113
4126
}
4114
4127
4115
4128
@ Override
@@ -5370,6 +5383,10 @@ public boolean hasPathCapability(final Path path, final String capability)
5370
5383
case FS_S3A_CREATE_HEADER :
5371
5384
return true ;
5372
5385
5386
+ // is the optimized copy from local enabled.
5387
+ case OPTIMIZED_COPY_FROM_LOCAL :
5388
+ return optimizedCopyFromLocal ;
5389
+
5373
5390
default :
5374
5391
return super .hasPathCapability (p , cap );
5375
5392
}
0 commit comments