Skip to content

Commit 0244dbd

Browse files
authored
HADOOP-19394. S3A: Integrate with AAL's readVectored(). (#7720)
Contributed by: Ahmar Suhail
1 parent 2d8559b commit 0244dbd

File tree

2 files changed

+96
-1
lines changed

2 files changed

+96
-1
lines changed

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/impl/streams/AnalyticsStream.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,9 +21,16 @@
2121

2222
import java.io.EOFException;
2323
import java.io.IOException;
24+
import java.nio.ByteBuffer;
25+
import java.util.ArrayList;
26+
import java.util.List;
27+
import java.util.concurrent.CompletableFuture;
28+
import java.util.function.Consumer;
29+
import java.util.function.IntFunction;
2430

2531
import software.amazon.s3.analyticsaccelerator.S3SeekableInputStreamFactory;
2632
import software.amazon.s3.analyticsaccelerator.S3SeekableInputStream;
33+
import software.amazon.s3.analyticsaccelerator.common.ObjectRange;
2734
import software.amazon.s3.analyticsaccelerator.request.ObjectMetadata;
2835
import software.amazon.s3.analyticsaccelerator.util.InputPolicy;
2936
import software.amazon.s3.analyticsaccelerator.util.OpenStreamInformation;
@@ -37,6 +44,11 @@
3744
import org.apache.hadoop.fs.s3a.Retries;
3845
import org.apache.hadoop.fs.s3a.S3AInputPolicy;
3946
import org.apache.hadoop.fs.s3a.S3ObjectAttributes;
47+
import org.apache.hadoop.fs.FileRange;
48+
import org.apache.hadoop.fs.VectoredReadUtils;
49+
50+
import static org.apache.hadoop.fs.VectoredReadUtils.LOG_BYTE_BUFFER_RELEASED;
51+
4052

4153
/**
4254
* Analytics stream creates a stream using aws-analytics-accelerator-s3. This stream supports
@@ -128,6 +140,42 @@ public int read(byte[] buf, int off, int len) throws IOException {
128140
return bytesRead;
129141
}
130142

143+
/**
144+
* Pass to {@link #readVectored(List, IntFunction, Consumer)}
145+
* with the {@link VectoredReadUtils#LOG_BYTE_BUFFER_RELEASED} releaser.
146+
* {@inheritDoc}
147+
*/
148+
@Override
149+
public void readVectored(List<? extends FileRange> ranges,
150+
IntFunction<ByteBuffer> allocate) throws IOException {
151+
readVectored(ranges, allocate, LOG_BYTE_BUFFER_RELEASED);
152+
}
153+
154+
/**
155+
* Pass to {@link #readVectored(List, IntFunction, Consumer)}
156+
* with the {@link VectoredReadUtils#LOG_BYTE_BUFFER_RELEASED} releaser.
157+
* {@inheritDoc}
158+
*/
159+
@Override
160+
public void readVectored(final List<? extends FileRange> ranges,
161+
final IntFunction<ByteBuffer> allocate,
162+
final Consumer<ByteBuffer> release) throws IOException {
163+
LOG.debug("AAL: Starting vectored read on path {} for ranges {} ", getPathStr(), ranges);
164+
throwIfClosed();
165+
166+
List<ObjectRange> objectRanges = new ArrayList<>();
167+
168+
for (FileRange range : ranges) {
169+
CompletableFuture<ByteBuffer> result = new CompletableFuture<>();
170+
ObjectRange objectRange = new ObjectRange(result, range.getOffset(), range.getLength());
171+
objectRanges.add(objectRange);
172+
range.setData(result);
173+
}
174+
175+
// AAL does not do any range coalescing, so input and combined ranges are the same.
176+
this.getS3AStreamStatistics().readVectoredOperationStarted(ranges.size(), ranges.size());
177+
inputStream.readVectored(objectRanges, allocate, release);
178+
}
131179

132180
@Override
133181
public boolean seekToNewSource(long l) throws IOException {

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/contract/s3a/ITestS3AContractAnalyticsStreamVectoredRead.java

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,16 +18,27 @@
1818

1919
package org.apache.hadoop.fs.contract.s3a;
2020

21+
import java.util.List;
22+
2123
import org.apache.hadoop.conf.Configuration;
24+
import org.apache.hadoop.fs.FSDataInputStream;
25+
import org.apache.hadoop.fs.FileRange;
2226
import org.apache.hadoop.fs.contract.AbstractContractVectoredReadTest;
2327
import org.apache.hadoop.fs.contract.AbstractFSContract;
28+
import org.apache.hadoop.fs.contract.ContractTestUtils;
29+
import org.apache.hadoop.fs.statistics.IOStatistics;
30+
import org.apache.hadoop.fs.statistics.StreamStatisticNames;
2431
import org.apache.hadoop.test.tags.IntegrationTest;
2532

33+
import org.junit.jupiter.api.Test;
2634
import org.junit.jupiter.params.ParameterizedClass;
2735
import org.junit.jupiter.params.provider.MethodSource;
2836

37+
import static org.apache.hadoop.fs.contract.ContractTestUtils.skip;
38+
import static org.apache.hadoop.fs.contract.ContractTestUtils.validateVectoredReadResult;
2939
import static org.apache.hadoop.fs.s3a.S3ATestUtils.enableAnalyticsAccelerator;
3040
import static org.apache.hadoop.fs.s3a.S3ATestUtils.skipForAnyEncryptionExceptSSES3;
41+
import static org.apache.hadoop.fs.statistics.IOStatisticAssertions.verifyStatisticCounterValue;
3142

3243
/**
3344
* S3A contract tests for vectored reads with the Analytics stream.
@@ -64,12 +75,48 @@ protected Configuration createConfiguration() {
6475
// This issue is tracked in:
6576
// https://github.com/awslabs/analytics-accelerator-s3/issues/218
6677
skipForAnyEncryptionExceptSSES3(conf);
67-
conf.set("fs.contract.vector-io-early-eof-check", "false");
6878
return conf;
6979
}
7080

7181
@Override
7282
protected AbstractFSContract createContract(Configuration conf) {
7383
return new S3AContract(conf);
7484
}
85+
86+
/**
87+
* When the offset is negative, AAL returns IllegalArgumentException, whereas the base implementation will return
88+
* an EoF.
89+
*/
90+
@Override
91+
public void testNegativeOffsetRange() throws Exception {
92+
verifyExceptionalVectoredRead(ContractTestUtils.range(-1, 50), IllegalArgumentException.class);
93+
}
94+
95+
/**
96+
* Currently there is no null check on the release operation, this will be fixed in the next AAL version.
97+
*/
98+
@Override
99+
public void testNullReleaseOperation() {
100+
skip("AAL current does not do a null check on the release operation");
101+
}
102+
103+
@Test
104+
public void testReadVectoredWithAALStatsCollection() throws Exception {
105+
106+
List<FileRange> fileRanges = createSampleNonOverlappingRanges();
107+
try (FSDataInputStream in = openVectorFile()) {
108+
in.readVectored(fileRanges, getAllocate());
109+
110+
validateVectoredReadResult(fileRanges, DATASET, 0);
111+
IOStatistics st = in.getIOStatistics();
112+
113+
// Statistics such as GET requests will be added after IoStats support.
114+
verifyStatisticCounterValue(st,
115+
StreamStatisticNames.STREAM_READ_ANALYTICS_OPENED, 1);
116+
117+
verifyStatisticCounterValue(st,
118+
StreamStatisticNames.STREAM_READ_VECTORED_OPERATIONS,
119+
1);
120+
}
121+
}
75122
}

0 commit comments

Comments
 (0)