
Commit b92a661

pass down file open options, such as read policy and file status, to AAL
1 parent: 3828ad5

2 files changed: 51 additions, 14 deletions

2 files changed

+51
-14
lines changed

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java

Lines changed: 37 additions & 1 deletion
@@ -25,12 +25,16 @@
 import org.apache.hadoop.fs.FSExceptionMessages;
 import org.apache.hadoop.fs.StreamCapabilities;
 import org.apache.hadoop.fs.s3a.Retries;
+import org.apache.hadoop.fs.s3a.S3AInputPolicy;
 import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
 import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

 import software.amazon.s3.analyticsaccelerator.S3SeekableInputStream;
+import software.amazon.s3.analyticsaccelerator.request.ObjectMetadata;
+import software.amazon.s3.analyticsaccelerator.util.InputPolicy;
+import software.amazon.s3.analyticsaccelerator.util.OpenFileInformation;
 import software.amazon.s3.analyticsaccelerator.util.S3URI;

 /**
@@ -49,7 +53,7 @@ public class AnalyticsStream extends ObjectInputStream implements StreamCapabili
   public AnalyticsStream(final ObjectReadParameters parameters, final S3SeekableInputStreamFactory s3SeekableInputStreamFactory) throws IOException {
     super(InputStreamType.Analytics, parameters);
     S3ObjectAttributes s3Attributes = parameters.getObjectAttributes();
-    this.inputStream = s3SeekableInputStreamFactory.createStream(S3URI.of(s3Attributes.getBucket(), s3Attributes.getKey()));
+    this.inputStream = s3SeekableInputStreamFactory.createStream(S3URI.of(s3Attributes.getBucket(), s3Attributes.getKey()), buildOpenFileInformation(parameters));
     getS3AStreamStatistics().streamOpened(InputStreamType.Analytics);
   }

@@ -197,6 +201,38 @@ private void onReadFailure(IOException ioe) throws IOException {
     this.close();
   }

+  private OpenFileInformation buildOpenFileInformation(ObjectReadParameters parameters) {
+    OpenFileInformation openFileInformation = OpenFileInformation.builder()
+        .inputPolicy(mapS3AInputPolicyToAAL(parameters.getContext().getInputPolicy()))
+        .objectMetadata(ObjectMetadata.builder()
+            .contentLength(parameters.getObjectAttributes().getLen())
+            .etag(parameters.getObjectAttributes().getETag()).build())
+        .build();
+
+    return openFileInformation;
+  }
+
+  /**
+   * If S3A's input policy is Sequential, that is, if the file format to be read is
+   * sequential (CSV, JSON), or the read policy passed down is WHOLE_FILE, then AAL's
+   * parquet-specific optimisations are turned off, regardless of the file extension.
+   * This allows applications such as DistCp, which read parquet files whole and so do
+   * not follow the typical parquet read pattern of reading the footer first, to skip
+   * optimisations from which they would not benefit.
+   * Otherwise, AAL decides which optimisations to apply based on the file extension:
+   * if the file ends in .par or .parquet, parquet-specific optimisations are used.
+   *
+   * @param inputPolicy S3A's input file policy passed down when opening the file
+   * @return the AAL read policy
+   */
+  private InputPolicy mapS3AInputPolicyToAAL(S3AInputPolicy inputPolicy) {
+    switch (inputPolicy) {
+    case Sequential:
+      return InputPolicy.Sequential;
+    default:
+      return InputPolicy.None;
+    }
+  }

   protected void throwIfClosed() throws IOException {
     if (closed) {
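
For context on where the mapped policy comes from: S3A resolves its input policy from the INPUT_FADVISE (fs.s3a.experimental.input.fadvise) setting or from the read-policy option given to the openFile() builder, and "whole-file" resolves to S3AInputPolicy.Sequential, which the new mapping above hands to AAL as InputPolicy.Sequential. Below is a minimal sketch of a caller triggering that path; the bucket and object name are placeholders, while fs.option.openfile.read.policy is the standard Hadoop openFile option key.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class WholeFileReadExample {
  public static void main(String[] args) throws Exception {
    // Placeholder bucket and key; any S3A path would do.
    Path path = new Path("s3a://example-bucket/datasets/records.parquet");
    FileSystem fs = path.getFileSystem(new Configuration());

    // Asking for the whole-file read policy resolves to S3AInputPolicy.Sequential,
    // which AnalyticsStream now passes to AAL as InputPolicy.Sequential,
    // switching off parquet-specific optimisations for this stream.
    try (FSDataInputStream in = fs.openFile(path)
        .opt("fs.option.openfile.read.policy", "whole-file")
        .build().get()) {
      byte[] buffer = new byte[1024];
      int bytesRead = in.read(buffer, 0, buffer.length);
      System.out.println("read " + bytesRead + " bytes");
    }
  }
}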

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3AAnalyticsAcceleratorStream.java

Lines changed: 14 additions & 13 deletions
@@ -21,42 +21,34 @@

 import java.io.File;
 import java.io.IOException;
-import java.nio.charset.Charset;
-import java.util.UUID;
+import java.io.InputStream;

 import org.junit.Before;
 import org.junit.Test;

-import org.apache.commons.io.FileUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.fs.PathHandle;
-import org.apache.hadoop.fs.s3a.impl.AwsSdkWorkarounds;
+import org.apache.hadoop.fs.s3a.impl.streams.InputStreamType;
+import org.apache.hadoop.fs.s3a.impl.streams.ObjectInputStream;
 import org.apache.hadoop.fs.statistics.IOStatistics;
-import org.apache.hadoop.test.GenericTestUtils;

-import static org.apache.commons.io.FileUtils.ONE_KB;
 import static org.apache.hadoop.fs.s3a.Constants.*;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.enableAnalyticsAccelerator;
-import static org.apache.hadoop.fs.s3a.S3ATestUtils.enablePrefetching;
 import static org.apache.hadoop.fs.s3a.S3ATestUtils.removeBaseAndBucketOverrides;
 import static org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils.getExternalData;
 import static org.apache.hadoop.fs.s3a.test.PublicDatasetTestUtils.isUsingDefaultExternalDataFile;
 import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
 import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED;
-import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_OPENED;
-import static org.apache.hadoop.fs.statistics.StreamStatisticNames.STREAM_READ_PREFETCH_OPERATIONS;
-import static org.apache.hadoop.test.GenericTestUtils.LogCapturer.captureLogs;
+

 import org.assertj.core.api.Assertions;

-import org.slf4j.LoggerFactory;
+
 import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration;
 import software.amazon.s3.analyticsaccelerator.common.ConnectorConfiguration;
-import software.amazon.s3.analyticsaccelerator.io.logical.parquet.ParquetMetadataParsingTask;
 import software.amazon.s3.analyticsaccelerator.util.PrefetchMode;

 public class ITestS3AAnalyticsAcceleratorStream extends AbstractS3ATestBase {
@@ -90,6 +82,9 @@ public Configuration createConfiguration() {
   public void testConnectorFrameWorkIntegration() throws IOException {
     describe("Verify S3 connector framework integration");

+    removeBaseAndBucketOverrides(conf, INPUT_FADVISE);
+    conf.set(INPUT_FADVISE, "whole-file");
+
     S3AFileSystem fs =
         (S3AFileSystem) FileSystem.get(testFile.toUri(), conf);
     byte[] buffer = new byte[500];
@@ -99,7 +94,13 @@ public void testConnectorFrameWorkIntegration() throws IOException {
       ioStats = inputStream.getIOStatistics();
       inputStream.seek(5);
       inputStream.read(buffer, 0, 500);
+
+      final InputStream wrappedStream = inputStream.getWrappedStream();
+      ObjectInputStream objectInputStream = (ObjectInputStream) wrappedStream;
+      assertEquals(InputStreamType.Analytics, objectInputStream.streamType());
+      assertEquals(S3AInputPolicy.Sequential, objectInputStream.getInputPolicy());
     }
+
     verifyStatisticCounterValue(ioStats, STREAM_READ_ANALYTICS_OPENED, 1);
   }

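
The assertion on getInputPolicy() holds because S3A parses the "whole-file" fadvise value into S3AInputPolicy.Sequential before the stream is opened. As a hedged sketch, a companion test (not part of this commit) could cover the other branch of the mapping, where the random policy falls through to InputPolicy.None and leaves AAL free to apply parquet optimisations; it reuses the conf and testFile fields of the test class above.

@Test
public void testRandomPolicySurfacedToStream() throws IOException {
  describe("Verify the random read policy is visible on the analytics stream");

  // Hypothetical companion test: "random" resolves to S3AInputPolicy.Random,
  // which the new mapping in AnalyticsStream translates to InputPolicy.None,
  // so AAL may still pick parquet optimisations from the file extension.
  removeBaseAndBucketOverrides(conf, INPUT_FADVISE);
  conf.set(INPUT_FADVISE, "random");
  S3AFileSystem fs = (S3AFileSystem) FileSystem.get(testFile.toUri(), conf);

  try (FSDataInputStream inputStream = fs.open(testFile)) {
    ObjectInputStream objectInputStream =
        (ObjectInputStream) inputStream.getWrappedStream();
    assertEquals(InputStreamType.Analytics, objectInputStream.streamType());
    assertEquals(S3AInputPolicy.Random, objectInputStream.getInputPolicy());
  }
}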
