Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1164,6 +1164,7 @@ private void checkIfVectoredIOStopped() throws InterruptedIOException {
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
@VisibleForTesting
public S3AInputStreamStatistics getS3AStreamStatistics() {
return streamStatistics;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@

import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.fs.CanSetReadahead;
import org.apache.hadoop.fs.FSExceptionMessages;
import org.apache.hadoop.fs.FSInputStream;
Expand Down Expand Up @@ -56,6 +57,21 @@ public class S3APrefetchingInputStream
*/
private S3ARemoteInputStream inputStream;

/**
* To be only used by synchronized getPos().
*/
private long lastReadCurrentPos = 0;

/**
* To be only used by getIOStatistics().
*/
private IOStatistics ioStatistics = null;

/**
* To be only used by getS3AStreamStatistics().
*/
private S3AInputStreamStatistics inputStreamStatistics = null;

/**
* Initializes a new instance of the {@code S3APrefetchingInputStream} class.
*
Expand Down Expand Up @@ -115,14 +131,20 @@ public synchronized int available() throws IOException {
}

/**
* Gets the current position.
* Gets the current position. If the underlying S3 input stream is closed,
* it returns last read current position from the underlying steam. If the
* current position was never read and the underlying input stream is closed,
* this would return 0.
*
* @return the current position.
* @throws IOException if there is an IO error during this operation.
*/
@Override
public synchronized long getPos() throws IOException {
return isClosed() ? 0 : inputStream.getPos();
if (!isClosed()) {
lastReadCurrentPos = inputStream.getPos();
}
return lastReadCurrentPos;
}

/**
Expand Down Expand Up @@ -215,11 +237,12 @@ public boolean hasCapability(String capability) {
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
@VisibleForTesting
public S3AInputStreamStatistics getS3AStreamStatistics() {
if (isClosed()) {
return null;
if (!isClosed()) {
inputStreamStatistics = inputStream.getS3AStreamStatistics();
}
return inputStream.getS3AStreamStatistics();
return inputStreamStatistics;
}

/**
Expand All @@ -229,10 +252,10 @@ public S3AInputStreamStatistics getS3AStreamStatistics() {
*/
@Override
public IOStatistics getIOStatistics() {
if (isClosed()) {
return null;
if (!isClosed()) {
ioStatistics = inputStream.getIOStatistics();
}
return inputStream.getIOStatistics();
return ioStatistics;
}

protected boolean isClosed() {
Expand All @@ -249,7 +272,6 @@ protected void throwIfClosed() throws IOException {

@Override
public boolean seekToNewSource(long targetPos) throws IOException {
throwIfClosed();
return false;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.contract.ContractTestUtils;
import org.apache.hadoop.fs.s3a.performance.AbstractS3ACostTest;
import org.apache.hadoop.fs.s3a.prefetch.S3APrefetchingInputStream;
import org.apache.hadoop.fs.s3a.statistics.S3AInputStreamStatistics;
import org.apache.hadoop.fs.statistics.IOStatistics;

import static org.apache.hadoop.fs.s3a.Constants.PREFETCH_BLOCK_DEFAULT_SIZE;
Expand Down Expand Up @@ -240,4 +242,56 @@ public void testRandomReadSmallFile() throws Throwable {
}
}

@Test
public void testStatusProbesAfterClosingStream() throws Throwable {
describe("When the underlying input stream is closed, the prefetch input stream"
+ " should still support some status probes");

byte[] data = ContractTestUtils.dataset(SMALL_FILE_SIZE, 'a', 26);
Path smallFile = methodPath();
ContractTestUtils.writeDataset(getFileSystem(), smallFile, data, data.length, 16, true);

FSDataInputStream in = getFileSystem().open(smallFile);

byte[] buffer = new byte[SMALL_FILE_SIZE];
in.read(buffer, 0, S_1K * 4);
in.seek(S_1K * 12);
in.read(buffer, 0, S_1K * 4);

long pos = in.getPos();
IOStatistics ioStats = in.getIOStatistics();
S3AInputStreamStatistics inputStreamStatistics =
((S3APrefetchingInputStream) (in.getWrappedStream())).getS3AStreamStatistics();

assertNotNull("Prefetching input IO stats should not be null", ioStats);
assertNotNull("Prefetching input stream stats should not be null", inputStreamStatistics);
assertNotEquals("Position retrieved from prefetching input stream should be greater than 0", 0,
pos);

in.close();

// status probes after closing the input stream
long newPos = in.getPos();
IOStatistics newIoStats = in.getIOStatistics();
S3AInputStreamStatistics newInputStreamStatistics =
((S3APrefetchingInputStream) (in.getWrappedStream())).getS3AStreamStatistics();

assertNotNull("Prefetching input IO stats should not be null", newIoStats);
assertNotNull("Prefetching input stream stats should not be null", newInputStreamStatistics);
assertNotEquals("Position retrieved from prefetching input stream should be greater than 0", 0,
newPos);

// compare status probes after closing of the stream with status probes done before
// closing the stream
assertEquals("Position retrieved through stream before and after closing should match", pos,
newPos);
assertEquals("IO stats retrieved through stream before and after closing should match", ioStats,
newIoStats);
assertEquals("Stream stats retrieved through stream before and after closing should match",
inputStreamStatistics, newInputStreamStatistics);

assertFalse("seekToNewSource() not supported with prefetch", in.seekToNewSource(10));

}

}