Skip to content

Commit e49f8a6

Browse files
committed
HADOOP-18189 S3APrefetchingInputStream to support status probes when closed
1 parent b0d5182 commit e49f8a6

File tree

3 files changed

+86
-9
lines changed

3 files changed

+86
-9
lines changed

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/S3AInputStream.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1164,6 +1164,7 @@ private void checkIfVectoredIOStopped() throws InterruptedIOException {
11641164
*/
11651165
@InterfaceAudience.Private
11661166
@InterfaceStability.Unstable
1167+
@VisibleForTesting
11671168
public S3AInputStreamStatistics getS3AStreamStatistics() {
11681169
return streamStatistics;
11691170
}

hadoop-tools/hadoop-aws/src/main/java/org/apache/hadoop/fs/s3a/prefetch/S3APrefetchingInputStream.java

Lines changed: 31 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626

2727
import org.apache.hadoop.classification.InterfaceAudience;
2828
import org.apache.hadoop.classification.InterfaceStability;
29+
import org.apache.hadoop.classification.VisibleForTesting;
2930
import org.apache.hadoop.fs.CanSetReadahead;
3031
import org.apache.hadoop.fs.FSExceptionMessages;
3132
import org.apache.hadoop.fs.FSInputStream;
@@ -56,6 +57,21 @@ public class S3APrefetchingInputStream
5657
*/
5758
private S3ARemoteInputStream inputStream;
5859

60+
/**
61+
* To be only used by synchronized getPos().
62+
*/
63+
private long lastReadCurrentPos = 0;
64+
65+
/**
66+
* To be only used by getIOStatistics().
67+
*/
68+
private IOStatistics ioStatistics = null;
69+
70+
/**
71+
* To be only used by getS3AStreamStatistics().
72+
*/
73+
private S3AInputStreamStatistics inputStreamStatistics = null;
74+
5975
/**
6076
* Initializes a new instance of the {@code S3APrefetchingInputStream} class.
6177
*
@@ -115,14 +131,20 @@ public synchronized int available() throws IOException {
115131
}
116132

117133
/**
118-
* Gets the current position.
134+
* Gets the current position. If the underlying S3 input stream is closed,
135+
* it returns the last read position from the underlying stream. If the
136+
* current position was never read and the underlying input stream is closed,
137+
* this would return 0.
119138
*
120139
* @return the current position.
121140
* @throws IOException if there is an IO error during this operation.
122141
*/
123142
@Override
124143
public synchronized long getPos() throws IOException {
125-
return isClosed() ? 0 : inputStream.getPos();
144+
if (!isClosed()) {
145+
lastReadCurrentPos = inputStream.getPos();
146+
}
147+
return lastReadCurrentPos;
126148
}
127149

128150
/**
@@ -215,11 +237,12 @@ public boolean hasCapability(String capability) {
215237
*/
216238
@InterfaceAudience.Private
217239
@InterfaceStability.Unstable
240+
@VisibleForTesting
218241
public S3AInputStreamStatistics getS3AStreamStatistics() {
219-
if (isClosed()) {
220-
return null;
242+
if (!isClosed()) {
243+
inputStreamStatistics = inputStream.getS3AStreamStatistics();
221244
}
222-
return inputStream.getS3AStreamStatistics();
245+
return inputStreamStatistics;
223246
}
224247

225248
/**
@@ -229,10 +252,10 @@ public S3AInputStreamStatistics getS3AStreamStatistics() {
229252
*/
230253
@Override
231254
public IOStatistics getIOStatistics() {
232-
if (isClosed()) {
233-
return null;
255+
if (!isClosed()) {
256+
ioStatistics = inputStream.getIOStatistics();
234257
}
235-
return inputStream.getIOStatistics();
258+
return ioStatistics;
236259
}
237260

238261
protected boolean isClosed() {
@@ -249,7 +272,6 @@ protected void throwIfClosed() throws IOException {
249272

250273
@Override
251274
public boolean seekToNewSource(long targetPos) throws IOException {
252-
throwIfClosed();
253275
return false;
254276
}
255277

hadoop-tools/hadoop-aws/src/test/java/org/apache/hadoop/fs/s3a/ITestS3APrefetchingInputStream.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,8 @@
3131
import org.apache.hadoop.fs.Path;
3232
import org.apache.hadoop.fs.contract.ContractTestUtils;
3333
import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
34+
import org.apache.hadoop.fs.s3a.prefetch.S3APrefetchingInputStream;
35+
import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
3436
import org.apache.hadoop.fs.statistics.IOStatistics;
3537

3638
import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_DEFAULT_SIZE;
@@ -240,4 +242,56 @@ public void testRandomReadSmallFile() throws Throwable {
240242
}
241243
}
242244

245+
@Test
246+
public void testStatusProbesAfterClosingStream() throws Throwable {
247+
describe("When the underlying input stream is closed, the prefetch input stream"
248+
+ " should still support some status probes");
249+
250+
byte[] data = ContractTestUtils.dataset(SMALL_FILE_SIZE, 'a', 26);
251+
Path smallFile = path("testStatusProbesAfterClosingStream");
252+
ContractTestUtils.writeDataset(getFileSystem(), smallFile, data, data.length, 16, true);
253+
254+
FSDataInputStream in = getFileSystem().open(smallFile);
255+
256+
byte[] buffer = new byte[SMALL_FILE_SIZE];
257+
in.read(buffer, 0, S_1K * 4);
258+
in.seek(S_1K * 12);
259+
in.read(buffer, 0, S_1K * 4);
260+
261+
long pos = in.getPos();
262+
IOStatistics ioStats = in.getIOStatistics();
263+
S3AInputStreamStatistics inputStreamStatistics =
264+
((S3APrefetchingInputStream) (in.getWrappedStream())).getS3AStreamStatistics();
265+
266+
assertNotNull("Prefetching input IO stats should not be null", ioStats);
267+
assertNotNull("Prefetching input stream stats should not be null", inputStreamStatistics);
268+
assertNotEquals("Position retrieved from prefetching input stream should be greater than 0", 0,
269+
pos);
270+
271+
in.close();
272+
273+
// status probes after closing the input stream
274+
long newPos = in.getPos();
275+
IOStatistics newIoStats = in.getIOStatistics();
276+
S3AInputStreamStatistics newInputStreamStatistics =
277+
((S3APrefetchingInputStream) (in.getWrappedStream())).getS3AStreamStatistics();
278+
279+
assertNotNull("Prefetching input IO stats should not be null", newIoStats);
280+
assertNotNull("Prefetching input stream stats should not be null", newInputStreamStatistics);
281+
assertNotEquals("Position retrieved from prefetching input stream should be greater than 0", 0,
282+
newPos);
283+
284+
// compare status probes after closing of the stream with status probes done before
285+
// closing the stream
286+
assertEquals("Position retrieved through stream before and after closing should match", pos,
287+
newPos);
288+
assertEquals("IO stats retrieved through stream before and after closing should match", ioStats,
289+
newIoStats);
290+
assertEquals("Stream stats retrieved through stream before and after closing should match",
291+
inputStreamStatistics, newInputStreamStatistics);
292+
293+
assertFalse("Not supported with prefetch", in.seekToNewSource(10));
294+
295+
}
296+
243297
}

0 commit comments

Comments
 (0)