Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
157fc10
Rename DecodedStreamBuffer to UnderlyingStreamBuffer
davidh44 Jun 26, 2023
29380ab
Update Compressor
davidh44 Jun 26, 2023
228a035
Sync streaming compression
davidh44 Jun 26, 2023
280bd38
Fix Checkstyle issues
davidh44 Jun 26, 2023
e2d6951
Refactor to common class AwsChunkedInputStream
davidh44 Jun 28, 2023
be06647
Refactor Compressor
davidh44 Jun 28, 2023
04b5e7c
Merge from feature/master/request-compression
davidh44 Jun 28, 2023
5b5641c
Refactor CompressionType
davidh44 Jun 28, 2023
7e07424
sync compression
davidh44 Jun 29, 2023
ba7e602
Close GZIPOutputStream
davidh44 Jun 29, 2023
5b37111
Add compress stage to async http client
davidh44 Jun 29, 2023
cd241a6
Add compressed PutMetricData integ test
davidh44 Jun 29, 2023
0f4fd59
Merge branch 'feature/master/request-compression' into hdavidh/sync-s…
davidh44 Jun 29, 2023
5268c0b
Refactoring
davidh44 Jul 5, 2023
d8aa9a5
Add tests
davidh44 Jul 5, 2023
37d962e
Add equals and hashCode
davidh44 Jul 7, 2023
aab6f62
Add TODO
davidh44 Jul 7, 2023
4b52a88
Add tests with retry
davidh44 Jul 7, 2023
d9e28d9
Update content length retrieval
davidh44 Jul 11, 2023
86ff75a
CompressionType test
davidh44 Jul 11, 2023
24d69db
RequestCompressionConfiguration test
davidh44 Jul 11, 2023
9dadea3
Fix Spotbugs error
davidh44 Jul 11, 2023
95e0383
Rename CompressionType to CompressorType
davidh44 Jul 12, 2023
4d1e924
Fix import ordering
davidh44 Jul 12, 2023
ebc0c35
Remove chunk headers and trailers
davidh44 Jul 14, 2023
1d9bb6f
Update tests
davidh44 Jul 14, 2023
36dd8ef
Refactoring
davidh44 Jul 15, 2023
518e4c0
Only compress streaming operations that are chunked
davidh44 Jul 15, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@
@SdkInternalApi
public final class AwsSignedChunkedEncodingInputStream extends AwsChunkedEncodingInputStream {

private static final String CRLF = "\r\n";
private static final String CHUNK_SIGNATURE_HEADER = ";chunk-signature=";
private static final String CHECKSUM_SIGNATURE_HEADER = "x-amz-trailer-signature:";
private String previousChunkSignature;
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -17,44 +17,58 @@

import java.io.InputStream;
import java.nio.ByteBuffer;
import org.reactivestreams.Publisher;
import software.amazon.awssdk.annotations.SdkPublicApi;
import software.amazon.awssdk.core.internal.compression.GzipCompressor;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.core.internal.http.pipeline.stages.CompressRequestStage;

/**
* Interface for compressors used by {@link CompressRequestStage} to compress requests.
* TODO: this will be refactored in the other PR
*/
@SdkPublicApi
@SdkInternalApi
public interface Compressor {

/**
* The compression algorithm type.
*
* @return The {@link String} compression algorithm type.
*/
String compressorType();

/**
* Compress an {@link InputStream}.
* Compress a {@link SdkBytes} payload.
*
* @param content
* @return The compressed {@link SdkBytes}.
*/
InputStream compress(InputStream inputStream);
SdkBytes compress(SdkBytes content);

/**
* Compress an async stream.
* Compress a byte[] payload.
*
* @param content
* @return The compressed byte array.
*/
Publisher<ByteBuffer> compressAsyncStream(Publisher<ByteBuffer> publisher);
default byte[] compress(byte[] content) {
return compress(SdkBytes.fromByteArray(content)).asByteArray();
}

/**
* Compress an {@link InputStream} payload.
*
* @param content
* @return The compressed {@link InputStream}.
*/
default InputStream compress(InputStream content) {
return compress(SdkBytes.fromInputStream(content)).asInputStream();
}

/**
* Maps the {@link CompressionType} to its corresponding {@link Compressor}.
* TODO: Update mappings here when additional compressors are supported in the future
* Compress an {@link ByteBuffer} payload.
*
* @param content
* @return The compressed {@link ByteBuffer}.
*/
static Compressor forCompressorType(CompressionType compressionType) {
switch (compressionType) {
case GZIP:
return new GzipCompressor();
default:
throw new IllegalArgumentException("The compresssion type " + compressionType + "does not have an implemenation"
+ " of Compressor.");
}
default ByteBuffer compress(ByteBuffer content) {
return compress(SdkBytes.fromByteBuffer(content)).asByteBuffer();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
/*
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.core.compression;

import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.core.internal.compression.GzipCompressor;
import software.amazon.awssdk.utils.Validate;

/**
 * The supported compression algorithms for operations with the requestCompression trait. Each supported algorithm will have an
 * {@link Compressor} implementation.
 *
 * <p>Instances are interned: {@link #of(String)} returns the same instance for the same id, and equality is based solely on
 * the id string.
 */
@SdkInternalApi
public final class CompressorType {

    public static final CompressorType GZIP = CompressorType.of("gzip");

    /**
     * Registry of supported algorithm ids to their {@link Compressor} implementations.
     * TODO: Update mappings here when additional compressors are supported in the future.
     */
    private static final Map<String, Compressor> COMPRESSOR_MAP = new HashMap<>();

    static {
        COMPRESSOR_MAP.put("gzip", new GzipCompressor());
    }

    private final String id;

    private CompressorType(String id) {
        this.id = id;
    }

    /**
     * Creates a new {@link CompressorType} of the given value.
     *
     * @throws IllegalArgumentException if the value is null or blank
     */
    public static CompressorType of(String value) {
        Validate.paramNotBlank(value, "compressionType");
        return CompressorTypeCache.put(value);
    }

    /**
     * Returns the {@link Set} of {@link String}s of compression types supported by the SDK.
     */
    public static Set<String> compressorTypes() {
        return COMPRESSOR_MAP.keySet();
    }

    /**
     * Whether or not the compression type is supported by the SDK.
     */
    public static boolean isSupported(String compressionType) {
        return compressorTypes().contains(compressionType);
    }

    /**
     * Maps the {@link CompressorType} to its corresponding {@link Compressor}.
     *
     * @throws UnsupportedOperationException if no {@link Compressor} implementation exists for this type
     */
    public Compressor newCompressor() {
        Compressor compressor = COMPRESSOR_MAP.get(this.id);
        if (compressor == null) {
            throw new UnsupportedOperationException("The compression type " + id + " does not have an implementation of "
                                                    + "Compressor");
        }
        return compressor;
    }

    @Override
    public String toString() {
        return id;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) {
            return true;
        }
        if (o == null || getClass() != o.getClass()) {
            return false;
        }

        CompressorType that = (CompressorType) o;
        // Identity is defined by the id alone. The compressor registry is a shared static and must not
        // participate in equals/hashCode (comparing it against itself was vacuous, and including a mutable
        // static map in hashCode violates the hashCode contract).
        return Objects.equals(id, that.id);
    }

    @Override
    public int hashCode() {
        return id != null ? id.hashCode() : 0;
    }

    /**
     * Interning cache so repeated {@link #of(String)} calls for the same id return the same instance.
     */
    private static final class CompressorTypeCache {
        private static final ConcurrentHashMap<String, CompressorType> VALUES = new ConcurrentHashMap<>();

        private CompressorTypeCache() {
        }

        private static CompressorType put(String value) {
            return VALUES.computeIfAbsent(value, v -> new CompressorType(value));
        }
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,46 +15,42 @@

package software.amazon.awssdk.core.internal.compression;

import java.io.ByteArrayInputStream;
import static software.amazon.awssdk.utils.IoUtils.closeQuietly;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
import java.io.UncheckedIOException;
import java.util.zip.GZIPOutputStream;
import org.reactivestreams.Publisher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.annotations.SdkInternalApi;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.core.compression.Compressor;
import software.amazon.awssdk.core.exception.SdkClientException;
import software.amazon.awssdk.utils.IoUtils;

@SdkInternalApi
public final class GzipCompressor implements Compressor {

private static final String COMPRESSOR_TYPE = "gzip";
private static final Logger log = LoggerFactory.getLogger(GzipCompressor.class);

@Override
public String compressorType() {
return COMPRESSOR_TYPE;
}

@Override
public InputStream compress(InputStream inputStream) {
public SdkBytes compress(SdkBytes content) {
GZIPOutputStream gzipOutputStream = null;
try {
byte[] content = IoUtils.toByteArray(inputStream);
ByteArrayOutputStream compressedOutputStream = new ByteArrayOutputStream();
GZIPOutputStream gzipOutputStream = new GZIPOutputStream(compressedOutputStream);
gzipOutputStream.write(content);
gzipOutputStream = new GZIPOutputStream(compressedOutputStream);
gzipOutputStream.write(content.asByteArray());
gzipOutputStream.close();

return new ByteArrayInputStream(compressedOutputStream.toByteArray());
return SdkBytes.fromByteArray(compressedOutputStream.toByteArray());
} catch (IOException e) {
throw SdkClientException.create(e.getMessage(), e);
throw new UncheckedIOException(e);
} finally {
closeQuietly(gzipOutputStream, log);
}
}

@Override
public Publisher<ByteBuffer> compressAsyncStream(Publisher<ByteBuffer> publisher) {
//TODO
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncExecutionFailureExceptionReportingStage;
import software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncRetryableStage;
import software.amazon.awssdk.core.internal.http.pipeline.stages.AsyncSigningStage;
import software.amazon.awssdk.core.internal.http.pipeline.stages.CompressRequestStage;
import software.amazon.awssdk.core.internal.http.pipeline.stages.MakeAsyncHttpRequestStage;
import software.amazon.awssdk.core.internal.http.pipeline.stages.MakeRequestImmutableStage;
import software.amazon.awssdk.core.internal.http.pipeline.stages.MakeRequestMutableStage;
Expand Down Expand Up @@ -169,6 +170,7 @@ public <OutputT> CompletableFuture<OutputT> execute(
.then(ApplyUserAgentStage::new)
.then(MergeCustomHeadersStage::new)
.then(MergeCustomQueryParamsStage::new)
.then(CompressRequestStage::new)
.then(MakeRequestImmutableStage::new)
.then(RequestPipelineBuilder
.first(AsyncSigningStage::new)
Expand Down
Loading