-
Notifications
You must be signed in to change notification settings - Fork 942
Request compression interceptor and non-streaming compression #4124
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Request compression interceptor and non-streaming compression #4124
Conversation
|
||
private static SdkHttpRequest updateContentEncodingHeader(SdkHttpRequest sdkHttpRequest, | ||
ExecutionAttributes executionAttributes) { | ||
Compressor compressor = resolveCompressionType(executionAttributes); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can this method take a Compressor as a parameter instead of resolving it? It simplifies the logic and is slightly more efficient.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good catch, updating
@SdkPublicApi | ||
public interface Compressor { | ||
|
||
/* |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
These should be javadoc strings
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
oops, adding second *
/* | ||
* The compression algorithm type. | ||
*/ | ||
String contentType(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this supposed to be compressorType
, per the javadoc?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
right, updating
/* | ||
* Compress an async stream. | ||
*/ | ||
Publisher<ByteBuffer> compressAsyncStream(InputStream inputStream); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Shouldn't this take a Publisher<ByteBuffer>
as input?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yep, updating
import software.amazon.awssdk.core.exception.SdkClientException; | ||
|
||
@SdkInternalApi | ||
public class GzipCompressor implements Compressor { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can probably be final
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
adding modifier
public Compressor compressor() { | ||
if (value == null) { | ||
return null; | ||
} | ||
if (value.equals("gzip")) { | ||
return new GzipCompressor(); | ||
} | ||
return null; | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Can we move this to the Compressor
interface instead, so it matches what we do in SdkChecksum
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
good idea, migrating
case GZIP: | ||
return new GzipCompressor(); | ||
default: | ||
return null; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We should throw for unrecognized types
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
updated to throw IllegalArgumentException
InputStream inputStream = sdkHttpFullRequest.contentStreamProvider().get().newStream(); | ||
InputStream compressedStream = compressor.compress(inputStream); | ||
SdkHttpRequest sdkHttpRequest = | ||
sdkHttpFullRequest.toBuilder() | ||
.contentStreamProvider(() -> compressedStream) | ||
.build(); | ||
sdkHttpRequest = updateContentEncodingHeader(sdkHttpRequest, compressor); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is is only doing a one-time compression of the wrapped stream. The new content stream provider should be wrapping the existing provider, something like this:
ContentStreamProvider compressedStreamProvider = () -> compressor.compress(wrappedProvider.newStream());
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
gotchu, updated
if (isTransferEncodingChunked(context)) { | ||
InputStream compressedStream = compressor.compress(requestBody.contentStreamProvider().newStream()); | ||
try { | ||
byte[] compressedBytes = IoUtils.toByteArray(compressedStream); | ||
return Optional.of(RequestBody.fromBytes(compressedBytes)); | ||
} catch(IOException e){ | ||
throw SdkClientException.create(e.getMessage(), e); | ||
} | ||
} | ||
|
||
CompressionContentStreamProvider streamProvider = | ||
new CompressionContentStreamProvider(requestBody.contentStreamProvider(), compressor); | ||
return Optional.of(RequestBody.fromContentProvider(streamProvider, requestBody.contentType())); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is there separate logic for chunked encoding? Does this interceptor need to care about chunked encoding at all?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The SEP states:
If isStreaming results in Transfer-Encoding: chunked, then the stream must be compressed before it is chunked
So if its chunked encoding, we'll compress the entire body. Otherwise, we'll compress in chunks
} | ||
|
||
@Override | ||
public Optional<RequestBody> modifyHttpContent(Context.ModifyHttpRequest context, ExecutionAttributes executionAttributes) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why are we setting the content stream provider both here and modifyHttpRequest
?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If its a non-streaming request, it won't be set in modifyHttpContent (will just return the empty Optional<RequestBody)
If its a streaming request, it won't be set in modifyHttpRequest (will just update the content encoding header and return the SdkHttpRequest)
Removing updates and moving the streaming compression to separate PR as discussed
SonarCloud Quality Gate failed.
|
…cce3d0c20 Pull request: release <- staging/13c825e9-633d-4d14-b4f3-339cce3d0c20
Motivation and Context
Adding request compression interceptor to handle compression, and implementing non-streaming compression.
Modifications
Added CompressionType enum for supported encodings, Compressor interface and GzipCompressor implementation.
Testing
To add tests once streaming compression is implemented.
Was able to send compressed PutMetricData request to CloudWatch successfully
Screenshots (if appropriate)
Types of changes
Checklist
mvn install
succeedsscripts/new-change
script and following the instructions. Commit the new file created by the script in.changes/next-release
with your changes.License