Skip to content

Commit 038a692

Browse files
committed
lazy eval of stream factory + test fixes
1 parent f408ec5 commit 038a692

File tree

3 files changed

+22
-16
lines changed

3 files changed

+22
-16
lines changed

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStreamFactory.java

Lines changed: 19 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -21,7 +21,11 @@
2121

2222
import org.apache.hadoop.conf.Configuration;
2323
import org.apache.hadoop.fs.s3a.VectoredIOContext;
24+
import org.apache.hadoop.util.functional.CallableRaisingIOE;
25+
import org.apache.hadoop.util.functional.LazyAutoCloseableReference;
2426

27+
import software.amazon.awssdk.services.s3.S3AsyncClient;
28+
import software.amazon.awssdk.services.s3.S3Client;
2529
import software.amazon.s3.analyticsaccelerator.S3SdkObjectClient;
2630
import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamConfiguration;
2731
import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory;
@@ -39,7 +43,7 @@
3943
public class AnalyticsStreamFactory extends AbstractObjectInputStreamFactory {
4044

4145
private S3SeekableInputStreamConfiguration seekableInputStreamConfiguration;
42-
private S3SeekableInputStreamFactory s3SeekableInputStreamFactory;
46+
private LazyAutoCloseableReference<S3SeekableInputStreamFactory> s3SeekableInputStreamFactory;
4347
private boolean requireCrt;
4448

4549
public AnalyticsStreamFactory() {
@@ -59,20 +63,17 @@ protected void serviceInit(final Configuration conf) throws Exception {
5963
@Override
6064
public void bind(final FactoryBindingParameters factoryBindingParameters) throws IOException {
6165
super.bind(factoryBindingParameters);
62-
this.s3SeekableInputStreamFactory = new S3SeekableInputStreamFactory(
63-
new S3SdkObjectClient(callbacks().getOrCreateAsyncClient(requireCrt)),
64-
seekableInputStreamConfiguration);
66+
this.s3SeekableInputStreamFactory = new LazyAutoCloseableReference<>(createS3SeekableInputStreamFactory());
67+
6568
}
6669

6770
@Override
6871
public ObjectInputStream readObject(final ObjectReadParameters parameters) throws IOException {
6972
return new AnalyticsStream(
7073
parameters,
71-
s3SeekableInputStreamFactory);
74+
getOrCreateS3SeekableInputStreamFactory());
7275
}
73-
74-
75-
76+
7677
@Override
7778
public InputStreamType streamType() {
7879
return InputStreamType.Analytics;
@@ -95,5 +96,15 @@ public StreamFactoryRequirements factoryRequirements() {
9596
0, vectorContext);
9697
}
9798

99+
private S3SeekableInputStreamFactory getOrCreateS3SeekableInputStreamFactory()
100+
throws IOException {
101+
return s3SeekableInputStreamFactory.eval();
102+
}
103+
104+
private CallableRaisingIOE<S3SeekableInputStreamFactory> createS3SeekableInputStreamFactory() {
105+
return () -> new S3SeekableInputStreamFactory(
106+
new S3SdkObjectClient(callbacks().getOrCreateAsyncClient(requireCrt)),
107+
seekableInputStreamConfiguration);
108+
}
98109

99110
}

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3ARequesterPays.java

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
import static org.apache.hadoop.fs.s3a.Constants.ALLOW_REQUESTER_PAYS;
3434
import static org.apache.hadoop.fs.s3a.Constants.S3A_BUCKET_PROBE;
3535
import static org.apache.hadoop.fs.s3a.S3ATestUtils.streamType;
36-
import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipIfAnalyticsAcceleratorEnabled;
3736
import static org.apache.hadoop.test.LambdaTestUtils.intercept;
3837

3938
/**
@@ -60,11 +59,7 @@ protected Configuration createConfiguration() {
6059
public void testRequesterPaysOptionSuccess() throws Throwable {
6160
describe("Test requester pays enabled case by reading last then first byte");
6261
skipIfClientSideEncryption();
63-
// Analytics accelerator currently does not support IOStatistics which leads to the
64-
// STREAM_READ_OPENED assertion to fail, this will be added as part of
65-
// https://issues.apache.org/jira/browse/HADOOP-19364
66-
skipIfAnalyticsAcceleratorEnabled(getConfiguration(),
67-
"Analytics Accelerator currently does not support IOStatistics");
62+
6863
Configuration conf = this.createConfiguration();
6964
conf.setBoolean(ALLOW_REQUESTER_PAYS, true);
7065
// Enable bucket exists check, the first failure point people may encounter

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/S3ATestUtils.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -590,8 +590,8 @@ public static void skipIfAnalyticsAcceleratorEnabled(
590590
}
591591

592592
public static boolean isAnalyticsAcceleratorEnabled(final Configuration conf) {
593-
return conf.getEnum(INPUT_STREAM_TYPE,
594-
InputStreamType.Classic) == InputStreamType.Analytics;
593+
return conf.get(INPUT_STREAM_TYPE,
594+
INPUT_STREAM_TYPE_CLASSIC).equals(INPUT_STREAM_TYPE_ANALYTICS);
595595
}
596596

597597
/**

0 commit comments

Comments
 (0)