From 157fc1066aad797f0d050cc4fc02ab7c8b3cbf91 Mon Sep 17 00:00:00 2001 From: hdavidh Date: Mon, 26 Jun 2023 13:48:11 -0700 Subject: [PATCH 01/26] Rename DecodedStreamBuffer to UnderlyingStreamBuffer --- .../core/internal/io/AwsChunkedEncodingInputStream.java | 4 ++-- ...DecodedStreamBuffer.java => UnderlyingStreamBuffer.java} | 6 +++--- 2 files changed, 5 insertions(+), 5 deletions(-) rename core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/{DecodedStreamBuffer.java => UnderlyingStreamBuffer.java} (93%) diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/AwsChunkedEncodingInputStream.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/AwsChunkedEncodingInputStream.java index f382bd5ced40..79198f977f26 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/AwsChunkedEncodingInputStream.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/AwsChunkedEncodingInputStream.java @@ -63,7 +63,7 @@ public abstract class AwsChunkedEncodingInputStream extends SdkInputStream { * Null if the wrapped stream is marksupported, * otherwise it will be initialized when this wrapper is marked. */ - private DecodedStreamBuffer decodedStreamBuffer; + private UnderlyingStreamBuffer decodedStreamBuffer; private boolean isAtStart = true; private boolean isTerminating = false; @@ -256,7 +256,7 @@ public void mark(int readlimit) { } else { log.debug(() -> "AwsChunkedEncodingInputStream marked at the start of the stream " + "(initializing the buffer since the wrapped stream is not mark-supported)."); - decodedStreamBuffer = new DecodedStreamBuffer(maxBufferSize); + decodedStreamBuffer = new UnderlyingStreamBuffer(maxBufferSize); } } diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/DecodedStreamBuffer.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/UnderlyingStreamBuffer.java similarity index 93% rename from core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/DecodedStreamBuffer.java rename to core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/UnderlyingStreamBuffer.java index f6d3c47c0c1e..6fc086983fda 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/DecodedStreamBuffer.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/UnderlyingStreamBuffer.java @@ -20,8 +20,8 @@ import software.amazon.awssdk.utils.Logger; @SdkInternalApi -class DecodedStreamBuffer { - private static final Logger log = Logger.loggerFor(DecodedStreamBuffer.class); +class UnderlyingStreamBuffer { + private static final Logger log = Logger.loggerFor(UnderlyingStreamBuffer.class); private byte[] bufferArray; private int maxBufferSize; @@ -29,7 +29,7 @@ class DecodedStreamBuffer { private int pos = -1; private boolean bufferSizeOverflow; - DecodedStreamBuffer(int maxBufferSize) { + UnderlyingStreamBuffer(int maxBufferSize) { bufferArray = new byte[maxBufferSize]; this.maxBufferSize = maxBufferSize; } From 29380ab376411569fb2fd3a79092d7527a523816 Mon Sep 17 00:00:00 2001 From: hdavidh Date: Mon, 26 Jun 2023 13:36:18 -0700 Subject: [PATCH 02/26] Update Compressor --- .../awssdk/core/compression/Compressor.java | 15 +++++++++-- .../internal/compression/GzipCompressor.java | 25 ++++++++++++++++--- 2 files changed, 34 insertions(+), 6 deletions(-) diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/compression/Compressor.java 
b/core/sdk-core/src/main/java/software/amazon/awssdk/core/compression/Compressor.java index 04424842905c..ecd64a4316e1 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/compression/Compressor.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/compression/Compressor.java @@ -19,6 +19,7 @@ import java.nio.ByteBuffer; import org.reactivestreams.Publisher; import software.amazon.awssdk.annotations.SdkPublicApi; +import software.amazon.awssdk.core.async.AsyncRequestBody; import software.amazon.awssdk.core.internal.compression.GzipCompressor; import software.amazon.awssdk.core.internal.interceptor.RequestCompressionInterceptor; @@ -33,15 +34,25 @@ public interface Compressor { */ String compressorType(); + /** + * Compress a byte array. + */ + byte[] compress(byte[] content); + /** * Compress an {@link InputStream}. */ InputStream compress(InputStream inputStream); /** - * Compress an async stream. + * Compress a {@link ByteBuffer}. + */ + ByteBuffer compress(ByteBuffer byteBuffer); + + /** + * Compress an {@link AsyncRequestBody}. */ - Publisher compressAsyncStream(Publisher publisher); + Publisher compressAsyncRequestBody(Publisher publisher); /** * Maps the {@link CompressionType} to its corresponding {@link Compressor}. diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/compression/GzipCompressor.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/compression/GzipCompressor.java index 62ff0e39fdc9..5ebe3c941d0b 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/compression/GzipCompressor.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/compression/GzipCompressor.java @@ -38,22 +38,39 @@ public String compressorType() { } @Override - public InputStream compress(InputStream inputStream) { + public byte[] compress(byte[] content) { try { - byte[] content = IoUtils.toByteArray(inputStream); ByteArrayOutputStream compressedOutputStream = new ByteArrayOutputStream(); GZIPOutputStream gzipOutputStream = new GZIPOutputStream(compressedOutputStream); gzipOutputStream.write(content); gzipOutputStream.close(); + return compressedOutputStream.toByteArray(); + } catch (IOException e) { + throw SdkClientException.create(e.getMessage(), e); + } + } - return new ByteArrayInputStream(compressedOutputStream.toByteArray()); + @Override + public InputStream compress(InputStream inputStream) { + try { + byte[] content = IoUtils.toByteArray(inputStream); + byte[] compressedContent = compress(content); + return new ByteArrayInputStream(compressedContent); } catch (IOException e) { throw SdkClientException.create(e.getMessage(), e); } } @Override - public Publisher compressAsyncStream(Publisher publisher) { + public ByteBuffer compress(ByteBuffer byteBuffer) { + byte[] content = new byte[byteBuffer.remaining()]; + byteBuffer.get(content); + byte[] compressedContent = compress(content); + return ByteBuffer.wrap(compressedContent); + } + + @Override + public Publisher compressAsyncRequestBody(Publisher publisher) { //TODO throw new UnsupportedOperationException(); } From 228a03578cbf359ea7d4c5d483902186d6cc17a8 Mon Sep 17 00:00:00 2001 From: hdavidh Date: Mon, 26 Jun 2023 13:41:04 -0700 Subject: [PATCH 03/26] Sync streaming compression --- .../RequestCompressionInterceptor.java | 30 +- .../io/AwsCompressionInputStream.java | 267 ++++++++++++++++++ 2 files changed, 294 insertions(+), 3 deletions(-) create mode 100644 
core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/AwsCompressionInputStream.java diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/interceptor/RequestCompressionInterceptor.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/interceptor/RequestCompressionInterceptor.java index 7b744726241e..caaff7c0ccad 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/interceptor/RequestCompressionInterceptor.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/interceptor/RequestCompressionInterceptor.java @@ -32,6 +32,7 @@ import software.amazon.awssdk.core.interceptor.ExecutionInterceptor; import software.amazon.awssdk.core.interceptor.SdkExecutionAttribute; import software.amazon.awssdk.core.interceptor.SdkInternalExecutionAttribute; +import software.amazon.awssdk.core.internal.io.AwsCompressionInputStream; import software.amazon.awssdk.core.sync.RequestBody; import software.amazon.awssdk.http.ContentStreamProvider; import software.amazon.awssdk.http.SdkHttpFullRequest; @@ -85,8 +86,19 @@ public Optional modifyHttpContent(Context.ModifyHttpRequest context Compressor compressor = resolveCompressionType(executionAttributes); RequestBody requestBody = context.requestBody().get(); - // TODO - Sync streaming compression implementation - throw new UnsupportedOperationException(); + if (isTransferEncodingChunked(context)) { + InputStream compressedStream = compressor.compress(requestBody.contentStreamProvider().newStream()); + try { + byte[] compressedBytes = IoUtils.toByteArray(compressedStream); + return Optional.of(RequestBody.fromBytes(compressedBytes)); + } catch(IOException e){ + throw SdkClientException.create(e.getMessage(), e); + } + } + + CompressionContentStreamProvider streamProvider = + new CompressionContentStreamProvider(requestBody.contentStreamProvider(), compressor); + return Optional.of(RequestBody.fromContentProvider(streamProvider, requestBody.contentType())); } @Override @@ -143,6 +155,15 @@ private static SdkHttpRequest updateContentLengthHeader(SdkHttpRequest sdkHttpRe } } + private boolean isTransferEncodingChunked(Context.ModifyHttpRequest context) { + SdkHttpRequest sdkHttpRequest = context.httpRequest(); + Optional transferEncodingHeader = sdkHttpRequest.firstMatchingHeader("Transfer-Encoding"); + if (transferEncodingHeader.isPresent() && transferEncodingHeader.get().equals("chunked")) { + return true; + } + return false; + } + private static Compressor resolveCompressionType(ExecutionAttributes executionAttributes) { List encodings = executionAttributes.getAttribute(SdkInternalExecutionAttribute.REQUEST_COMPRESSION).getEncodings(); @@ -258,7 +279,10 @@ static final class CompressionContentStreamProvider implements ContentStreamProv @Override public InputStream newStream() { closeCurrentStream(); - currentStream = compressor.compress(underlyingInputStreamProvider.newStream()); + currentStream = AwsCompressionInputStream.builder() + .inputStream(underlyingInputStreamProvider.newStream()) + .compressor(compressor) + .build(); return currentStream; } diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/AwsCompressionInputStream.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/AwsCompressionInputStream.java new file mode 100644 index 000000000000..06b8ca47cd15 --- /dev/null +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/AwsCompressionInputStream.java @@ -0,0 +1,267 @@ +/* + * Copyright 
Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.core.internal.io; + +import java.io.IOException; +import java.io.InputStream; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; +import software.amazon.awssdk.annotations.SdkInternalApi; +import software.amazon.awssdk.core.compression.Compressor; +import software.amazon.awssdk.core.exception.SdkClientException; +import software.amazon.awssdk.core.io.SdkInputStream; +import software.amazon.awssdk.utils.Logger; +import software.amazon.awssdk.utils.Validate; + +/** + * A wrapper class of InputStream that implements compression in chunks. + */ +@SdkInternalApi +public class AwsCompressionInputStream extends SdkInputStream { + + public static final int COMPRESSION_CHUNK_SIZE = 128 * 1024; + public static final int COMPRESSION_BUFFER_SIZE = 256 * 1024; + private static final String CRLF = "\r\n"; + private static final Logger log = Logger.loggerFor(AwsCompressionInputStream.class); + private Compressor compressor; + private InputStream is; + private ChunkContentIterator currentChunkIterator; + private UnderlyingStreamBuffer uncompressedStreamBuffer; + private boolean isAtStart = true; + private boolean isTerminating = false; + + private AwsCompressionInputStream(InputStream in, Compressor compressor) { + this.compressor = compressor; + if (in instanceof AwsCompressionInputStream) { + // This could happen when the request is retried. 
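                // Reusing the previous wrapper's source stream and mark buffer lets a retry
                // replay the original uncompressed bytes, instead of wrapping (and thereby
                // double-compressing) the earlier attempt's already-compressed output.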
+ AwsCompressionInputStream originalCompressionStream = (AwsCompressionInputStream) in; + is = originalCompressionStream.is; + uncompressedStreamBuffer = originalCompressionStream.uncompressedStreamBuffer; + } else { + this.is = in; + uncompressedStreamBuffer = null; + } + } + + public static Builder builder() { + return new Builder(); + } + + @Override + public int read() throws IOException { + byte[] tmp = new byte[1]; + int count = read(tmp, 0, 1); + if (count > 0) { + log.debug(() -> "One byte read from the stream."); + int unsignedByte = (int) tmp[0] & 0xFF; + return unsignedByte; + } else { + return count; + } + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + abortIfNeeded(); + Validate.notNull(b, "buff"); + if (off < 0 || len < 0 || len > b.length - off) { + throw new IndexOutOfBoundsException(); + } else if (len == 0) { + return 0; + } + + if (currentChunkIterator == null || !currentChunkIterator.hasNext()) { + if (isTerminating) { + return -1; + } + isTerminating = setUpNextChunk(); + } + + int count = currentChunkIterator.read(b, off, len); + if (count > 0) { + isAtStart = false; + log.trace(() -> count + " byte read from the stream."); + } + return count; + } + + private boolean setUpNextChunk() throws IOException { + byte[] chunkData = new byte[COMPRESSION_CHUNK_SIZE]; + int chunkSizeInBytes = 0; + while (chunkSizeInBytes < COMPRESSION_CHUNK_SIZE) { + /** Read from the buffer of the uncompressed stream */ + if (uncompressedStreamBuffer != null && uncompressedStreamBuffer.hasNext()) { + chunkData[chunkSizeInBytes++] = uncompressedStreamBuffer.next(); + } else { /** Read from the wrapped stream */ + int bytesToRead = COMPRESSION_CHUNK_SIZE - chunkSizeInBytes; + int count = is.read(chunkData, chunkSizeInBytes, bytesToRead); + if (count != -1) { + if (uncompressedStreamBuffer != null) { + uncompressedStreamBuffer.buffer(chunkData, chunkSizeInBytes, count); + } + chunkSizeInBytes += count; + } else { + break; + } + } + } + if (chunkSizeInBytes == 0) { + byte[] finalChunk = createFinalChunk(); + currentChunkIterator = new ChunkContentIterator(finalChunk); + return true; + } else { + if (chunkSizeInBytes < chunkData.length) { + chunkData = Arrays.copyOf(chunkData, chunkSizeInBytes); + } + // Compress the chunk + byte[] compressedChunkData = compressor.compress(chunkData); + byte[] chunkContent = createChunk(compressedChunkData); + currentChunkIterator = new ChunkContentIterator(chunkContent); + return false; + } + } + + private byte[] createChunk(byte[] compressedChunkData) { + StringBuilder chunkHeader = new StringBuilder(); + chunkHeader.append(Integer.toHexString(compressedChunkData.length)); + chunkHeader.append(CRLF); + try { + byte[] header = chunkHeader.toString().getBytes(StandardCharsets.UTF_8); + byte[] trailer = CRLF.getBytes(StandardCharsets.UTF_8); + byte[] chunk = new byte[header.length + compressedChunkData.length + trailer.length]; + System.arraycopy(header, 0, chunk, 0, header.length); + System.arraycopy(compressedChunkData, 0, chunk, header.length, compressedChunkData.length); + System.arraycopy(trailer, 0, + chunk, header.length + compressedChunkData.length, + trailer.length); + return chunk; + } catch (Exception e) { + throw SdkClientException.builder() + .message("Unable to create chunked data. 
" + e.getMessage()) + .cause(e) + .build(); + } + } + + private byte[] createFinalChunk() { + byte[] finalChunk = new byte[0]; + StringBuilder chunkHeader = new StringBuilder(); + // chunk-size + chunkHeader.append(Integer.toHexString(finalChunk.length)); + chunkHeader.append(CRLF); + return chunkHeader.toString().getBytes(StandardCharsets.UTF_8); + } + + @Override + protected InputStream getWrappedInputStream() { + return is; + } + + @Override + public long skip(long n) throws IOException { + if (n <= 0) { + return 0; + } + long remaining = n; + int toskip = (int) Math.min(COMPRESSION_BUFFER_SIZE, n); + byte[] temp = new byte[toskip]; + while (remaining > 0) { + int count = read(temp, 0, toskip); + if (count < 0) { + break; + } + remaining -= count; + } + return n - remaining; + } + + /** + * @see java.io.InputStream#markSupported() + */ + @Override + public boolean markSupported() { + return true; + } + + /** + * The readlimit parameter is ignored. + */ + @Override + public void mark(int readlimit) { + abortIfNeeded(); + if (!isAtStart) { + throw new UnsupportedOperationException("Compression stream only supports mark() at the start of the stream."); + } + if (is.markSupported()) { + log.debug(() -> "AwsCompressionInputStream marked at the start of the stream " + + "(will directly mark the wrapped stream since it's mark-supported)."); + is.mark(readlimit); + } else { + log.debug(() -> "AwsCompressionInputStream marked at the start of the stream " + + "(initializing the buffer since the wrapped stream is not mark-supported)."); + uncompressedStreamBuffer = new UnderlyingStreamBuffer(COMPRESSION_BUFFER_SIZE); + } + } + + /** + * Reset the stream, either by resetting the wrapped stream or using the + * buffer created by this class. + */ + @Override + public void reset() throws IOException { + abortIfNeeded(); + // Clear up any encoded data + currentChunkIterator = null; + // Reset the wrapped stream if it is mark-supported, + // otherwise use our buffered data. 
+ if (is.markSupported()) { + log.debug(() -> "AwsCompressionInputStream reset " + + "(will reset the wrapped stream because it is mark-supported)."); + is.reset(); + } else { + log.debug(() -> "AwsCompressionInputStream reset (will use the buffer of the decoded stream)."); + Validate.notNull(uncompressedStreamBuffer, "Cannot reset the stream because the mark is not set."); + uncompressedStreamBuffer.startReadBuffer(); + } + isAtStart = true; + isTerminating = false; + } + + @Override + public void close() throws IOException { + is.close(); + } + + public static final class Builder { + InputStream inputStream; + Compressor compressor; + + public AwsCompressionInputStream build() { + return new AwsCompressionInputStream( + this.inputStream, this.compressor); + } + + public Builder inputStream(InputStream inputStream) { + this.inputStream = inputStream; + return this; + } + + public Builder compressor(Compressor compressor) { + this.compressor = compressor; + return this; + } + } +} From 280bd38e30e16101d192a59612d9cf6b9d569e99 Mon Sep 17 00:00:00 2001 From: hdavidh Date: Mon, 26 Jun 2023 14:26:52 -0700 Subject: [PATCH 04/26] Fix Checkstyle issues --- .../internal/interceptor/RequestCompressionInterceptor.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/interceptor/RequestCompressionInterceptor.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/interceptor/RequestCompressionInterceptor.java index caaff7c0ccad..a7da8712b5b3 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/interceptor/RequestCompressionInterceptor.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/interceptor/RequestCompressionInterceptor.java @@ -91,7 +91,7 @@ public Optional modifyHttpContent(Context.ModifyHttpRequest context try { byte[] compressedBytes = IoUtils.toByteArray(compressedStream); return Optional.of(RequestBody.fromBytes(compressedBytes)); - } catch(IOException e){ + } catch (IOException e) { throw SdkClientException.create(e.getMessage(), e); } } @@ -108,8 +108,8 @@ public Optional modifyAsyncHttpContent(Context.ModifyHttpReque return context.asyncRequestBody(); } - AsyncRequestBody asyncRequestBody = context.asyncRequestBody().get(); - Compressor compressor = resolveCompressionType(executionAttributes); + //AsyncRequestBody asyncRequestBody = context.asyncRequestBody().get(); + //Compressor compressor = resolveCompressionType(executionAttributes); // TODO - Async streaming compression implementation throw new UnsupportedOperationException(); From e2d69519b887b9126af92002c58760277366c0f2 Mon Sep 17 00:00:00 2001 From: hdavidh Date: Wed, 28 Jun 2023 14:10:18 -0700 Subject: [PATCH 05/26] Refactor to common class AwsChunkedInputStream --- .../AwsSignedChunkedEncodingInputStream.java | 1 - .../SyncHttpChecksumInTrailerInterceptor.java | 6 +- .../io/AwsChunkedEncodingInputStream.java | 107 ++--------------- .../internal/io/AwsChunkedInputStream.java | 109 ++++++++++++++++++ .../io/AwsCompressionInputStream.java | 96 +++------------ 5 files changed, 139 insertions(+), 180 deletions(-) create mode 100644 core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/AwsChunkedInputStream.java diff --git a/core/auth/src/main/java/software/amazon/awssdk/auth/signer/internal/chunkedencoding/AwsSignedChunkedEncodingInputStream.java 
b/core/auth/src/main/java/software/amazon/awssdk/auth/signer/internal/chunkedencoding/AwsSignedChunkedEncodingInputStream.java index 636fad74f9fc..3174eb7c6caa 100644 --- a/core/auth/src/main/java/software/amazon/awssdk/auth/signer/internal/chunkedencoding/AwsSignedChunkedEncodingInputStream.java +++ b/core/auth/src/main/java/software/amazon/awssdk/auth/signer/internal/chunkedencoding/AwsSignedChunkedEncodingInputStream.java @@ -40,7 +40,6 @@ @SdkInternalApi public final class AwsSignedChunkedEncodingInputStream extends AwsChunkedEncodingInputStream { - private static final String CRLF = "\r\n"; private static final String CHUNK_SIGNATURE_HEADER = ";chunk-signature="; private static final String CHECKSUM_SIGNATURE_HEADER = "x-amz-trailer-signature:"; private String previousChunkSignature; diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/interceptor/SyncHttpChecksumInTrailerInterceptor.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/interceptor/SyncHttpChecksumInTrailerInterceptor.java index d459a467f3fe..370ed9478bcf 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/interceptor/SyncHttpChecksumInTrailerInterceptor.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/interceptor/SyncHttpChecksumInTrailerInterceptor.java @@ -29,7 +29,7 @@ import software.amazon.awssdk.core.interceptor.Context; import software.amazon.awssdk.core.interceptor.ExecutionAttributes; import software.amazon.awssdk.core.interceptor.ExecutionInterceptor; -import software.amazon.awssdk.core.internal.io.AwsChunkedEncodingInputStream; +import software.amazon.awssdk.core.internal.io.AwsChunkedInputStream; import software.amazon.awssdk.core.internal.io.AwsUnsignedChunkedEncodingInputStream; import software.amazon.awssdk.core.internal.util.HttpChecksumResolver; import software.amazon.awssdk.core.internal.util.HttpChecksumUtils; @@ -74,7 +74,7 @@ public Optional modifyHttpContent(Context.ModifyHttpRequest context RequestBody.fromContentProvider( streamProvider, AwsUnsignedChunkedEncodingInputStream.calculateStreamContentLength( - requestBody.optionalContentLength().orElse(0L), AwsChunkedEncodingInputStream.DEFAULT_CHUNK_SIZE) + requestBody.optionalContentLength().orElse(0L), AwsChunkedInputStream.DEFAULT_CHUNK_SIZE) + checksumContentLength, requestBody.contentType())); } @@ -113,7 +113,7 @@ public SdkHttpRequest modifyHttpRequest(Context.ModifyHttpRequest context, Execu .putHeader("x-amz-decoded-content-length", Long.toString(originalContentLength)) .putHeader(CONTENT_LENGTH, Long.toString(AwsUnsignedChunkedEncodingInputStream.calculateStreamContentLength( - originalContentLength, AwsChunkedEncodingInputStream.DEFAULT_CHUNK_SIZE) + originalContentLength, AwsChunkedInputStream.DEFAULT_CHUNK_SIZE) + checksumContentLength))); } diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/AwsChunkedEncodingInputStream.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/AwsChunkedEncodingInputStream.java index 79198f977f26..c869179d488f 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/AwsChunkedEncodingInputStream.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/AwsChunkedEncodingInputStream.java @@ -22,8 +22,6 @@ import software.amazon.awssdk.annotations.SdkInternalApi; import software.amazon.awssdk.core.checksums.SdkChecksum; import software.amazon.awssdk.core.internal.chunked.AwsChunkedEncodingConfig; -import 
software.amazon.awssdk.core.io.SdkInputStream; -import software.amazon.awssdk.utils.Logger; import software.amazon.awssdk.utils.Validate; /** @@ -37,37 +35,16 @@ * the wrapped stream. */ @SdkInternalApi -public abstract class AwsChunkedEncodingInputStream extends SdkInputStream { +public abstract class AwsChunkedEncodingInputStream extends AwsChunkedInputStream { - public static final int DEFAULT_CHUNK_SIZE = 128 * 1024; - protected static final int SKIP_BUFFER_SIZE = 256 * 1024; - protected static final String CRLF = "\r\n"; - protected static final byte[] FINAL_CHUNK = new byte[0]; protected static final String HEADER_COLON_SEPARATOR = ":"; - private static final Logger log = Logger.loggerFor(AwsChunkedEncodingInputStream.class); protected byte[] calculatedChecksum = null; protected final String checksumHeaderForTrailer; protected boolean isTrailingTerminated = true; - private InputStream is = null; private final int chunkSize; private final int maxBufferSize; private final SdkChecksum sdkChecksum; private boolean isLastTrailingCrlf; - /** - * Iterator on the current chunk. - */ - private ChunkContentIterator currentChunkIterator; - - /** - * Iterator on the buffer of the decoded stream, - * Null if the wrapped stream is marksupported, - * otherwise it will be initialized when this wrapper is marked. - */ - private UnderlyingStreamBuffer decodedStreamBuffer; - - private boolean isAtStart = true; - private boolean isTerminating = false; - /** * Creates a chunked encoding input stream initialized with the originating stream. The configuration allows @@ -89,10 +66,10 @@ protected AwsChunkedEncodingInputStream(InputStream in, AwsChunkedEncodingInputStream originalChunkedStream = (AwsChunkedEncodingInputStream) in; providedMaxBufferSize = Math.max(originalChunkedStream.maxBufferSize, providedMaxBufferSize); is = originalChunkedStream.is; - decodedStreamBuffer = originalChunkedStream.decodedStreamBuffer; + underlyingStreamBuffer = originalChunkedStream.underlyingStreamBuffer; } else { is = in; - decodedStreamBuffer = null; + underlyingStreamBuffer = null; } this.chunkSize = awsChunkedEncodingConfig.chunkSize(); this.maxBufferSize = providedMaxBufferSize; @@ -153,19 +130,6 @@ public T checksumHeaderForTrailer(String checksumHeaderForTrailer) { } - @Override - public int read() throws IOException { - byte[] tmp = new byte[1]; - int count = read(tmp, 0, 1); - if (count > 0) { - log.debug(() -> "One byte read from the stream."); - int unsignedByte = (int) tmp[0] & 0xFF; - return unsignedByte; - } else { - return count; - } - } - @Override public int read(byte[] b, int off, int len) throws IOException { abortIfNeeded(); @@ -211,32 +175,6 @@ private boolean setUpTrailingChunks() { return true; } - @Override - public long skip(long n) throws IOException { - if (n <= 0) { - return 0; - } - long remaining = n; - int toskip = (int) Math.min(SKIP_BUFFER_SIZE, n); - byte[] temp = new byte[toskip]; - while (remaining > 0) { - int count = read(temp, 0, toskip); - if (count < 0) { - break; - } - remaining -= count; - } - return n - remaining; - } - - /** - * @see java.io.InputStream#markSupported() - */ - @Override - public boolean markSupported() { - return true; - } - /** * The readlimit parameter is ignored. 
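     * Marking is only honored at the start of the stream; when the wrapped stream is
     * not mark-supported, a replay buffer of up to maxBufferSize bytes is initialized
     * in its place.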
*/ @@ -256,7 +194,7 @@ public void mark(int readlimit) { } else { log.debug(() -> "AwsChunkedEncodingInputStream marked at the start of the stream " + "(initializing the buffer since the wrapped stream is not mark-supported)."); - decodedStreamBuffer = new UnderlyingStreamBuffer(maxBufferSize); + underlyingStreamBuffer = new UnderlyingStreamBuffer(maxBufferSize); } } @@ -280,8 +218,8 @@ public void reset() throws IOException { is.reset(); } else { log.debug(() -> "AwsChunkedEncodingInputStream reset (will use the buffer of the decoded stream)."); - Validate.notNull(decodedStreamBuffer, "Cannot reset the stream because the mark is not set."); - decodedStreamBuffer.startReadBuffer(); + Validate.notNull(underlyingStreamBuffer, "Cannot reset the stream because the mark is not set."); + underlyingStreamBuffer.startReadBuffer(); } isAtStart = true; isTerminating = false; @@ -298,14 +236,14 @@ private boolean setUpNextChunk() throws IOException { int chunkSizeInBytes = 0; while (chunkSizeInBytes < chunkSize) { /** Read from the buffer of the decoded stream */ - if (null != decodedStreamBuffer && decodedStreamBuffer.hasNext()) { - chunkData[chunkSizeInBytes++] = decodedStreamBuffer.next(); + if (null != underlyingStreamBuffer && underlyingStreamBuffer.hasNext()) { + chunkData[chunkSizeInBytes++] = underlyingStreamBuffer.next(); } else { /** Read from the wrapped stream */ int bytesToRead = chunkSize - chunkSizeInBytes; int count = is.read(chunkData, chunkSizeInBytes, bytesToRead); if (count != -1) { - if (null != decodedStreamBuffer) { - decodedStreamBuffer.buffer(chunkData, chunkSizeInBytes, count); + if (null != underlyingStreamBuffer) { + underlyingStreamBuffer.buffer(chunkData, chunkSizeInBytes, count); } chunkSizeInBytes += count; } else { @@ -333,33 +271,8 @@ private boolean setUpNextChunk() throws IOException { } } - - @Override - protected InputStream getWrappedInputStream() { - return is; - } - - - /** - * The final chunk. - * - * @param finalChunk The last byte which will be often 0 byte. - * @return Final chunk that will be appended with CRLF or any required signatures. - */ - protected abstract byte[] createFinalChunk(byte[] finalChunk); - - /** - * Creates chunk for the given buffer. - * The chucks could be appended with Signatures or any additional bytes by Concrete classes. - * - * @param chunkData The chunk of original data. - * @return Chunked data which will have signature if signed or just data if unsigned. - */ - protected abstract byte[] createChunk(byte[] chunkData); - /** * @return ChecksumChunkHeader in bytes based on the Header name field. */ protected abstract byte[] createChecksumChunkHeader(); - } \ No newline at end of file diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/AwsChunkedInputStream.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/AwsChunkedInputStream.java new file mode 100644 index 000000000000..f30e2c0f8d14 --- /dev/null +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/AwsChunkedInputStream.java @@ -0,0 +1,109 @@ +/* + * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"). + * You may not use this file except in compliance with the License. + * A copy of the License is located at + * + * http://aws.amazon.com/apache2.0 + * + * or in the "license" file accompanying this file. 
This file is distributed + * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either + * express or implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package software.amazon.awssdk.core.internal.io; + +import java.io.IOException; +import java.io.InputStream; +import software.amazon.awssdk.annotations.SdkInternalApi; +import software.amazon.awssdk.core.io.SdkInputStream; +import software.amazon.awssdk.utils.Logger; + +/** + * A wrapper of InputStream that implements streaming in chunks. + */ +@SdkInternalApi +public abstract class AwsChunkedInputStream extends SdkInputStream { + public static final int DEFAULT_CHUNK_SIZE = 128 * 1024; + protected static final int SKIP_BUFFER_SIZE = 256 * 1024; + protected static final String CRLF = "\r\n"; + protected static final byte[] FINAL_CHUNK = new byte[0]; + protected static final Logger log = Logger.loggerFor(AwsChunkedInputStream.class); + protected InputStream is; + /** + * Iterator on the current chunk. + */ + protected ChunkContentIterator currentChunkIterator; + + /** + * Iterator on the buffer of the underlying stream, + * Null if the wrapped stream is marksupported, + * otherwise it will be initialized when this wrapper is marked. + */ + protected UnderlyingStreamBuffer underlyingStreamBuffer; + protected boolean isAtStart = true; + protected boolean isTerminating = false; + + @Override + public int read() throws IOException { + byte[] tmp = new byte[1]; + int count = read(tmp, 0, 1); + if (count > 0) { + log.debug(() -> "One byte read from the stream."); + int unsignedByte = (int) tmp[0] & 0xFF; + return unsignedByte; + } else { + return count; + } + } + + @Override + public long skip(long n) throws IOException { + if (n <= 0) { + return 0; + } + long remaining = n; + int toskip = (int) Math.min(SKIP_BUFFER_SIZE, n); + byte[] temp = new byte[toskip]; + while (remaining > 0) { + int count = read(temp, 0, toskip); + if (count < 0) { + break; + } + remaining -= count; + } + return n - remaining; + } + + /** + * @see java.io.InputStream#markSupported() + */ + @Override + public boolean markSupported() { + return true; + } + + @Override + protected InputStream getWrappedInputStream() { + return is; + } + + /** + * Creates chunk for the given buffer. + * The chucks could be appended with Signatures or any additional bytes by Concrete classes. + * + * @param chunkData The chunk of original data. + * @return Chunked data which will have signature if signed or just data if unsigned. + */ + protected abstract byte[] createChunk(byte[] chunkData); + + /** + * The final chunk. + * + * @param finalChunk The last byte which will be often 0 byte. + * @return Final chunk that will be appended with CRLF or any required signatures. 
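     * <p>For example, in the unsigned case a three-byte chunk {@code abc} is framed as
     * {@code 3\r\nabc\r\n} and the final chunk as {@code 0\r\n}; signed variants append
     * a chunk signature to the size header.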
+ */ + protected abstract byte[] createFinalChunk(byte[] finalChunk); +} diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/AwsCompressionInputStream.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/AwsCompressionInputStream.java index 06b8ca47cd15..0e5794d7c372 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/AwsCompressionInputStream.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/AwsCompressionInputStream.java @@ -22,37 +22,25 @@ import software.amazon.awssdk.annotations.SdkInternalApi; import software.amazon.awssdk.core.compression.Compressor; import software.amazon.awssdk.core.exception.SdkClientException; -import software.amazon.awssdk.core.io.SdkInputStream; -import software.amazon.awssdk.utils.Logger; import software.amazon.awssdk.utils.Validate; /** * A wrapper class of InputStream that implements compression in chunks. */ @SdkInternalApi -public class AwsCompressionInputStream extends SdkInputStream { - - public static final int COMPRESSION_CHUNK_SIZE = 128 * 1024; - public static final int COMPRESSION_BUFFER_SIZE = 256 * 1024; - private static final String CRLF = "\r\n"; - private static final Logger log = Logger.loggerFor(AwsCompressionInputStream.class); +public class AwsCompressionInputStream extends AwsChunkedInputStream { private Compressor compressor; - private InputStream is; - private ChunkContentIterator currentChunkIterator; - private UnderlyingStreamBuffer uncompressedStreamBuffer; - private boolean isAtStart = true; - private boolean isTerminating = false; private AwsCompressionInputStream(InputStream in, Compressor compressor) { this.compressor = compressor; if (in instanceof AwsCompressionInputStream) { // This could happen when the request is retried. 
AwsCompressionInputStream originalCompressionStream = (AwsCompressionInputStream) in; - is = originalCompressionStream.is; - uncompressedStreamBuffer = originalCompressionStream.uncompressedStreamBuffer; + this.is = originalCompressionStream.is; + this.underlyingStreamBuffer = originalCompressionStream.underlyingStreamBuffer; } else { this.is = in; - uncompressedStreamBuffer = null; + this.underlyingStreamBuffer = null; } } @@ -60,19 +48,6 @@ public static Builder builder() { return new Builder(); } - @Override - public int read() throws IOException { - byte[] tmp = new byte[1]; - int count = read(tmp, 0, 1); - if (count > 0) { - log.debug(() -> "One byte read from the stream."); - int unsignedByte = (int) tmp[0] & 0xFF; - return unsignedByte; - } else { - return count; - } - } - @Override public int read(byte[] b, int off, int len) throws IOException { abortIfNeeded(); @@ -99,18 +74,18 @@ public int read(byte[] b, int off, int len) throws IOException { } private boolean setUpNextChunk() throws IOException { - byte[] chunkData = new byte[COMPRESSION_CHUNK_SIZE]; + byte[] chunkData = new byte[DEFAULT_CHUNK_SIZE]; int chunkSizeInBytes = 0; - while (chunkSizeInBytes < COMPRESSION_CHUNK_SIZE) { + while (chunkSizeInBytes < DEFAULT_CHUNK_SIZE) { /** Read from the buffer of the uncompressed stream */ - if (uncompressedStreamBuffer != null && uncompressedStreamBuffer.hasNext()) { - chunkData[chunkSizeInBytes++] = uncompressedStreamBuffer.next(); + if (underlyingStreamBuffer != null && underlyingStreamBuffer.hasNext()) { + chunkData[chunkSizeInBytes++] = underlyingStreamBuffer.next(); } else { /** Read from the wrapped stream */ - int bytesToRead = COMPRESSION_CHUNK_SIZE - chunkSizeInBytes; + int bytesToRead = DEFAULT_CHUNK_SIZE - chunkSizeInBytes; int count = is.read(chunkData, chunkSizeInBytes, bytesToRead); if (count != -1) { - if (uncompressedStreamBuffer != null) { - uncompressedStreamBuffer.buffer(chunkData, chunkSizeInBytes, count); + if (underlyingStreamBuffer != null) { + underlyingStreamBuffer.buffer(chunkData, chunkSizeInBytes, count); } chunkSizeInBytes += count; } else { @@ -119,7 +94,7 @@ private boolean setUpNextChunk() throws IOException { } } if (chunkSizeInBytes == 0) { - byte[] finalChunk = createFinalChunk(); + byte[] finalChunk = createFinalChunk(FINAL_CHUNK); currentChunkIterator = new ChunkContentIterator(finalChunk); return true; } else { @@ -134,7 +109,7 @@ private boolean setUpNextChunk() throws IOException { } } - private byte[] createChunk(byte[] compressedChunkData) { + protected byte[] createChunk(byte[] compressedChunkData) { StringBuilder chunkHeader = new StringBuilder(); chunkHeader.append(Integer.toHexString(compressedChunkData.length)); chunkHeader.append(CRLF); @@ -156,8 +131,7 @@ private byte[] createChunk(byte[] compressedChunkData) { } } - private byte[] createFinalChunk() { - byte[] finalChunk = new byte[0]; + protected byte[] createFinalChunk(byte[] finalChunk) { StringBuilder chunkHeader = new StringBuilder(); // chunk-size chunkHeader.append(Integer.toHexString(finalChunk.length)); @@ -165,37 +139,6 @@ private byte[] createFinalChunk() { return chunkHeader.toString().getBytes(StandardCharsets.UTF_8); } - @Override - protected InputStream getWrappedInputStream() { - return is; - } - - @Override - public long skip(long n) throws IOException { - if (n <= 0) { - return 0; - } - long remaining = n; - int toskip = (int) Math.min(COMPRESSION_BUFFER_SIZE, n); - byte[] temp = new byte[toskip]; - while (remaining > 0) { - int count = read(temp, 0, toskip); - if 
(count < 0) { - break; - } - remaining -= count; - } - return n - remaining; - } - - /** - * @see java.io.InputStream#markSupported() - */ - @Override - public boolean markSupported() { - return true; - } - /** * The readlimit parameter is ignored. */ @@ -212,7 +155,7 @@ public void mark(int readlimit) { } else { log.debug(() -> "AwsCompressionInputStream marked at the start of the stream " + "(initializing the buffer since the wrapped stream is not mark-supported)."); - uncompressedStreamBuffer = new UnderlyingStreamBuffer(COMPRESSION_BUFFER_SIZE); + underlyingStreamBuffer = new UnderlyingStreamBuffer(SKIP_BUFFER_SIZE); } } @@ -233,18 +176,13 @@ public void reset() throws IOException { is.reset(); } else { log.debug(() -> "AwsCompressionInputStream reset (will use the buffer of the decoded stream)."); - Validate.notNull(uncompressedStreamBuffer, "Cannot reset the stream because the mark is not set."); - uncompressedStreamBuffer.startReadBuffer(); + Validate.notNull(underlyingStreamBuffer, "Cannot reset the stream because the mark is not set."); + underlyingStreamBuffer.startReadBuffer(); } isAtStart = true; isTerminating = false; } - @Override - public void close() throws IOException { - is.close(); - } - public static final class Builder { InputStream inputStream; Compressor compressor; From be066475812266b8258aa93565ff5a468b86436c Mon Sep 17 00:00:00 2001 From: hdavidh Date: Wed, 28 Jun 2023 14:15:42 -0700 Subject: [PATCH 06/26] Refactor Compressor --- .../awssdk/core/compression/Compressor.java | 43 ++++++++++----- .../internal/compression/GzipCompressor.java | 52 ++++++------------- 2 files changed, 46 insertions(+), 49 deletions(-) diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/compression/Compressor.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/compression/Compressor.java index ecd64a4316e1..d6b48bed56a6 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/compression/Compressor.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/compression/Compressor.java @@ -17,42 +17,61 @@ import java.io.InputStream; import java.nio.ByteBuffer; -import org.reactivestreams.Publisher; -import software.amazon.awssdk.annotations.SdkPublicApi; -import software.amazon.awssdk.core.async.AsyncRequestBody; +import software.amazon.awssdk.annotations.SdkInternalApi; +import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.core.internal.compression.GzipCompressor; import software.amazon.awssdk.core.internal.interceptor.RequestCompressionInterceptor; /** * Interface for compressors to be used by {@link RequestCompressionInterceptor} to compress requests. */ -@SdkPublicApi +@SdkInternalApi public interface Compressor { /** * The compression algorithm type. + * + * @return The {@link String} compression algorithm type. */ String compressorType(); /** - * Compress a byte array. + * Compress a {@link SdkBytes} payload. + * + * @param content + * @return The compressed {@link SdkBytes}. */ - byte[] compress(byte[] content); + SdkBytes compress(SdkBytes content); /** - * Compress an {@link InputStream}. + * Compress a byte[] payload. + * + * @param content + * @return The compressed byte array. */ - InputStream compress(InputStream inputStream); + default byte[] compress(byte[] content) { + return compress(SdkBytes.fromByteArray(content)).asByteArray(); + } /** - * Compress a {@link ByteBuffer}. + * Compress an {@link InputStream} payload. + * + * @param content + * @return The compressed {@link InputStream}. 
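     * <p>Note that this default implementation buffers the entire stream in memory via
     * {@link SdkBytes#fromInputStream}, so it is only suitable for bounded payloads.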
*/ - ByteBuffer compress(ByteBuffer byteBuffer); + default InputStream compress(InputStream content) { + return compress(SdkBytes.fromInputStream(content)).asInputStream(); + } /** - * Compress an {@link AsyncRequestBody}. + * Compress an {@link ByteBuffer} payload. + * + * @param content + * @return The compressed {@link ByteBuffer}. */ - Publisher compressAsyncRequestBody(Publisher publisher); + default ByteBuffer compress(ByteBuffer content) { + return compress(SdkBytes.fromByteBuffer(content)).asByteBuffer(); + } /** * Maps the {@link CompressionType} to its corresponding {@link Compressor}. diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/compression/GzipCompressor.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/compression/GzipCompressor.java index 5ebe3c941d0b..6419d7dd4e14 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/compression/GzipCompressor.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/compression/GzipCompressor.java @@ -15,22 +15,23 @@ package software.amazon.awssdk.core.internal.compression; -import java.io.ByteArrayInputStream; +import static software.amazon.awssdk.utils.IoUtils.closeQuietly; + import java.io.ByteArrayOutputStream; import java.io.IOException; -import java.io.InputStream; -import java.nio.ByteBuffer; +import java.io.UncheckedIOException; import java.util.zip.GZIPOutputStream; -import org.reactivestreams.Publisher; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import software.amazon.awssdk.annotations.SdkInternalApi; +import software.amazon.awssdk.core.SdkBytes; import software.amazon.awssdk.core.compression.Compressor; -import software.amazon.awssdk.core.exception.SdkClientException; -import software.amazon.awssdk.utils.IoUtils; @SdkInternalApi public final class GzipCompressor implements Compressor { private static final String COMPRESSOR_TYPE = "gzip"; + private static final Logger log = LoggerFactory.getLogger(GzipCompressor.class); @Override public String compressorType() { @@ -38,40 +39,17 @@ public String compressorType() { } @Override - public byte[] compress(byte[] content) { + public SdkBytes compress(SdkBytes content) { + GZIPOutputStream gzipOutputStream = null; try { ByteArrayOutputStream compressedOutputStream = new ByteArrayOutputStream(); - GZIPOutputStream gzipOutputStream = new GZIPOutputStream(compressedOutputStream); - gzipOutputStream.write(content); - gzipOutputStream.close(); - return compressedOutputStream.toByteArray(); - } catch (IOException e) { - throw SdkClientException.create(e.getMessage(), e); - } - } - - @Override - public InputStream compress(InputStream inputStream) { - try { - byte[] content = IoUtils.toByteArray(inputStream); - byte[] compressedContent = compress(content); - return new ByteArrayInputStream(compressedContent); + gzipOutputStream = new GZIPOutputStream(compressedOutputStream); + gzipOutputStream.write(content.asByteArray()); + return SdkBytes.fromByteArray(compressedOutputStream.toByteArray()); } catch (IOException e) { - throw SdkClientException.create(e.getMessage(), e); + throw new UncheckedIOException(e); + } finally { + closeQuietly(gzipOutputStream, log); } } - - @Override - public ByteBuffer compress(ByteBuffer byteBuffer) { - byte[] content = new byte[byteBuffer.remaining()]; - byteBuffer.get(content); - byte[] compressedContent = compress(content); - return ByteBuffer.wrap(compressedContent); - } - - @Override - public Publisher compressAsyncRequestBody(Publisher publisher) 
{ - //TODO - throw new UnsupportedOperationException(); - } } From 5b5641c8849c155ed3589e921f90a796adabdbc9 Mon Sep 17 00:00:00 2001 From: hdavidh Date: Wed, 28 Jun 2023 16:49:52 -0700 Subject: [PATCH 07/26] Refactor CompressionType --- .../core/compression/CompressionType.java | 82 ++++++++++++------- .../awssdk/core/compression/Compressor.java | 16 ---- .../pipeline/stages/CompressRequestStage.java | 12 +-- 3 files changed, 58 insertions(+), 52 deletions(-) diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/compression/CompressionType.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/compression/CompressionType.java index 8780e02a9c6a..3fee7f5c14dc 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/compression/CompressionType.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/compression/CompressionType.java @@ -15,60 +15,80 @@ package software.amazon.awssdk.core.compression; -import java.util.EnumSet; +import java.util.HashMap; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import software.amazon.awssdk.annotations.SdkInternalApi; -import software.amazon.awssdk.utils.internal.EnumUtils; +import software.amazon.awssdk.core.internal.compression.GzipCompressor; +import software.amazon.awssdk.utils.Validate; /** * The supported compression algorithms for operations with the requestCompression trait. Each supported algorithm will have an * {@link Compressor} implementation. */ @SdkInternalApi -public enum CompressionType { +public final class CompressionType { - GZIP("gzip"), + public static final CompressionType GZIP = CompressionType.of("gzip"); - UNKNOWN_TO_SDK_VERSION(null); + private static Map compressorMap = new HashMap() {{ + put("gzip", new GzipCompressor()); + }}; - private static final Map VALUE_MAP = EnumUtils.uniqueIndex( - CompressionType.class, CompressionType::toString); + private String id; - private final String value; + private CompressionType(String id) { + this.id = id; + } - CompressionType(String value) { - this.value = value; + /** + * Creates a new {@link CompressionType} of the given value. + */ + public static CompressionType of(String value) { + Validate.paramNotBlank(value, "compressionType"); + return CompressionTypeCache.put(value); } - @Override - public String toString() { - return String.valueOf(value); + /** + * Returns the {@link Set} of {@link String}s of compression types supported by the SDK. + */ + public static Set compressionTypes() { + return compressorMap.keySet(); } /** - * Use this in place of valueOf to convert the raw string into the enum value. - * - * @param value - * real value - * @return SupportedEncodings corresponding to the value + * Whether or not the compression type is supported by the SDK. */ - public static CompressionType fromValue(String value) { - if (value == null) { - return null; - } - return VALUE_MAP.getOrDefault(value, UNKNOWN_TO_SDK_VERSION); + public static boolean isSupported(String compressionType) { + return compressionTypes().contains(compressionType); } /** - * Use this in place of {@link #values()} to return a {@link Set} of all values known to the SDK. This will return - * all known enum values except {@link #UNKNOWN_TO_SDK_VERSION}. - * - * @return a {@link Set} of known {@link CompressionType}s + * Maps the {@link CompressionType} to its corresponding {@link Compressor}. 
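     * @throws UnsupportedOperationException if no {@link Compressor} implementation is
     *         registered for this compression type.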
*/ - public static Set knownValues() { - Set knownValues = EnumSet.allOf(CompressionType.class); - knownValues.remove(UNKNOWN_TO_SDK_VERSION); - return knownValues; + public Compressor newCompressor() { + Compressor compressor = compressorMap.getOrDefault(this.id, null); + if (compressor == null) { + throw new UnsupportedOperationException("The compression type " + id + " does not have an implementation of " + + "Compressor"); + } + return compressor; + } + + @Override + public String toString() { + return id; + } + + private static class CompressionTypeCache { + private static final ConcurrentHashMap VALUES = new ConcurrentHashMap<>(); + + private CompressionTypeCache() { + } + + private static CompressionType put(String value) { + return VALUES.computeIfAbsent(value, v -> new CompressionType(value)); + } } } diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/compression/Compressor.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/compression/Compressor.java index 1b752a340fea..d0c1ce552034 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/compression/Compressor.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/compression/Compressor.java @@ -19,12 +19,10 @@ import java.nio.ByteBuffer; import software.amazon.awssdk.annotations.SdkInternalApi; import software.amazon.awssdk.core.SdkBytes; -import software.amazon.awssdk.core.internal.compression.GzipCompressor; import software.amazon.awssdk.core.internal.http.pipeline.stages.CompressRequestStage; /** * Interface for compressors used by {@link CompressRequestStage} to compress requests. - * TODO: this will be refactored in the other PR */ @SdkInternalApi public interface Compressor { @@ -73,18 +71,4 @@ default InputStream compress(InputStream content) { default ByteBuffer compress(ByteBuffer content) { return compress(SdkBytes.fromByteBuffer(content)).asByteBuffer(); } - - /** - * Maps the {@link CompressionType} to its corresponding {@link Compressor}. 
- * TODO: Update mappings here when additional compressors are supported in the future - */ - static Compressor forCompressorType(CompressionType compressionType) { - switch (compressionType) { - case GZIP: - return new GzipCompressor(); - default: - throw new IllegalArgumentException("The compresssion type " + compressionType + "does not have an implemenation" - + " of Compressor."); - } - } } diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/CompressRequestStage.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/CompressRequestStage.java index 7050eb10aeb3..1c079ca2b2d0 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/CompressRequestStage.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/CompressRequestStage.java @@ -18,10 +18,13 @@ import java.io.IOException; import java.io.InputStream; import java.io.UncheckedIOException; +import java.util.List; +import java.util.Locale; import java.util.Optional; import java.util.function.Supplier; import software.amazon.awssdk.annotations.SdkInternalApi; import software.amazon.awssdk.core.SdkSystemSetting; +import software.amazon.awssdk.core.compression.CompressionType; import software.amazon.awssdk.core.compression.Compressor; import software.amazon.awssdk.core.exception.SdkClientException; import software.amazon.awssdk.core.interceptor.ExecutionAttributes; @@ -126,16 +129,15 @@ private static SdkHttpFullRequest.Builder updateContentLengthHeader(SdkHttpFullR }*/ private static Compressor resolveCompressionType(ExecutionAttributes executionAttributes) { - /*List encodings = + List encodings = executionAttributes.getAttribute(SdkInternalExecutionAttribute.REQUEST_COMPRESSION).getEncodings(); - TODO: will refactor CompressionType in the other PR and update this for (String encoding: encodings) { encoding = encoding.toLowerCase(Locale.ROOT); - if (CompressionType.compressionTypes().contains(encoding)) { - return CompressionType.of(encoding).newCompressor().get(); + if (CompressionType.isSupported(encoding)) { + return CompressionType.of(encoding).newCompressor(); } - }*/ + } return null; } From 7e074249bbd8aebb9943a14e3da89ff1bf8563a4 Mon Sep 17 00:00:00 2001 From: hdavidh Date: Thu, 29 Jun 2023 02:54:31 -0700 Subject: [PATCH 08/26] sync compression --- .../pipeline/stages/CompressRequestStage.java | 79 ++++++++++++++----- 1 file changed, 60 insertions(+), 19 deletions(-) diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/CompressRequestStage.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/CompressRequestStage.java index 1c079ca2b2d0..932536989849 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/CompressRequestStage.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/CompressRequestStage.java @@ -32,6 +32,7 @@ import software.amazon.awssdk.core.interceptor.SdkInternalExecutionAttribute; import software.amazon.awssdk.core.internal.http.RequestExecutionContext; import software.amazon.awssdk.core.internal.http.pipeline.MutableRequestToRequestPipeline; +import software.amazon.awssdk.core.internal.io.AwsCompressionInputStream; import software.amazon.awssdk.http.ContentStreamProvider; import software.amazon.awssdk.http.SdkHttpFullRequest; import software.amazon.awssdk.profiles.ProfileFile; 
@@ -59,23 +60,27 @@ public SdkHttpFullRequest.Builder execute(SdkHttpFullRequest.Builder input, Requ Compressor compressor = resolveCompressionType(context.executionAttributes()); - // non-streaming - if (!isStreaming(context)) { - ContentStreamProvider wrappedProvider = input.contentStreamProvider(); - ContentStreamProvider compressedStreamProvider = () -> compressor.compress(wrappedProvider.newStream()); - input.contentStreamProvider(compressedStreamProvider); - input = updateContentEncodingHeader(input, compressor); - return updateContentLengthHeader(input); + // non-streaming OR transfer-encoding chunked + if (!isStreaming(context) || isTransferEncodingChunked(input)) { + compressEntirePayload(input, compressor); + updateContentEncodingHeader(input, compressor); + if (!isStreaming(context)) { + return updateContentLengthHeader(input); + } else { + return removeContentLengthHeader(input); + } } - /*if (context.requestProvider() == null) { + if (context.requestProvider() == null) { // sync streaming - - } else { + input.contentStreamProvider(new CompressionContentStreamProvider(input.contentStreamProvider(), compressor)); + } + /*else { // async streaming - }*/ - return updateContentEncodingHeader(input, compressor); + updateContentEncodingHeader(input, compressor); + removeContentLengthHeader(input); + return input.putHeader("Transfer-Encoding", "chunked"); } private static boolean shouldCompress(SdkHttpFullRequest.Builder input, RequestExecutionContext context) { @@ -101,12 +106,17 @@ private static boolean isStreaming(RequestExecutionContext context) { return context.executionAttributes().getAttribute(SdkInternalExecutionAttribute.REQUEST_COMPRESSION).isStreaming(); } + private SdkHttpFullRequest.Builder compressEntirePayload(SdkHttpFullRequest.Builder input, Compressor compressor) { + ContentStreamProvider wrappedProvider = input.contentStreamProvider(); + ContentStreamProvider compressedStreamProvider = () -> compressor.compress(wrappedProvider.newStream()); + return input.contentStreamProvider(compressedStreamProvider); + } + private static SdkHttpFullRequest.Builder updateContentEncodingHeader(SdkHttpFullRequest.Builder input, Compressor compressor) { if (input.firstMatchingHeader("Content-encoding").isPresent()) { return input.appendHeader("Content-encoding", compressor.compressorType()); } - return input.putHeader("Content-encoding", compressor.compressorType()); } @@ -121,12 +131,15 @@ private static SdkHttpFullRequest.Builder updateContentLengthHeader(SdkHttpFullR } } - /*private boolean isTransferEncodingChunked(Context.ModifyHttpRequest context) { - return context.httpRequest() - .firstMatchingHeader("Transfer-Encoding") - .map(headerValue -> headerValue.equals("chunked")) - .orElse(false); - }*/ + private static SdkHttpFullRequest.Builder removeContentLengthHeader(SdkHttpFullRequest.Builder input) { + return input.removeHeader("Content-Length"); + } + + private boolean isTransferEncodingChunked(SdkHttpFullRequest.Builder input) { + return input.firstMatchingHeader("Transfer-Encoding") + .map(headerValue -> headerValue.equals("chunked")) + .orElse(false); + } private static Compressor resolveCompressionType(ExecutionAttributes executionAttributes) { List encodings = @@ -226,4 +239,32 @@ private static void validateMinCompressionSizeInput(int minCompressionSize) { + "10485760.", new IllegalArgumentException()); } } + + static final class CompressionContentStreamProvider implements ContentStreamProvider { + private final ContentStreamProvider underlyingInputStreamProvider; + 
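        // The stream most recently vended by newStream(); it is closed before a
        // replacement is created so retries do not leak open streams.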
+        private InputStream currentStream;
+        private final Compressor compressor;
+
+        CompressionContentStreamProvider(ContentStreamProvider underlyingInputStreamProvider, Compressor compressor) {
+            this.underlyingInputStreamProvider = underlyingInputStreamProvider;
+            this.compressor = compressor;
+        }
+
+        @Override
+        public InputStream newStream() {
+            closeCurrentStream();
+            currentStream = AwsCompressionInputStream.builder()
+                                                     .inputStream(underlyingInputStreamProvider.newStream())
+                                                     .compressor(compressor)
+                                                     .build();
+            return currentStream;
+        }
+
+        private void closeCurrentStream() {
+            if (currentStream != null) {
+                IoUtils.closeQuietly(currentStream, null);
+                currentStream = null;
+            }
+        }
+    }
 }

From ba7e60294a091d2add831a1cf6899af598453559 Mon Sep 17 00:00:00 2001
From: hdavidh
Date: Thu, 29 Jun 2023 08:46:23 -0700
Subject: [PATCH 09/26] Close GZIPOutputStream

---
 .../amazon/awssdk/core/internal/compression/GzipCompressor.java | 1 +
 1 file changed, 1 insertion(+)

diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/compression/GzipCompressor.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/compression/GzipCompressor.java
index 6419d7dd4e14..3a05bb670aef 100644
--- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/compression/GzipCompressor.java
+++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/compression/GzipCompressor.java
@@ -45,6 +45,7 @@ public SdkBytes compress(SdkBytes content) {
             ByteArrayOutputStream compressedOutputStream = new ByteArrayOutputStream();
             gzipOutputStream = new GZIPOutputStream(compressedOutputStream);
             gzipOutputStream.write(content.asByteArray());
+            gzipOutputStream.close();
             return SdkBytes.fromByteArray(compressedOutputStream.toByteArray());
         } catch (IOException e) {
             throw new UncheckedIOException(e);

From 5b37111109cefaed581998ce7fbeac122c5942b5 Mon Sep 17 00:00:00 2001
From: hdavidh
Date: Thu, 29 Jun 2023 08:47:02 -0700
Subject: [PATCH 10/26] Add compress stage to async http client

---
 .../amazon/awssdk/core/internal/http/AmazonAsyncHttpClient.java | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/AmazonAsyncHttpClient.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/AmazonAsyncHttpClient.java
index 7d0ebce693c5..9afaf87fdb1a 100644
--- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/AmazonAsyncHttpClient.java
+++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/AmazonAsyncHttpClient.java
@@ -37,6 +37,7 @@
 import software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncExecutionFailureExceptionReportingStage;
 import software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage;
 import software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncSigningStage;
+import software.amazon.awssdk.core.internal.http.pipeline.stages.CompressRequestStage;
 import software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage;
 import software.amazon.awssdk.core.internal.http.pipeline.stages.MakeRequestImmutableStage;
 import software.amazon.awssdk.core.internal.http.pipeline.stages.MakeRequestMutableStage;
@@ -169,6 +170,7 @@ public CompletableFuture execute(
                     .then(ApplyUserAgentStage::new)
                     .then(MergeCustomHeadersStage::new)
                     .then(MergeCustomQueryParamsStage::new)
+                    .then(CompressRequestStage::new)
                     .then(MakeRequestImmutableStage::new)
                     .then(RequestPipelineBuilder
                               .first(AsyncSigningStage::new)

From cd241a60c7fc126fa1bf6f748316427d4d80d947 Mon Sep 17 00:00:00 2001
From: hdavidh
Date: Thu, 29 Jun 2023 08:47:25 -0700
Subject: [PATCH 11/26] Add compressed PutMetricData integ test

---
 .../cloudwatch/CloudWatchIntegrationTest.java | 86 ++++++++++++++++++-
 1 file changed, 85 insertions(+), 1 deletion(-)

diff --git a/services/cloudwatch/src/it/java/software/amazon/awssdk/services/cloudwatch/CloudWatchIntegrationTest.java b/services/cloudwatch/src/it/java/software/amazon/awssdk/services/cloudwatch/CloudWatchIntegrationTest.java
index 01722140044f..e530b0fcae3e 100644
--- a/services/cloudwatch/src/it/java/software/amazon/awssdk/services/cloudwatch/CloudWatchIntegrationTest.java
+++ b/services/cloudwatch/src/it/java/software/amazon/awssdk/services/cloudwatch/CloudWatchIntegrationTest.java
@@ -39,8 +39,12 @@
 import org.junit.BeforeClass;
 import org.junit.Test;
 import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
+import software.amazon.awssdk.core.RequestCompressionConfiguration;
 import software.amazon.awssdk.core.SdkGlobalTime;
 import software.amazon.awssdk.core.exception.SdkServiceException;
+import software.amazon.awssdk.core.interceptor.SdkExecutionAttribute;
+import software.amazon.awssdk.core.interceptor.SdkInternalExecutionAttribute;
+import software.amazon.awssdk.core.interceptor.trait.RequestCompression;
 import software.amazon.awssdk.regions.Region;
 import software.amazon.awssdk.services.cloudwatch.model.Datapoint;
 import software.amazon.awssdk.services.cloudwatch.model.DeleteAlarmsRequest;
@@ -108,7 +112,6 @@ public static void cleanupAlarms() {
     /**
      * Tests putting metrics and then getting them back.
      */
-
     @Test
     public void put_get_metricdata_list_metric_returns_success() throws
         InterruptedException {
@@ -164,6 +167,87 @@ public void put_get_metricdata_list_metric_returns_success() throws
         assertTrue(seenDimensions);
     }
 
+    /**
+     * Tests putting metrics with request compression and then getting them back.
+     * TODO: We can remove this test once CloudWatch adds "RequestCompression" trait to PutMetricData
+     */
+    @Test
+    public void put_get_metricdata_list_metric_withRequestCompression_returns_success() {
+
+        RequestCompression requestCompressionTrait = RequestCompression.builder()
+                                                                       .encodings("gzip")
+                                                                       .isStreaming(false)
+                                                                       .build();
+        RequestCompressionConfiguration compressionConfiguration =
+            RequestCompressionConfiguration.builder()
+                                           // uncompressed payload is 404 bytes
+                                           .minimumCompressionThresholdInBytes(100)
+                                           .build();
+
+        CloudWatchClient requestCompressionClient =
+            CloudWatchClient.builder()
+                            .credentialsProvider(getCredentialsProvider())
+                            .region(Region.US_WEST_2)
+                            .overrideConfiguration(c -> c.putExecutionAttribute(SdkInternalExecutionAttribute.REQUEST_COMPRESSION,
+                                                                                requestCompressionTrait)
+                                                         .putExecutionAttribute(
+                                                             SdkExecutionAttribute.REQUEST_COMPRESSION_CONFIGURATION,
+                                                             compressionConfiguration))
+                            .build();
+
+        String measureName = this.getClass().getName() + System.currentTimeMillis();
+
+        MetricDatum datum = MetricDatum.builder().dimensions(
+            Dimension.builder().name("InstanceType").value("m1.small").build())
+                                       .metricName(measureName).timestamp(Instant.now())
+                                       .unit("Count").value(42.0).build();
+
+        requestCompressionClient.putMetricData(PutMetricDataRequest.builder()
+                                                                   .namespace("AWS.EC2").metricData(datum).build());
+
+        GetMetricStatisticsResponse result =
+            Waiter.run(() -> requestCompressionClient
+                          .getMetricStatistics(r -> r.startTime(Instant.now().minus(Duration.ofDays(7)))
+                                                     .namespace("AWS.EC2")
+                                                     .period(60 * 60)
+                                                     .dimensions(Dimension.builder().name("InstanceType")
+                                                                          .value("m1.small").build())
+                                                     .metricName(measureName)
+                                                     .statisticsWithStrings("Average", "Maximum", "Minimum", "Sum")
+                                                     .endTime(Instant.now())))
+                  .until(r -> r.datapoints().size() == 1)
+                  .orFailAfter(Duration.ofMinutes(2));
+
+        assertNotNull(result.label());
+        assertEquals(measureName, result.label());
+
+        assertEquals(1, result.datapoints().size());
+        for (Datapoint datapoint : result.datapoints()) {
+            assertEquals(datum.value(), datapoint.average());
+            assertEquals(datum.value(), datapoint.maximum());
+            assertEquals(datum.value(), datapoint.minimum());
+            assertEquals(datum.value(), datapoint.sum());
+            assertNotNull(datapoint.timestamp());
+            assertEquals(datum.unit(), datapoint.unit());
+        }
+
+        ListMetricsResponse listResult = requestCompressionClient.listMetrics(ListMetricsRequest.builder().build());
+
+        boolean seenDimensions = false;
+        assertTrue(listResult.metrics().size() > 0);
+        for (Metric metric : listResult.metrics()) {
+            assertNotNull(metric.metricName());
+            assertNotNull(metric.namespace());
+
+            for (Dimension dimension : metric.dimensions()) {
+                seenDimensions = true;
+                assertNotNull(dimension.name());
+                assertNotNull(dimension.value());
+            }
+        }
+        assertTrue(seenDimensions);
+    }
+
     /**
      * Tests setting the state for an alarm and reading its history.
     */

From 5268c0b7d09dd8122a59b5c06b56a158491741a8 Mon Sep 17 00:00:00 2001
From: hdavidh
Date: Tue, 4 Jul 2023 22:10:03 -0700
Subject: [PATCH 12/26] Refactoring

---
 .../pipeline/stages/CompressRequestStage.java | 46 +++------------
 .../io/AwsCompressionInputStream.java         |  2 +-
 .../CompressionContentStreamProvider.java     | 55 +++++++++++++++++++
 3 files changed, 63 insertions(+), 40 deletions(-)
 create mode 100644 core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/sync/CompressionContentStreamProvider.java

diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/CompressRequestStage.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/CompressRequestStage.java
index 932536989849..90a077e29cd2 100644
--- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/CompressRequestStage.java
+++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/CompressRequestStage.java
@@ -23,6 +23,7 @@
 import java.util.Optional;
 import java.util.function.Supplier;
 import software.amazon.awssdk.annotations.SdkInternalApi;
+import software.amazon.awssdk.core.SdkBytes;
 import software.amazon.awssdk.core.SdkSystemSetting;
 import software.amazon.awssdk.core.compression.CompressionType;
 import software.amazon.awssdk.core.compression.Compressor;
@@ -32,7 +33,7 @@
 import software.amazon.awssdk.core.interceptor.SdkInternalExecutionAttribute;
 import software.amazon.awssdk.core.internal.http.RequestExecutionContext;
 import software.amazon.awssdk.core.internal.http.pipeline.MutableRequestToRequestPipeline;
-import software.amazon.awssdk.core.internal.io.AwsCompressionInputStream;
+import software.amazon.awssdk.core.internal.sync.CompressionContentStreamProvider;
 import software.amazon.awssdk.http.ContentStreamProvider;
 import software.amazon.awssdk.http.SdkHttpFullRequest;
 import software.amazon.awssdk.profiles.ProfileFile;
@@ -67,7 +68,7 @@ public SdkHttpFullRequest.Builder execute(SdkHttpFullRequest.Builder input, Requ
             if (!isStreaming(context)) {
                 return updateContentLengthHeader(input);
             } else {
-                return removeContentLengthHeader(input);
+                return input.removeHeader("Content-Length");
             }
         }
@@ -79,8 +80,8 @@ public SdkHttpFullRequest.Builder execute(SdkHttpFullRequest.Builder input, Requ
             // async streaming
         }*/
         updateContentEncodingHeader(input, compressor);
-        removeContentLengthHeader(input);
-        return input.putHeader("Transfer-Encoding", "chunked");
+        input.removeHeader("Content-Length");
+        return input;
     }
 
     private static boolean shouldCompress(SdkHttpFullRequest.Builder input, RequestExecutionContext context) {
@@ -131,10 +132,6 @@ private static SdkHttpFullRequest.Builder updateContentLengthHeader(SdkHttpFullR
         }
     }
 
-    private static SdkHttpFullRequest.Builder removeContentLengthHeader(SdkHttpFullRequest.Builder input) {
-        return input.removeHeader("Content-Length");
-    }
-
     private boolean isTransferEncodingChunked(SdkHttpFullRequest.Builder input) {
         return input.firstMatchingHeader("Transfer-Encoding")
                     .map(headerValue -> headerValue.equals("chunked"))
                     .orElse(false);
@@ -192,9 +189,8 @@
     private static boolean isRequestSizeWithinThreshold(SdkHttpFullRequest.Builder input, RequestExecutionContext context) {
         int minimumCompressionThreshold = resolveMinCompressionSize(context);
         validateMinCompressionSizeInput(minimumCompressionThreshold);
-
-        long contentLength =
-            Long.parseLong(input.firstMatchingHeader("Content-Length").orElse("0"));
-        return contentLength >= minimumCompressionThreshold;
+        int requestSize = SdkBytes.fromInputStream(input.contentStreamProvider().newStream()).asByteArray().length;
+        return requestSize >= minimumCompressionThreshold;
     }
 
     private static int resolveMinCompressionSize(RequestExecutionContext context) {

diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/AwsCompressionInputStream.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/AwsCompressionInputStream.java
index 0e5794d7c372..b68e864affb2 100644
--- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/AwsCompressionInputStream.java
+++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/AwsCompressionInputStream.java
@@ -29,7 +29,7 @@
  */
 @SdkInternalApi
 public class AwsCompressionInputStream extends AwsChunkedInputStream {
-    private Compressor compressor;
+    private final Compressor compressor;
 
     private AwsCompressionInputStream(InputStream in, Compressor compressor) {
         this.compressor = compressor;

diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/sync/CompressionContentStreamProvider.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/sync/CompressionContentStreamProvider.java
new file mode 100644
index 000000000000..014812cb8033
--- /dev/null
+++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/sync/CompressionContentStreamProvider.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ *  http://aws.amazon.com/apache2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package software.amazon.awssdk.core.internal.sync;
+
+import java.io.InputStream;
+import software.amazon.awssdk.annotations.SdkInternalApi;
+import software.amazon.awssdk.core.compression.Compressor;
+import software.amazon.awssdk.core.internal.io.AwsCompressionInputStream;
+import software.amazon.awssdk.http.ContentStreamProvider;
+import software.amazon.awssdk.utils.IoUtils;
+
+/**
+ * {@link ContentStreamProvider} implementation for compression.
+ */
+@SdkInternalApi
+public class CompressionContentStreamProvider implements ContentStreamProvider {
+    private final ContentStreamProvider underlyingInputStreamProvider;
+    private InputStream currentStream;
+    private final Compressor compressor;
+
+    public CompressionContentStreamProvider(ContentStreamProvider underlyingInputStreamProvider, Compressor compressor) {
+        this.underlyingInputStreamProvider = underlyingInputStreamProvider;
+        this.compressor = compressor;
+    }
+
+    @Override
+    public InputStream newStream() {
+        closeCurrentStream();
+        currentStream = AwsCompressionInputStream.builder()
+                                                 .inputStream(underlyingInputStreamProvider.newStream())
+                                                 .compressor(compressor)
+                                                 .build();
+        return currentStream;
+    }
+
+    private void closeCurrentStream() {
+        if (currentStream != null) {
+            IoUtils.closeQuietly(currentStream, null);
+            currentStream = null;
+        }
+    }
+}

From d8aa9a5531210e3fefaf24aa81b6114e742da519 Mon Sep 17 00:00:00 2001
From: hdavidh
Date: Tue, 4 Jul 2023 22:10:57 -0700
Subject: [PATCH 13/26] Add tests

---
 .../compression/GzipCompressorTest.java       |  57 +++++++
 .../customresponsemetadata/service-2.json     |  46 ++++++
 .../services/RequestCompressionTest.java      | 150 ++++++++++++++++++
 3 files changed, 253 insertions(+)
 create mode 100644 core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/compression/GzipCompressorTest.java
 create mode 100644 test/codegen-generated-classes-test/src/test/java/software/amazon/awssdk/services/RequestCompressionTest.java

diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/compression/GzipCompressorTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/compression/GzipCompressorTest.java
new file mode 100644
index 000000000000..b9b4d7d77de6
--- /dev/null
+++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/compression/GzipCompressorTest.java
@@ -0,0 +1,57 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ *  http://aws.amazon.com/apache2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package software.amazon.awssdk.core.internal.compression;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.lessThan;
+import static org.hamcrest.core.Is.is;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.util.zip.GZIPInputStream;
+import org.junit.Test;
+import software.amazon.awssdk.core.compression.Compressor;
+
+public class GzipCompressorTest {
+    private static final Compressor gzipCompressor = new GzipCompressor();
+    private static final String COMPRESSABLE_STRING =
+        "RequestCompressionTest-RequestCompressionTest-RequestCompressionTest-RequestCompressionTest-RequestCompressionTest";
+
+    @Test
+    public void compressedData_decompressesCorrectly() throws IOException {
+        byte[] originalData = COMPRESSABLE_STRING.getBytes(StandardCharsets.UTF_8);
+        byte[] compressedData = gzipCompressor.compress(originalData);
+
+        int uncompressedSize = originalData.length;
+        int compressedSize = compressedData.length;
+        assertThat(compressedSize, lessThan(uncompressedSize));
+
+        ByteArrayInputStream bais = new ByteArrayInputStream(compressedData);
+        GZIPInputStream gzipInputStream = new GZIPInputStream(bais);
+        ByteArrayOutputStream baos = new ByteArrayOutputStream();
+        byte[] buffer = new byte[1024];
+        int bytesRead;
+        while ((bytesRead = gzipInputStream.read(buffer)) != -1) {
+            baos.write(buffer, 0, bytesRead);
+        }
+        gzipInputStream.close();
+        byte[] decompressedData = baos.toByteArray();
+
+        assertThat(decompressedData, is(originalData));
+    }
+}

diff --git a/test/codegen-generated-classes-test/src/main/resources/codegen-resources/customresponsemetadata/service-2.json b/test/codegen-generated-classes-test/src/main/resources/codegen-resources/customresponsemetadata/service-2.json
index 6b1cb368d486..1293ad21bb61 100644
--- a/test/codegen-generated-classes-test/src/main/resources/codegen-resources/customresponsemetadata/service-2.json
+++ b/test/codegen-generated-classes-test/src/main/resources/codegen-resources/customresponsemetadata/service-2.json
@@ -277,6 +277,30 @@
         "requestAlgorithmMember": "ChecksumAlgorithm"
       }
     },
+    "PutOperationWithRequestCompression":{
+      "name":"PutOperationWithRequestCompression",
+      "http":{
+        "method":"PUT",
+        "requestUri":"/"
+      },
+      "input":{"shape":"RequestCompressionStructure"},
+      "output":{"shape":"RequestCompressionStructure"},
+      "requestCompression": {
+        "encodings": ["gzip"]
+      }
+    },
+    "PutOperationWithStreamingRequestCompression":{
+      "name":"PutOperationWithStreamingRequestCompression",
+      "http":{
+        "method":"PUT",
+        "requestUri":"/"
+      },
+      "input":{"shape":"RequestCompressionStructureWithStreaming"},
+      "output":{"shape":"RequestCompressionStructureWithStreaming"},
+      "requestCompression": {
+        "encodings": ["gzip"]
+      }
+    },
     "GetOperationWithChecksum":{
       "name":"GetOperationWithChecksum",
       "http":{
@@ -1007,6 +1031,28 @@
         }
       },
       "payload":"NestedQueryParameterOperation"
+    },
+    "RequestCompressionStructure":{
+      "type":"structure",
+      "members":{
+        "Body":{
+          "shape":"Body",
+          "documentation":"<p>Object data.</p>",
+          "streaming":false
+        }
+      },
+      "payload":"Body"
+    },
+    "RequestCompressionStructureWithStreaming":{
+      "type":"structure",
+      "members":{
+        "Body":{
+          "shape":"Body",
+          "documentation":"<p>Object data.</p>",
+          "streaming":true
+        }
+      },
+      "payload":"Body"
     }
   }
 }

diff --git a/test/codegen-generated-classes-test/src/test/java/software/amazon/awssdk/services/RequestCompressionTest.java b/test/codegen-generated-classes-test/src/test/java/software/amazon/awssdk/services/RequestCompressionTest.java
new file mode 100644
index 000000000000..1a8e5b8633bf
--- /dev/null
+++ b/test/codegen-generated-classes-test/src/test/java/software/amazon/awssdk/services/RequestCompressionTest.java
@@ -0,0 +1,150 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ *  http://aws.amazon.com/apache2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package software.amazon.awssdk.services;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.time.Duration;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.AfterEach;
+import software.amazon.awssdk.auth.credentials.AnonymousCredentialsProvider;
+import software.amazon.awssdk.core.SdkBytes;
+import software.amazon.awssdk.core.compression.Compressor;
+import software.amazon.awssdk.core.internal.compression.GzipCompressor;
+import software.amazon.awssdk.core.sync.RequestBody;
+import software.amazon.awssdk.core.sync.ResponseTransformer;
+import software.amazon.awssdk.http.HttpExecuteResponse;
+import software.amazon.awssdk.http.SdkHttpFullRequest;
+import software.amazon.awssdk.http.SdkHttpResponse;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.protocolrestjson.ProtocolRestJsonAsyncClient;
+import software.amazon.awssdk.services.protocolrestjson.ProtocolRestJsonClient;
+import software.amazon.awssdk.services.protocolrestjson.model.PutOperationWithRequestCompressionRequest;
+import software.amazon.awssdk.services.protocolrestjson.model.PutOperationWithStreamingRequestCompressionRequest;
+import software.amazon.awssdk.testutils.service.http.MockAsyncHttpClient;
+import software.amazon.awssdk.testutils.service.http.MockSyncHttpClient;
+
+public class RequestCompressionTest {
+    private static final String COMPRESSABLE_BODY =
+        "RequestCompressionTest-RequestCompressionTest-RequestCompressionTest-RequestCompressionTest-RequestCompressionTest";
+    private static final String CRLF = "\r\n";
+    MockSyncHttpClient mockHttpClient;
+    MockAsyncHttpClient mockAsyncHttpClient;
+    ProtocolRestJsonClient syncClient;
+    ProtocolRestJsonAsyncClient asyncClient;
+    Compressor compressor;
+
+    @BeforeEach
+    public void setUp() {
+        mockHttpClient = new MockSyncHttpClient();
+        mockAsyncHttpClient = new MockAsyncHttpClient();
+        syncClient = ProtocolRestJsonClient.builder()
+                                           .credentialsProvider(AnonymousCredentialsProvider.create())
+                                           .region(Region.US_EAST_1)
+                                           .httpClient(mockHttpClient)
+                                           .build();
+        asyncClient = ProtocolRestJsonAsyncClient.builder()
+                                                 .credentialsProvider(AnonymousCredentialsProvider.create())
+                                                 .region(Region.US_EAST_1)
+                                                 .httpClient(mockAsyncHttpClient)
+                                                 .build();
+        compressor = new GzipCompressor();
+    }
+
+    @AfterEach
+    public void reset() {
+        mockHttpClient.reset();
+        mockAsyncHttpClient.reset();
+    }
+
+    @Test
+    public void sync_nonStreaming_compression_compressesCorrectly() throws InterruptedException {
+        mockHttpClient.stubNextResponse(mockResponse(), Duration.ofMillis(500));
+
+        byte[] uncompressedData = COMPRESSABLE_BODY.getBytes(StandardCharsets.UTF_8);
+        PutOperationWithRequestCompressionRequest request =
+            PutOperationWithRequestCompressionRequest.builder()
+                                                     .body(SdkBytes.fromByteArray(uncompressedData))
+                                                     .overrideConfiguration(o -> o.requestCompressionConfiguration(
+                                                         c -> c.minimumCompressionThresholdInBytes(1)))
+                                                     .build();
+        syncClient.putOperationWithRequestCompression(request);
+
+        SdkHttpFullRequest loggedRequest = (SdkHttpFullRequest) mockHttpClient.getLastRequest();
+        InputStream loggedStream = loggedRequest.contentStreamProvider().get().newStream();
+        byte[] loggedBody = SdkBytes.fromInputStream(loggedStream).asByteArray();
+        int loggedSize = Integer.valueOf(loggedRequest.firstMatchingHeader("Content-Length").get());
+        byte[] compressedData = compressor.compress(uncompressedData);
+
+        assertThat(loggedBody).isEqualTo(compressedData);
+        assertThat(loggedSize).isEqualTo(compressedData.length);
+        assertThat(loggedRequest.firstMatchingHeader("Content-encoding").get()).isEqualTo("gzip");
+    }
+
+    @Test
+    public void async_nonStreaming_compression_compressesCorrectly() throws InterruptedException {
+        mockAsyncHttpClient.stubNextResponse(mockResponse(), Duration.ofMillis(500));
+
+        byte[] uncompressedData = COMPRESSABLE_BODY.getBytes(StandardCharsets.UTF_8);
+        PutOperationWithRequestCompressionRequest request =
+            PutOperationWithRequestCompressionRequest.builder()
+                                                     .body(SdkBytes.fromByteArray(uncompressedData))
+                                                     .overrideConfiguration(o -> o.requestCompressionConfiguration(
+                                                         c -> c.minimumCompressionThresholdInBytes(1)))
+                                                     .build();
+
+        asyncClient.putOperationWithRequestCompression(request);
+
+        SdkHttpFullRequest loggedRequest = (SdkHttpFullRequest) mockAsyncHttpClient.getLastRequest();
+        InputStream loggedStream = loggedRequest.contentStreamProvider().get().newStream();
+        byte[] loggedBody = SdkBytes.fromInputStream(loggedStream).asByteArray();
+        int loggedSize = Integer.valueOf(loggedRequest.firstMatchingHeader("Content-Length").get());
+        byte[] compressedData = compressor.compress(uncompressedData);
+
+        assertThat(loggedBody).isEqualTo(compressedData);
+        assertThat(loggedSize).isEqualTo(compressedData.length);
+        assertThat(loggedRequest.firstMatchingHeader("Content-encoding").get()).isEqualTo("gzip");
+    }
+
+    @Test
+    public void sync_streaming_compression_compressesCorrectly() throws InterruptedException {
+        mockHttpClient.stubNextResponse(mockResponse(), Duration.ofMillis(500));
+        byte[] uncompressedData = COMPRESSABLE_BODY.getBytes(StandardCharsets.UTF_8);
+        PutOperationWithStreamingRequestCompressionRequest request =
+            PutOperationWithStreamingRequestCompressionRequest.builder().build();
+        syncClient.putOperationWithStreamingRequestCompression(request, RequestBody.fromBytes(uncompressedData),
+                                                               ResponseTransformer.toBytes());
+
+        SdkHttpFullRequest loggedRequest = (SdkHttpFullRequest) mockHttpClient.getLastRequest();
+        InputStream loggedStream = loggedRequest.contentStreamProvider().get().newStream();
+        byte[] loggedBody = SdkBytes.fromInputStream(loggedStream).asByteArray();
+        byte[] compressedData = compressor.compress(uncompressedData);
+        String chunkBody = Integer.toHexString(compressedData.length) + CRLF + new String(compressedData) + CRLF + "0" + CRLF;
+
+        assertThat(new String(loggedBody)).isEqualTo(chunkBody);
+        assertThat(loggedRequest.firstMatchingHeader("Content-encoding").get()).isEqualTo("gzip");
+        assertThat(loggedRequest.matchingHeaders("Content-Length")).isEmpty();
+    }
+
+    private HttpExecuteResponse mockResponse() {
+        return HttpExecuteResponse.builder()
+                                  .response(SdkHttpResponse.builder().statusCode(200).build())
+                                  .build();
+    }
+}

From 37d962e68399596b687aaaa269983f2485b444b0 Mon Sep 17 00:00:00 2001
From: hdavidh
Date: Fri, 7 Jul 2023 14:23:21 -0700
Subject: [PATCH 14/26] Add equals and hashCode

---
 .../core/compression/CompressionType.java     | 22 +++++++++++++++++++
 1 file changed, 22 insertions(+)

diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/compression/CompressionType.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/compression/CompressionType.java
index 3fee7f5c14dc..a0fa8645285c 100644
--- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/compression/CompressionType.java
+++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/compression/CompressionType.java
@@ -17,6 +17,7 @@
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import software.amazon.awssdk.annotations.SdkInternalApi;
@@ -81,6 +82,27 @@ public String toString() {
         return id;
     }
 
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        CompressionType that = (CompressionType) o;
+        return Objects.equals(id, that.id)
+               && Objects.equals(compressorMap, that.compressorMap);
+    }
+
+    @Override
+    public int hashCode() {
+        int result = id != null ? id.hashCode() : 0;
+        result = 31 * result + (compressorMap != null ? compressorMap.hashCode() : 0);
+        return result;
+    }
+
     private static class CompressionTypeCache {
         private static final ConcurrentHashMap<String, CompressionType> VALUES = new ConcurrentHashMap<>();
 
         private CompressionTypeCache() {
         }

From aab6f62243c73b8c6011096be7eb7aaa87e27936 Mon Sep 17 00:00:00 2001
From: hdavidh
Date: Fri, 7 Jul 2023 14:23:35 -0700
Subject: [PATCH 15/26] Add TODO

---
 .../internal/http/pipeline/stages/CompressRequestStage.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/CompressRequestStage.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/CompressRequestStage.java
index 90a077e29cd2..d0f1d0d84ee2 100644
--- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/CompressRequestStage.java
+++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/CompressRequestStage.java
@@ -77,7 +77,7 @@ public SdkHttpFullRequest.Builder execute(SdkHttpFullRequest.Builder input, Requ
             input.contentStreamProvider(new CompressionContentStreamProvider(input.contentStreamProvider(), compressor));
         }
         /*else {
-            // async streaming
+            // TODO: async streaming
         }*/
         updateContentEncodingHeader(input, compressor);
         input.removeHeader("Content-Length");

From 4b52a883a83e64050f488edce5ce056f8b83e38c Mon Sep 17 00:00:00 2001
From: hdavidh
Date: Fri, 7 Jul 2023 14:23:53 -0700
Subject: [PATCH 16/26] Add tests with retry

---
 .../services/RequestCompressionTest.java | 85 ++++++++++++++++++-
 1 file changed, 82 insertions(+), 3 deletions(-)

diff --git a/test/codegen-generated-classes-test/src/test/java/software/amazon/awssdk/services/RequestCompressionTest.java b/test/codegen-generated-classes-test/src/test/java/software/amazon/awssdk/services/RequestCompressionTest.java
index 1a8e5b8633bf..e346fe453689 100644
--- a/test/codegen-generated-classes-test/src/test/java/software/amazon/awssdk/services/RequestCompressionTest.java
+++ b/test/codegen-generated-classes-test/src/test/java/software/amazon/awssdk/services/RequestCompressionTest.java
@@ -74,7 +74,7 @@ public void reset() {
     }
 
     @Test
-    public void sync_nonStreaming_compression_compressesCorrectly() throws InterruptedException {
+    public void sync_nonStreaming_compression_compressesCorrectly() {
         mockHttpClient.stubNextResponse(mockResponse(), Duration.ofMillis(500));
 
         byte[] uncompressedData = COMPRESSABLE_BODY.getBytes(StandardCharsets.UTF_8);
@@ -98,7 +98,7 @@ public void sync_nonStreaming_compression_compressesCorrectly() throws Interrupt
     }
 
     @Test
-    public void async_nonStreaming_compression_compressesCorrectly() throws InterruptedException {
+    public void async_nonStreaming_compression_compressesCorrectly() {
         mockAsyncHttpClient.stubNextResponse(mockResponse(), Duration.ofMillis(500));
 
         byte[] uncompressedData = COMPRESSABLE_BODY.getBytes(StandardCharsets.UTF_8);
@@ -123,7 +123,7 @@ public void async_nonStreaming_compression_compressesCorrectly() throws Interrup
     }
 
     @Test
-    public void sync_streaming_compression_compressesCorrectly() throws InterruptedException {
+    public void sync_streaming_compression_compressesCorrectly() {
         mockHttpClient.stubNextResponse(mockResponse(), Duration.ofMillis(500));
         byte[] uncompressedData = COMPRESSABLE_BODY.getBytes(StandardCharsets.UTF_8);
@@ -142,9 +142,88 @@ public void sync_streaming_compression_compressesCorrectly() throws InterruptedE
         assertThat(loggedRequest.matchingHeaders("Content-Length")).isEmpty();
     }
+    @Test
+    public void sync_nonStreaming_compression_withRetry_compressesCorrectly() {
+        mockHttpClient.stubNextResponse(mockErrorResponse(), Duration.ofMillis(500));
+        mockHttpClient.stubNextResponse(mockResponse(), Duration.ofMillis(500));
+
+        byte[] uncompressedData = COMPRESSABLE_BODY.getBytes(StandardCharsets.UTF_8);
+        PutOperationWithRequestCompressionRequest request =
+            PutOperationWithRequestCompressionRequest.builder()
+                                                     .body(SdkBytes.fromByteArray(uncompressedData))
+                                                     .overrideConfiguration(o -> o.requestCompressionConfiguration(
+                                                         c -> c.minimumCompressionThresholdInBytes(1)))
+                                                     .build();
+        syncClient.putOperationWithRequestCompression(request);
+
+        SdkHttpFullRequest loggedRequest = (SdkHttpFullRequest) mockHttpClient.getLastRequest();
+        InputStream loggedStream = loggedRequest.contentStreamProvider().get().newStream();
+        byte[] loggedBody = SdkBytes.fromInputStream(loggedStream).asByteArray();
+        int loggedSize = Integer.valueOf(loggedRequest.firstMatchingHeader("Content-Length").get());
+        byte[] compressedData = compressor.compress(uncompressedData);
+
+        assertThat(loggedBody).isEqualTo(compressedData);
+        assertThat(loggedSize).isEqualTo(compressedData.length);
+        assertThat(loggedRequest.firstMatchingHeader("Content-encoding").get()).isEqualTo("gzip");
+    }
+
+    @Test
+    public void async_nonStreaming_compression_withRetry_compressesCorrectly() {
+        mockAsyncHttpClient.stubNextResponse(mockErrorResponse(), Duration.ofMillis(500));
+        mockAsyncHttpClient.stubNextResponse(mockResponse(), Duration.ofMillis(500));
+
+        byte[] uncompressedData = COMPRESSABLE_BODY.getBytes(StandardCharsets.UTF_8);
+        PutOperationWithRequestCompressionRequest request =
+            PutOperationWithRequestCompressionRequest.builder()
+                                                     .body(SdkBytes.fromByteArray(uncompressedData))
+                                                     .overrideConfiguration(o -> o.requestCompressionConfiguration(
+                                                         c -> c.minimumCompressionThresholdInBytes(1)))
+                                                     .build();
+
+        asyncClient.putOperationWithRequestCompression(request);
+
+        SdkHttpFullRequest loggedRequest = (SdkHttpFullRequest) mockAsyncHttpClient.getLastRequest();
+        InputStream loggedStream = loggedRequest.contentStreamProvider().get().newStream();
+        byte[] loggedBody = SdkBytes.fromInputStream(loggedStream).asByteArray();
+        int loggedSize = Integer.valueOf(loggedRequest.firstMatchingHeader("Content-Length").get());
+        byte[] compressedData = compressor.compress(uncompressedData);
+
+        assertThat(loggedBody).isEqualTo(compressedData);
+        assertThat(loggedSize).isEqualTo(compressedData.length);
+        assertThat(loggedRequest.firstMatchingHeader("Content-encoding").get()).isEqualTo("gzip");
+    }
+
+    @Test
+    public void sync_streaming_compression_withRetry_compressesCorrectly() {
+        mockHttpClient.stubNextResponse(mockErrorResponse(), Duration.ofMillis(500));
+        mockHttpClient.stubNextResponse(mockResponse(), Duration.ofMillis(500));
+
+        byte[] uncompressedData = COMPRESSABLE_BODY.getBytes(StandardCharsets.UTF_8);
+        PutOperationWithStreamingRequestCompressionRequest request =
+            PutOperationWithStreamingRequestCompressionRequest.builder().build();
+        syncClient.putOperationWithStreamingRequestCompression(request, RequestBody.fromBytes(uncompressedData),
+                                                               ResponseTransformer.toBytes());
+
+        SdkHttpFullRequest loggedRequest = (SdkHttpFullRequest) mockHttpClient.getLastRequest();
+        InputStream loggedStream = loggedRequest.contentStreamProvider().get().newStream();
+        byte[] loggedBody = SdkBytes.fromInputStream(loggedStream).asByteArray();
+        byte[] compressedData = compressor.compress(uncompressedData);
+        String chunkBody =
+            Integer.toHexString(compressedData.length) + CRLF + new String(compressedData) + CRLF + "0" + CRLF;
+
+        assertThat(new String(loggedBody)).isEqualTo(chunkBody);
+        assertThat(loggedRequest.firstMatchingHeader("Content-encoding").get()).isEqualTo("gzip");
+        assertThat(loggedRequest.matchingHeaders("Content-Length")).isEmpty();
+    }
+
     private HttpExecuteResponse mockResponse() {
         return HttpExecuteResponse.builder()
                                   .response(SdkHttpResponse.builder().statusCode(200).build())
                                   .build();
     }
+
+    private HttpExecuteResponse mockErrorResponse() {
+        return HttpExecuteResponse.builder()
+                                  .response(SdkHttpResponse.builder().statusCode(500).build())
+                                  .build();
+    }
 }

From d9e28d91d86eb14f7568e7fe390f54282e94a1dc Mon Sep 17 00:00:00 2001
From: hdavidh
Date: Mon, 10 Jul 2023 17:23:30 -0700
Subject: [PATCH 17/26] Update content length retrieval

---
 .../http/pipeline/stages/CompressRequestStage.java | 11 +++++++++--
 1 file changed, 9 insertions(+), 2 deletions(-)

diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/CompressRequestStage.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/CompressRequestStage.java
index d0f1d0d84ee2..8b94bbdc4140 100644
--- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/CompressRequestStage.java
+++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/CompressRequestStage.java
@@ -189,8 +189,15 @@ private static boolean resolveRequestCompressionEnabled(RequestExecutionContext
     private static boolean isRequestSizeWithinThreshold(SdkHttpFullRequest.Builder input, RequestExecutionContext context) {
         int minimumCompressionThreshold = resolveMinCompressionSize(context);
         validateMinCompressionSizeInput(minimumCompressionThreshold);
-        int requestSize = SdkBytes.fromInputStream(input.contentStreamProvider().newStream()).asByteArray().length;
-        return requestSize >= minimumCompressionThreshold;
+        return getRequestSize(input) >= minimumCompressionThreshold;
+    }
+
+    private static int getRequestSize(SdkHttpFullRequest.Builder input) {
+        Optional<String> header = input.firstMatchingHeader("Content-Length");
+        if (header.isPresent()) {
+            return Integer.valueOf(header.get());
+        }
+        return SdkBytes.fromInputStream(input.contentStreamProvider().newStream()).asByteArray().length;
     }

From 86ff75a56f63152ec26e5f15b7d9011b53fcc9ad Mon Sep 17 00:00:00 2001
From: hdavidh
Date: Mon, 10 Jul 2023 21:51:40 -0700
Subject: [PATCH 18/26] CompressionType test

---
 .../core/compression/CompressionType.java     |  2 +-
 .../core/compression/CompressionTypeTest.java | 47 +++++++++++++++++++
 2 files changed, 48 insertions(+), 1 deletion(-)
 create mode 100644 core/sdk-core/src/test/java/software/amazon/awssdk/core/compression/CompressionTypeTest.java

diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/compression/CompressionType.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/compression/CompressionType.java
index a0fa8645285c..1c40852022d1 100644
--- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/compression/CompressionType.java
+++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/compression/CompressionType.java
@@ -37,7 +37,7 @@ public final class CompressionType {
         put("gzip", new GzipCompressor());
     }};
 
-    private String id;
+    private final String id;
 
     private CompressionType(String id) {
         this.id = id;
     }

diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/compression/CompressionTypeTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/compression/CompressionTypeTest.java
new file mode 100644
index 000000000000..edaef7fede92
--- /dev/null
+++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/compression/CompressionTypeTest.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ *  http://aws.amazon.com/apache2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package software.amazon.awssdk.core.compression;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+import nl.jqno.equalsverifier.EqualsVerifier;
+import org.junit.jupiter.api.Test;
+
+public class CompressionTypeTest {
+
+    @Test
+    public void equalsHashcode() {
+        EqualsVerifier.forClass(CompressionType.class)
+                      .withNonnullFields("id")
+                      .verify();
+    }
+
+    @Test
+    public void compressionType_gzip() {
+        CompressionType gzip = CompressionType.GZIP;
+        CompressionType gzipFromString = CompressionType.of("gzip");
+        assertThat(gzip).isSameAs(gzipFromString);
+        assertThat(gzip).isEqualTo(gzipFromString);
+    }
+
+    @Test
+    public void compressionType_usesSameInstance_when_sameCompressionTypeOfSameValue() {
+        CompressionType brotliFromString = CompressionType.of("brotli");
+        CompressionType brotliFromStringDuplicate = CompressionType.of("brotli");
+        assertThat(brotliFromString).isSameAs(brotliFromStringDuplicate);
+        assertThat(brotliFromString).isEqualTo(brotliFromStringDuplicate);
+    }
+}

From 24d69db1fa526504260fee68f9970251b9ed195e Mon Sep 17 00:00:00 2001
From: hdavidh
Date: Mon, 10 Jul 2023 21:59:59 -0700
Subject: [PATCH 19/26] RequestCompressionConfiguration test

---
 .../RequestCompressionConfigurationTest.java  | 43 +++++++++++++++++
 1 file changed, 43 insertions(+)
 create mode 100644 core/sdk-core/src/test/java/software/amazon/awssdk/core/RequestCompressionConfigurationTest.java

diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/RequestCompressionConfigurationTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/RequestCompressionConfigurationTest.java
new file mode 100644
index 000000000000..2740e9c4b14d
--- /dev/null
+++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/RequestCompressionConfigurationTest.java
@@ -0,0 +1,43 @@
+/*
+ * Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License").
+ * You may not use this file except in compliance with the License.
+ * A copy of the License is located at
+ *
+ *  http://aws.amazon.com/apache2.0
+ *
+ * or in the "license" file accompanying this file. This file is distributed
+ * on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
+ * express or implied. See the License for the specific language governing
+ * permissions and limitations under the License.
+ */
+
+package software.amazon.awssdk.core;
+
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+import nl.jqno.equalsverifier.EqualsVerifier;
+import org.junit.jupiter.api.Test;
+
+public class RequestCompressionConfigurationTest {
+
+    @Test
+    public void equalsHashcode() {
+        EqualsVerifier.forClass(RequestCompressionConfiguration.class)
+                      .withNonnullFields("requestCompressionEnabled", "minimumCompressionThresholdInBytes")
+                      .verify();
+    }
+
+    @Test
+    public void toBuilder() {
+        RequestCompressionConfiguration configuration =
+            RequestCompressionConfiguration.builder()
+                                           .requestCompressionEnabled(true)
+                                           .minimumCompressionThresholdInBytes(99999)
+                                           .build();
+
+        RequestCompressionConfiguration another = configuration.toBuilder().build();
+        assertThat(configuration).isEqualTo(another);
+    }
+}

From 9dadea3555e2c6a6aa3824aeebb310723562815e Mon Sep 17 00:00:00 2001
From: hdavidh
Date: Tue, 11 Jul 2023 13:58:50 -0700
Subject: [PATCH 20/26] Fix Spotbugs error

---
 .../internal/http/pipeline/stages/CompressRequestStage.java | 6 ++----
 1 file changed, 2 insertions(+), 4 deletions(-)

diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/CompressRequestStage.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/CompressRequestStage.java
index 8b94bbdc4140..5d0b616fab44 100644
--- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/CompressRequestStage.java
+++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/CompressRequestStage.java
@@ -194,10 +194,8 @@
     private static int getRequestSize(SdkHttpFullRequest.Builder input) {
         Optional<String> header = input.firstMatchingHeader("Content-Length");
-        if (header.isPresent()) {
-            return Integer.valueOf(header.get());
-        }
-        return SdkBytes.fromInputStream(input.contentStreamProvider().newStream()).asByteArray().length;
+        return header.map(s -> (int) Long.parseLong(s))
+                     .orElseGet(() -> SdkBytes.fromInputStream(input.contentStreamProvider().newStream()).asByteArray().length);
     }

From 95e0383fac17582c42d3831292d5fd0f16d88a8f Mon Sep 17 00:00:00 2001
From: hdavidh
Date: Tue, 11 Jul 2023 20:54:27 -0700
Subject: [PATCH 21/26] Rename CompressionType to CompressorType

---
 ...mpressionType.java => CompressorType.java} | 30 +++++++++----------
 .../pipeline/stages/CompressRequestStage.java |  6 ++--
 ...nTypeTest.java => CompressorTypeTest.java} | 16 +++++-----
 3 files changed, 26 insertions(+), 26 deletions(-)
 rename core/sdk-core/src/main/java/software/amazon/awssdk/core/compression/{CompressionType.java => CompressorType.java} (77%)
 rename core/sdk-core/src/test/java/software/amazon/awssdk/core/compression/{CompressionTypeTest.java => CompressorTypeTest.java} (70%)

diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/compression/CompressionType.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/compression/CompressorType.java
similarity index 77%
rename from core/sdk-core/src/main/java/software/amazon/awssdk/core/compression/CompressionType.java
rename to core/sdk-core/src/main/java/software/amazon/awssdk/core/compression/CompressorType.java
index 1c40852022d1..e8adb4bd30a9 100644
--- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/compression/CompressionType.java
+++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/compression/CompressorType.java
@@ -29,9 +29,9 @@
  * {@link Compressor} implementation.
  */
 @SdkInternalApi
-public final class CompressionType {
+public final class CompressorType {
 
-    public static final CompressionType GZIP = CompressionType.of("gzip");
+    public static final CompressorType GZIP = CompressorType.of("gzip");
 
     private static Map<String, Compressor> compressorMap = new HashMap<String, Compressor>() {{
         put("gzip", new GzipCompressor());
@@ -39,22 +39,22 @@ public final class CompressionType {
 
     private final String id;
 
-    private CompressionType(String id) {
+    private CompressorType(String id) {
         this.id = id;
     }
 
     /**
-     * Creates a new {@link CompressionType} of the given value.
+     * Creates a new {@link CompressorType} of the given value.
      */
-    public static CompressionType of(String value) {
+    public static CompressorType of(String value) {
         Validate.paramNotBlank(value, "compressionType");
-        return CompressionTypeCache.put(value);
+        return CompressorTypeCache.put(value);
     }
 
     /**
      * Returns the {@link Set} of {@link String}s of compression types supported by the SDK.
      */
-    public static Set<String> compressionTypes() {
+    public static Set<String> compressorTypes() {
         return compressorMap.keySet();
     }
 
@@ -62,11 +62,11 @@ public static Set<String> compressionTypes() {
      * Whether or not the compression type is supported by the SDK.
      */
     public static boolean isSupported(String compressionType) {
-        return compressionTypes().contains(compressionType);
+        return compressorTypes().contains(compressionType);
     }
 
     /**
-     * Maps the {@link CompressionType} to its corresponding {@link Compressor}.
+     * Maps the {@link CompressorType} to its corresponding {@link Compressor}.
      */
     public Compressor newCompressor() {
         Compressor compressor = compressorMap.getOrDefault(this.id, null);
@@ -91,7 +91,7 @@ public boolean equals(Object o) {
             return false;
         }
 
-        CompressionType that = (CompressionType) o;
+        CompressorType that = (CompressorType) o;
         return Objects.equals(id, that.id)
                && Objects.equals(compressorMap, that.compressorMap);
     }
@@ -103,14 +103,14 @@ public int hashCode() {
         return result;
     }
 
-    private static class CompressionTypeCache {
-        private static final ConcurrentHashMap<String, CompressionType> VALUES = new ConcurrentHashMap<>();
+    private static class CompressorTypeCache {
+        private static final ConcurrentHashMap<String, CompressorType> VALUES = new ConcurrentHashMap<>();
 
-        private CompressionTypeCache() {
+        private CompressorTypeCache() {
         }
 
-        private static CompressionType put(String value) {
-            return VALUES.computeIfAbsent(value, v -> new CompressionType(value));
+        private static CompressorType put(String value) {
+            return VALUES.computeIfAbsent(value, v -> new CompressorType(value));
         }
     }
 }

diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/CompressRequestStage.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/CompressRequestStage.java
index 5d0b616fab44..4416c7244834 100644
--- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/CompressRequestStage.java
+++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/CompressRequestStage.java
@@ -25,7 +25,7 @@
 import software.amazon.awssdk.annotations.SdkInternalApi;
 import software.amazon.awssdk.core.SdkBytes;
 import software.amazon.awssdk.core.SdkSystemSetting;
-import software.amazon.awssdk.core.compression.CompressionType;
+import software.amazon.awssdk.core.compression.CompressorType;
 import software.amazon.awssdk.core.compression.Compressor;
 import software.amazon.awssdk.core.exception.SdkClientException;
 import software.amazon.awssdk.core.interceptor.ExecutionAttributes;
@@ -144,8 +144,8 @@ private static Compressor resolveCompressionType(ExecutionAttributes executionAt
 
         for (String encoding: encodings) {
             encoding = encoding.toLowerCase(Locale.ROOT);
-            if (CompressionType.isSupported(encoding)) {
-                return CompressionType.of(encoding).newCompressor();
+            if (CompressorType.isSupported(encoding)) {
+                return CompressorType.of(encoding).newCompressor();
             }
         }
         return null;

diff --git a/core/sdk-core/src/test/java/software/amazon/awssdk/core/compression/CompressionTypeTest.java b/core/sdk-core/src/test/java/software/amazon/awssdk/core/compression/CompressorTypeTest.java
similarity index 70%
rename from core/sdk-core/src/test/java/software/amazon/awssdk/core/compression/CompressionTypeTest.java
rename to core/sdk-core/src/test/java/software/amazon/awssdk/core/compression/CompressorTypeTest.java
index edaef7fede92..c0c0af4bdaa4 100644
--- a/core/sdk-core/src/test/java/software/amazon/awssdk/core/compression/CompressionTypeTest.java
+++ b/core/sdk-core/src/test/java/software/amazon/awssdk/core/compression/CompressorTypeTest.java
@@ -20,27 +20,27 @@
 import nl.jqno.equalsverifier.EqualsVerifier;
 import org.junit.jupiter.api.Test;
 
-public class CompressionTypeTest {
+public class CompressorTypeTest {
 
     @Test
     public void equalsHashcode() {
-        EqualsVerifier.forClass(CompressionType.class)
+        EqualsVerifier.forClass(CompressorType.class)
                       .withNonnullFields("id")
                       .verify();
     }
 
     @Test
-    public void compressionType_gzip() {
-        CompressionType gzip = CompressionType.GZIP;
-        CompressionType gzipFromString = CompressionType.of("gzip");
+    public void compressorType_gzip() {
+        CompressorType gzip = CompressorType.GZIP;
+        CompressorType gzipFromString = CompressorType.of("gzip");
         assertThat(gzip).isSameAs(gzipFromString);
         assertThat(gzip).isEqualTo(gzipFromString);
     }
 
     @Test
-    public void compressionType_usesSameInstance_when_sameCompressionTypeOfSameValue() {
-        CompressionType brotliFromString = CompressionType.of("brotli");
-        CompressionType brotliFromStringDuplicate = CompressionType.of("brotli");
+    public void compressorType_usesSameInstance_when_sameCompressorTypeOfSameValue() {
+        CompressorType brotliFromString = CompressorType.of("brotli");
+        CompressorType brotliFromStringDuplicate = CompressorType.of("brotli");
         assertThat(brotliFromString).isSameAs(brotliFromStringDuplicate);
         assertThat(brotliFromString).isEqualTo(brotliFromStringDuplicate);
     }

From 4d1e924955acbd3434ad2e067822d03b5fb269ae Mon Sep 17 00:00:00 2001
From: hdavidh
Date: Wed, 12 Jul 2023 09:56:52 -0700
Subject: [PATCH 22/26] Fix import ordering

---
 .../internal/http/pipeline/stages/CompressRequestStage.java | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/CompressRequestStage.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/CompressRequestStage.java
index 4416c7244834..95e982fd22fa 100644
--- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/CompressRequestStage.java
+++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/CompressRequestStage.java
@@ -25,8 +25,8 @@
 import software.amazon.awssdk.annotations.SdkInternalApi;
 import software.amazon.awssdk.core.SdkBytes;
 import software.amazon.awssdk.core.SdkSystemSetting;
-import software.amazon.awssdk.core.compression.CompressorType;
 import software.amazon.awssdk.core.compression.Compressor;
+import software.amazon.awssdk.core.compression.CompressorType;
 import software.amazon.awssdk.core.exception.SdkClientException;
 import software.amazon.awssdk.core.interceptor.ExecutionAttributes;

From ebc0c355b25a6bd718e6c1d40e91c294c8a0631a Mon Sep 17 00:00:00 2001
From: hdavidh
Date: Fri, 14 Jul 2023 16:58:00 -0700
Subject: [PATCH 23/26] Remove chunk headers and trailers

---
 .../io/AwsCompressionInputStream.java | 53 +++++--------------
 1 file changed, 14 insertions(+), 39 deletions(-)

diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/AwsCompressionInputStream.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/AwsCompressionInputStream.java
index b68e864affb2..d9e8251e3b78 100644
--- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/AwsCompressionInputStream.java
+++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/io/AwsCompressionInputStream.java
@@ -17,18 +17,16 @@
 
 import java.io.IOException;
 import java.io.InputStream;
-import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 import software.amazon.awssdk.annotations.SdkInternalApi;
 import software.amazon.awssdk.core.compression.Compressor;
-import software.amazon.awssdk.core.exception.SdkClientException;
 import software.amazon.awssdk.utils.Validate;
 
 /**
  * A wrapper class of InputStream that implements compression in chunks.
  */
 @SdkInternalApi
-public class AwsCompressionInputStream extends AwsChunkedInputStream {
+public final class AwsCompressionInputStream extends AwsChunkedInputStream {
     private final Compressor compressor;
 
     private AwsCompressionInputStream(InputStream in, Compressor compressor) {
         this.compressor = compressor;
@@ -94,49 +92,26 @@ private boolean setUpNextChunk() throws IOException {
             }
         }
         if (chunkSizeInBytes == 0) {
-            byte[] finalChunk = createFinalChunk(FINAL_CHUNK);
-            currentChunkIterator = new ChunkContentIterator(finalChunk);
             return true;
-        } else {
-            if (chunkSizeInBytes < chunkData.length) {
-                chunkData = Arrays.copyOf(chunkData, chunkSizeInBytes);
-            }
-            // Compress the chunk
-            byte[] compressedChunkData = compressor.compress(chunkData);
-            byte[] chunkContent = createChunk(compressedChunkData);
-            currentChunkIterator = new ChunkContentIterator(chunkContent);
-            return false;
         }
-    }
 
-    protected byte[] createChunk(byte[] compressedChunkData) {
-        StringBuilder chunkHeader = new StringBuilder();
-        chunkHeader.append(Integer.toHexString(compressedChunkData.length));
-        chunkHeader.append(CRLF);
-        try {
-            byte[] header = chunkHeader.toString().getBytes(StandardCharsets.UTF_8);
-            byte[] trailer = CRLF.getBytes(StandardCharsets.UTF_8);
-            byte[] chunk = new byte[header.length + compressedChunkData.length + trailer.length];
-            System.arraycopy(header, 0, chunk, 0, header.length);
-            System.arraycopy(compressedChunkData, 0, chunk, header.length, compressedChunkData.length);
-            System.arraycopy(trailer, 0,
-                             chunk, header.length + compressedChunkData.length,
-                             trailer.length);
-            return chunk;
-        } catch (Exception e) {
-            throw SdkClientException.builder()
-                                    .message("Unable to create chunked data. " + e.getMessage())
" + e.getMessage()) - .cause(e) - .build(); + if (chunkSizeInBytes < chunkData.length) { + chunkData = Arrays.copyOf(chunkData, chunkSizeInBytes); } + // Compress the chunk + byte[] compressedChunkData = createChunk(chunkData); + currentChunkIterator = new ChunkContentIterator(compressedChunkData); + return false; } + @Override + protected byte[] createChunk(byte[] chunkData) { + return compressor.compress(chunkData); + } + + @Override protected byte[] createFinalChunk(byte[] finalChunk) { - StringBuilder chunkHeader = new StringBuilder(); - // chunk-size - chunkHeader.append(Integer.toHexString(finalChunk.length)); - chunkHeader.append(CRLF); - return chunkHeader.toString().getBytes(StandardCharsets.UTF_8); + throw new UnsupportedOperationException(); } /** From 1d9bb6f88ff3516022f1ae701634ceb283e2d1d8 Mon Sep 17 00:00:00 2001 From: hdavidh Date: Fri, 14 Jul 2023 16:58:13 -0700 Subject: [PATCH 24/26] Update tests --- .../services/RequestCompressionTest.java | 70 ++++++++----------- 1 file changed, 31 insertions(+), 39 deletions(-) diff --git a/test/codegen-generated-classes-test/src/test/java/software/amazon/awssdk/services/RequestCompressionTest.java b/test/codegen-generated-classes-test/src/test/java/software/amazon/awssdk/services/RequestCompressionTest.java index e346fe453689..e6552cabfa0f 100644 --- a/test/codegen-generated-classes-test/src/test/java/software/amazon/awssdk/services/RequestCompressionTest.java +++ b/test/codegen-generated-classes-test/src/test/java/software/amazon/awssdk/services/RequestCompressionTest.java @@ -18,7 +18,6 @@ import static org.assertj.core.api.Assertions.assertThat; import java.io.InputStream; -import java.nio.charset.StandardCharsets; import java.time.Duration; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -41,9 +40,10 @@ import software.amazon.awssdk.testutils.service.http.MockSyncHttpClient; public class RequestCompressionTest { - private static final String COMPRESSABLE_BODY = + private static final String UNCOMPRESSED_BODY = "RequestCompressionTest-RequestCompressionTest-RequestCompressionTest-RequestCompressionTest-RequestCompressionTest"; - private static final String CRLF = "\r\n"; + private String compressedBody; + private int compressedLen; MockSyncHttpClient mockHttpClient; MockAsyncHttpClient mockAsyncHttpClient; ProtocolRestJsonClient syncClient; @@ -65,6 +65,9 @@ public void setUp() { .httpClient(mockAsyncHttpClient) .build(); compressor = new GzipCompressor(); + byte[] compressedBodyBytes = compressor.compress(SdkBytes.fromUtf8String(UNCOMPRESSED_BODY)).asByteArray(); + compressedLen = compressedBodyBytes.length; + compressedBody = new String(compressedBodyBytes); } @AfterEach @@ -77,10 +80,9 @@ public void reset() { public void sync_nonStreaming_compression_compressesCorrectly() { mockHttpClient.stubNextResponse(mockResponse(), Duration.ofMillis(500)); - byte[] uncompressedData = COMPRESSABLE_BODY.getBytes(StandardCharsets.UTF_8); PutOperationWithRequestCompressionRequest request = PutOperationWithRequestCompressionRequest.builder() - .body(SdkBytes.fromByteArray(uncompressedData)) + .body(SdkBytes.fromUtf8String(UNCOMPRESSED_BODY)) .overrideConfiguration(o -> o.requestCompressionConfiguration( c -> c.minimumCompressionThresholdInBytes(1))) .build(); @@ -88,12 +90,11 @@ public void sync_nonStreaming_compression_compressesCorrectly() { SdkHttpFullRequest loggedRequest = (SdkHttpFullRequest) mockHttpClient.getLastRequest(); InputStream loggedStream = loggedRequest.contentStreamProvider().get().newStream(); 
- byte[] loggedBody = SdkBytes.fromInputStream(loggedStream).asByteArray(); + String loggedBody = new String(SdkBytes.fromInputStream(loggedStream).asByteArray()); int loggedSize = Integer.valueOf(loggedRequest.firstMatchingHeader("Content-Length").get()); - byte[] compressedData = compressor.compress(uncompressedData); - assertThat(loggedBody).isEqualTo(compressedData); - assertThat(loggedSize).isEqualTo(compressedData.length); + assertThat(loggedBody).isEqualTo(compressedBody); + assertThat(loggedSize).isEqualTo(compressedLen); assertThat(loggedRequest.firstMatchingHeader("Content-encoding").get()).isEqualTo("gzip"); } @@ -101,10 +102,9 @@ public void sync_nonStreaming_compression_compressesCorrectly() { public void async_nonStreaming_compression_compressesCorrectly() { mockAsyncHttpClient.stubNextResponse(mockResponse(), Duration.ofMillis(500)); - byte[] uncompressedData = COMPRESSABLE_BODY.getBytes(StandardCharsets.UTF_8); PutOperationWithRequestCompressionRequest request = PutOperationWithRequestCompressionRequest.builder() - .body(SdkBytes.fromByteArray(uncompressedData)) + .body(SdkBytes.fromUtf8String(UNCOMPRESSED_BODY)) .overrideConfiguration(o -> o.requestCompressionConfiguration( c -> c.minimumCompressionThresholdInBytes(1))) .build(); @@ -113,33 +113,31 @@ public void async_nonStreaming_compression_compressesCorrectly() { SdkHttpFullRequest loggedRequest = (SdkHttpFullRequest) mockAsyncHttpClient.getLastRequest(); InputStream loggedStream = loggedRequest.contentStreamProvider().get().newStream(); - byte[] loggedBody = SdkBytes.fromInputStream(loggedStream).asByteArray(); + String loggedBody = new String(SdkBytes.fromInputStream(loggedStream).asByteArray()); int loggedSize = Integer.valueOf(loggedRequest.firstMatchingHeader("Content-Length").get()); - byte[] compressedData = compressor.compress(uncompressedData); - assertThat(loggedBody).isEqualTo(compressedData); - assertThat(loggedSize).isEqualTo(compressedData.length); + assertThat(loggedBody).isEqualTo(compressedBody); + assertThat(loggedSize).isEqualTo(compressedLen); assertThat(loggedRequest.firstMatchingHeader("Content-encoding").get()).isEqualTo("gzip"); } @Test public void sync_streaming_compression_compressesCorrectly() { mockHttpClient.stubNextResponse(mockResponse(), Duration.ofMillis(500)); - byte[] uncompressedData = COMPRESSABLE_BODY.getBytes(StandardCharsets.UTF_8); + PutOperationWithStreamingRequestCompressionRequest request = PutOperationWithStreamingRequestCompressionRequest.builder().build(); - syncClient.putOperationWithStreamingRequestCompression(request, RequestBody.fromBytes(uncompressedData), + syncClient.putOperationWithStreamingRequestCompression(request, RequestBody.fromString(UNCOMPRESSED_BODY), ResponseTransformer.toBytes()); SdkHttpFullRequest loggedRequest = (SdkHttpFullRequest) mockHttpClient.getLastRequest(); InputStream loggedStream = loggedRequest.contentStreamProvider().get().newStream(); - byte[] loggedBody = SdkBytes.fromInputStream(loggedStream).asByteArray(); - byte[] compressedData = compressor.compress(uncompressedData); - String chunkBody = Integer.toHexString(compressedData.length) + CRLF + new String(compressedData) + CRLF + "0" + CRLF; + String loggedBody = new String(SdkBytes.fromInputStream(loggedStream).asByteArray()); - assertThat(new String(loggedBody)).isEqualTo(chunkBody); + assertThat(loggedBody).isEqualTo(compressedBody); assertThat(loggedRequest.firstMatchingHeader("Content-encoding").get()).isEqualTo("gzip"); 
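        // Note (an assumption, not stated in the patch): this byte-for-byte
        // equality only holds while UNCOMPRESSED_BODY fits in a single
        // compression chunk. AwsCompressionInputStream gzips each chunk
        // independently, and concatenated gzip members gunzip to the same
        // plaintext but are not byte-identical to a one-shot gzip of the whole
        // body. With hypothetical helpers gzip/concat/gunzip:
        //     byte[] oneShot  = gzip(body);                                  // 1 member
        //     byte[] perChunk = concat(gzip(firstHalf), gzip(secondHalf));   // 2 members
        //     Arrays.equals(oneShot, perChunk)   -> generally false
        //     gunzip(perChunk).equals(body)      -> true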
assertThat(loggedRequest.matchingHeaders("Content-Length")).isEmpty(); + //assertThat(loggedRequest.firstMatchingHeader("Transfer-Encoding").get()).isEqualTo("chunked"); } @Test @@ -147,10 +145,9 @@ public void sync_nonStreaming_compression_withRetry_compressesCorrectly() { mockHttpClient.stubNextResponse(mockErrorResponse(), Duration.ofMillis(500)); mockHttpClient.stubNextResponse(mockResponse(), Duration.ofMillis(500)); - byte[] uncompressedData = COMPRESSABLE_BODY.getBytes(StandardCharsets.UTF_8); PutOperationWithRequestCompressionRequest request = PutOperationWithRequestCompressionRequest.builder() - .body(SdkBytes.fromByteArray(uncompressedData)) + .body(SdkBytes.fromUtf8String(UNCOMPRESSED_BODY)) .overrideConfiguration(o -> o.requestCompressionConfiguration( c -> c.minimumCompressionThresholdInBytes(1))) .build(); @@ -158,12 +155,11 @@ public void sync_nonStreaming_compression_withRetry_compressesCorrectly() { SdkHttpFullRequest loggedRequest = (SdkHttpFullRequest) mockHttpClient.getLastRequest(); InputStream loggedStream = loggedRequest.contentStreamProvider().get().newStream(); - byte[] loggedBody = SdkBytes.fromInputStream(loggedStream).asByteArray(); + String loggedBody = new String(SdkBytes.fromInputStream(loggedStream).asByteArray()); int loggedSize = Integer.valueOf(loggedRequest.firstMatchingHeader("Content-Length").get()); - byte[] compressedData = compressor.compress(uncompressedData); - assertThat(loggedBody).isEqualTo(compressedData); - assertThat(loggedSize).isEqualTo(compressedData.length); + assertThat(loggedBody).isEqualTo(compressedBody); + assertThat(loggedSize).isEqualTo(compressedLen); assertThat(loggedRequest.firstMatchingHeader("Content-encoding").get()).isEqualTo("gzip"); } @@ -172,10 +168,9 @@ public void async_nonStreaming_compression_withRetry_compressesCorrectly() { mockAsyncHttpClient.stubNextResponse(mockErrorResponse(), Duration.ofMillis(500)); mockAsyncHttpClient.stubNextResponse(mockResponse(), Duration.ofMillis(500)); - byte[] uncompressedData = COMPRESSABLE_BODY.getBytes(StandardCharsets.UTF_8); PutOperationWithRequestCompressionRequest request = PutOperationWithRequestCompressionRequest.builder() - .body(SdkBytes.fromByteArray(uncompressedData)) + .body(SdkBytes.fromUtf8String(UNCOMPRESSED_BODY)) .overrideConfiguration(o -> o.requestCompressionConfiguration( c -> c.minimumCompressionThresholdInBytes(1))) .build(); @@ -184,12 +179,11 @@ public void async_nonStreaming_compression_withRetry_compressesCorrectly() { SdkHttpFullRequest loggedRequest = (SdkHttpFullRequest) mockAsyncHttpClient.getLastRequest(); InputStream loggedStream = loggedRequest.contentStreamProvider().get().newStream(); - byte[] loggedBody = SdkBytes.fromInputStream(loggedStream).asByteArray(); + String loggedBody = new String(SdkBytes.fromInputStream(loggedStream).asByteArray()); int loggedSize = Integer.valueOf(loggedRequest.firstMatchingHeader("Content-Length").get()); - byte[] compressedData = compressor.compress(uncompressedData); - assertThat(loggedBody).isEqualTo(compressedData); - assertThat(loggedSize).isEqualTo(compressedData.length); + assertThat(loggedBody).isEqualTo(compressedBody); + assertThat(loggedSize).isEqualTo(compressedLen); assertThat(loggedRequest.firstMatchingHeader("Content-encoding").get()).isEqualTo("gzip"); } @@ -198,21 +192,19 @@ public void sync_streaming_compression_withRetry_compressesCorrectly() { mockHttpClient.stubNextResponse(mockErrorResponse(), Duration.ofMillis(500)); mockHttpClient.stubNextResponse(mockResponse(), Duration.ofMillis(500)); - 
byte[] uncompressedData = COMPRESSABLE_BODY.getBytes(StandardCharsets.UTF_8); PutOperationWithStreamingRequestCompressionRequest request = PutOperationWithStreamingRequestCompressionRequest.builder().build(); - syncClient.putOperationWithStreamingRequestCompression(request, RequestBody.fromBytes(uncompressedData), + syncClient.putOperationWithStreamingRequestCompression(request, RequestBody.fromString(UNCOMPRESSED_BODY), ResponseTransformer.toBytes()); SdkHttpFullRequest loggedRequest = (SdkHttpFullRequest) mockHttpClient.getLastRequest(); InputStream loggedStream = loggedRequest.contentStreamProvider().get().newStream(); - byte[] loggedBody = SdkBytes.fromInputStream(loggedStream).asByteArray(); - byte[] compressedData = compressor.compress(uncompressedData); - String chunkBody = Integer.toHexString(compressedData.length) + CRLF + new String(compressedData) + CRLF + "0" + CRLF; + String loggedBody = new String(SdkBytes.fromInputStream(loggedStream).asByteArray()); - assertThat(new String(loggedBody)).isEqualTo(chunkBody); + assertThat(loggedBody).isEqualTo(compressedBody); assertThat(loggedRequest.firstMatchingHeader("Content-encoding").get()).isEqualTo("gzip"); assertThat(loggedRequest.matchingHeaders("Content-Length")).isEmpty(); + //assertThat(loggedRequest.firstMatchingHeader("Transfer-Encoding").get()).isEqualTo("chunked"); } private HttpExecuteResponse mockResponse() { From 36dd8efd85d8d4045439f90fc75b0c213b8873de Mon Sep 17 00:00:00 2001 From: hdavidh Date: Fri, 14 Jul 2023 17:02:44 -0700 Subject: [PATCH 25/26] Refactoring --- .../pipeline/stages/CompressRequestStage.java | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/CompressRequestStage.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/CompressRequestStage.java index 95e982fd22fa..dff30cdd1847 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/CompressRequestStage.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/CompressRequestStage.java @@ -66,7 +66,8 @@ public SdkHttpFullRequest.Builder execute(SdkHttpFullRequest.Builder input, Requ compressEntirePayload(input, compressor); updateContentEncodingHeader(input, compressor); if (!isStreaming(context)) { - return updateContentLengthHeader(input); + updateContentLengthHeader(input); + return input; } else { return input.removeHeader("Content-Length"); } @@ -107,26 +108,27 @@ private static boolean isStreaming(RequestExecutionContext context) { return context.executionAttributes().getAttribute(SdkInternalExecutionAttribute.REQUEST_COMPRESSION).isStreaming(); } - private SdkHttpFullRequest.Builder compressEntirePayload(SdkHttpFullRequest.Builder input, Compressor compressor) { + private void compressEntirePayload(SdkHttpFullRequest.Builder input, Compressor compressor) { ContentStreamProvider wrappedProvider = input.contentStreamProvider(); ContentStreamProvider compressedStreamProvider = () -> compressor.compress(wrappedProvider.newStream()); - return input.contentStreamProvider(compressedStreamProvider); + input.contentStreamProvider(compressedStreamProvider); } - private static SdkHttpFullRequest.Builder updateContentEncodingHeader(SdkHttpFullRequest.Builder input, + private static void updateContentEncodingHeader(SdkHttpFullRequest.Builder input, Compressor compressor) { if 
(input.firstMatchingHeader("Content-encoding").isPresent()) { - return input.appendHeader("Content-encoding", compressor.compressorType()); + input.appendHeader("Content-encoding", compressor.compressorType()); + } else { + input.putHeader("Content-encoding", compressor.compressorType()); } - return input.putHeader("Content-encoding", compressor.compressorType()); } - private static SdkHttpFullRequest.Builder updateContentLengthHeader(SdkHttpFullRequest.Builder input) { + private static void updateContentLengthHeader(SdkHttpFullRequest.Builder input) { InputStream inputStream = input.contentStreamProvider().newStream(); try { byte[] bytes = IoUtils.toByteArray(inputStream); String length = String.valueOf(bytes.length); - return input.putHeader("Content-Length", length); + input.putHeader("Content-Length", length); } catch (IOException e) { throw new UncheckedIOException(e); } From 518e4c056370e4ac0d5ed66655965b81fe3cee8b Mon Sep 17 00:00:00 2001 From: hdavidh Date: Fri, 14 Jul 2023 17:09:10 -0700 Subject: [PATCH 26/26] Only compress streaming operations that are chunked --- .../pipeline/stages/CompressRequestStage.java | 18 ++++++++++-------- 1 file changed, 10 insertions(+), 8 deletions(-) diff --git a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/CompressRequestStage.java b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/CompressRequestStage.java index dff30cdd1847..d16518f76a4d 100644 --- a/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/CompressRequestStage.java +++ b/core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/http/pipeline/stages/CompressRequestStage.java @@ -61,16 +61,17 @@ public SdkHttpFullRequest.Builder execute(SdkHttpFullRequest.Builder input, Requ Compressor compressor = resolveCompressionType(context.executionAttributes()); - // non-streaming OR transfer-encoding chunked - if (!isStreaming(context) || isTransferEncodingChunked(input)) { + // non-streaming + if (!isStreaming(context)) { compressEntirePayload(input, compressor); updateContentEncodingHeader(input, compressor); - if (!isStreaming(context)) { - updateContentLengthHeader(input); - return input; - } else { - return input.removeHeader("Content-Length"); - } + updateContentLengthHeader(input); + return input; + } + + // TODO : update this method + if (!isTransferEncodingChunked(input)) { + return input; } if (context.requestProvider() == null) { @@ -134,6 +135,7 @@ private static void updateContentLengthHeader(SdkHttpFullRequest.Builder input) } } + // TODO : update this method private boolean isTransferEncodingChunked(SdkHttpFullRequest.Builder input) { return input.firstMatchingHeader("Transfer-Encoding") .map(headerValue -> headerValue.equals("chunked"))