Skip to content

Commit 7c04729

Browse files
committed
Add AsyncRequestBody#fromInputStream(AsyncRequestBodyFromInputStreamConfiguration) to allow users to specify the max read limit on the InputStream
1 parent e99bebf commit 7c04729

File tree

9 files changed

+414
-10
lines changed

9 files changed

+414
-10
lines changed
Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,6 @@
1+
{
2+
"type": "feature",
3+
"category": "AWS SDK for Java v2",
4+
"contributor": "",
5+
"description": "Introduce `AsyncRequestBody#fromInputStream(AsyncRequestBodyFromInputStreamConfiguration)` that allows users to specify the max read limit on the provided InputStream."
6+
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/async/AsyncRequestBody.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -358,7 +358,27 @@ static AsyncRequestBody fromRemainingByteBuffersUnsafe(ByteBuffer... byteBuffers
358358
* non-blocking event loop threads owned by the SDK.
359359
*/
360360
static AsyncRequestBody fromInputStream(InputStream inputStream, Long contentLength, ExecutorService executor) {
361-
return new InputStreamWithExecutorAsyncRequestBody(inputStream, contentLength, executor);
361+
return fromInputStream(b -> b.inputStream(inputStream).contentLength(contentLength).executor(executor));
362+
}
363+
364+
/**
365+
* Creates an {@link AsyncRequestBody} from an {@link InputStream} with the provided
366+
* {@link AsyncRequestBodySplitConfiguration}.
367+
*/
368+
static AsyncRequestBody fromInputStream(AsyncRequestBodyFromInputStreamConfiguration configuration) {
369+
Validate.notNull(configuration, "configuration");
370+
return new InputStreamWithExecutorAsyncRequestBody(configuration);
371+
}
372+
373+
/**
374+
* This is a convenience method that passes an instance of the {@link AsyncRequestBodyFromInputStreamConfiguration} builder,
375+
* avoiding the need to create one manually via {@link AsyncRequestBodyFromInputStreamConfiguration#builder()}.
376+
*
377+
* @see #fromInputStream(AsyncRequestBodyFromInputStreamConfiguration)
378+
*/
379+
static AsyncRequestBody fromInputStream(Consumer<AsyncRequestBodyFromInputStreamConfiguration.Builder> configuration) {
380+
Validate.notNull(configuration, "configuration");
381+
return fromInputStream(AsyncRequestBodyFromInputStreamConfiguration.builder().applyMutation(configuration).build());
362382
}
363383

364384
/**
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,201 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.core.async;
17+
18+
import java.io.InputStream;
19+
import java.util.Objects;
20+
import java.util.concurrent.ExecutorService;
21+
import software.amazon.awssdk.annotations.SdkPublicApi;
22+
import software.amazon.awssdk.utils.Validate;
23+
import software.amazon.awssdk.utils.builder.CopyableBuilder;
24+
import software.amazon.awssdk.utils.builder.ToCopyableBuilder;
25+
26+
/**
27+
* Configuration options for {@link AsyncRequestBody#fromInputStream(AsyncRequestBodyFromInputStreamConfiguration)}
28+
* to configure how the SDK should create an {@link AsyncRequestBody} from an {@link InputStream}.
29+
*/
30+
@SdkPublicApi
31+
public final class AsyncRequestBodyFromInputStreamConfiguration
32+
implements ToCopyableBuilder<AsyncRequestBodyFromInputStreamConfiguration.Builder,
33+
AsyncRequestBodyFromInputStreamConfiguration> {
34+
private final InputStream inputStream;
35+
private final Long contentLength;
36+
private final ExecutorService executor;
37+
private final Integer maxReadLimit;
38+
39+
private AsyncRequestBodyFromInputStreamConfiguration(DefaultBuilder builder) {
40+
this.inputStream = Validate.paramNotNull(builder.inputStream, "inputStream");
41+
this.contentLength = Validate.isNotNegativeOrNull(builder.contentLength, "contentLength");
42+
this.maxReadLimit = Validate.isPositiveOrNull(builder.maxReadLimit, "maxReadLimit");
43+
this.executor = Validate.paramNotNull(builder.executor, "executor");
44+
}
45+
46+
/**
47+
* @return the provided {@link InputStream}.
48+
*/
49+
public InputStream inputStream() {
50+
return inputStream;
51+
}
52+
53+
/**
54+
* @return the provided content length.
55+
*/
56+
public Long contentLength() {
57+
return contentLength;
58+
}
59+
60+
/**
61+
* @return the provided {@link ExecutorService}.
62+
*/
63+
public ExecutorService executor() {
64+
return executor;
65+
}
66+
67+
/**
68+
* @return the provided max read limit used to mark and reset the {@link InputStream}).
69+
*/
70+
public Integer maxReadLimit() {
71+
return maxReadLimit;
72+
}
73+
74+
@Override
75+
public boolean equals(Object o) {
76+
if (this == o) {
77+
return true;
78+
}
79+
if (o == null || getClass() != o.getClass()) {
80+
return false;
81+
}
82+
83+
AsyncRequestBodyFromInputStreamConfiguration that = (AsyncRequestBodyFromInputStreamConfiguration) o;
84+
85+
if (!Objects.equals(inputStream, that.inputStream)) {
86+
return false;
87+
}
88+
if (!Objects.equals(contentLength, that.contentLength)) {
89+
return false;
90+
}
91+
if (!Objects.equals(executor, that.executor)) {
92+
return false;
93+
}
94+
return Objects.equals(maxReadLimit, that.maxReadLimit);
95+
}
96+
97+
@Override
98+
public int hashCode() {
99+
int result = inputStream != null ? inputStream.hashCode() : 0;
100+
result = 31 * result + (contentLength != null ? contentLength.hashCode() : 0);
101+
result = 31 * result + (executor != null ? executor.hashCode() : 0);
102+
result = 31 * result + (maxReadLimit != null ? maxReadLimit.hashCode() : 0);
103+
return result;
104+
}
105+
106+
/**
107+
* Create a {@link Builder}, used to create a {@link AsyncRequestBodyFromInputStreamConfiguration}.
108+
*/
109+
public static Builder builder() {
110+
return new DefaultBuilder();
111+
}
112+
113+
114+
@Override
115+
public AsyncRequestBodyFromInputStreamConfiguration.Builder toBuilder() {
116+
return new DefaultBuilder(this);
117+
}
118+
119+
public interface Builder extends CopyableBuilder<AsyncRequestBodyFromInputStreamConfiguration.Builder,
120+
AsyncRequestBodyFromInputStreamConfiguration> {
121+
122+
/**
123+
* Configures the InputStream.
124+
*
125+
* @param inputStream the InputStream
126+
* @return This object for method chaining.
127+
*/
128+
Builder inputStream(InputStream inputStream);
129+
130+
/**
131+
* Configures the length of the provided {@link InputStream}
132+
* @param contentLength the content length
133+
* @return This object for method chaining.
134+
*/
135+
Builder contentLength(Long contentLength);
136+
137+
/**
138+
* Configures the {@link ExecutorService} to perform the blocking data reads.
139+
*
140+
* @param executor the executor
141+
* @return This object for method chaining.
142+
*/
143+
Builder executor(ExecutorService executor);
144+
145+
/**
146+
* Configures max read limit used to mark and reset the {@link InputStream}. This will have no
147+
* effect if the stream doesn't support mark and reset.
148+
*
149+
* <p>
150+
* By default, it is 128 KiB.
151+
*
152+
* @param maxReadLimit the max read limit
153+
* @return This object for method chaining.
154+
* @see InputStream#mark(int)
155+
*/
156+
Builder maxReadLimit(Integer maxReadLimit);
157+
}
158+
159+
private static final class DefaultBuilder implements Builder {
160+
private InputStream inputStream;
161+
private Long contentLength;
162+
private ExecutorService executor;
163+
private Integer maxReadLimit;
164+
165+
private DefaultBuilder(AsyncRequestBodyFromInputStreamConfiguration asyncRequestBodyFromInputStreamConfiguration) {
166+
this.inputStream = asyncRequestBodyFromInputStreamConfiguration.inputStream;
167+
this.contentLength = asyncRequestBodyFromInputStreamConfiguration.contentLength;
168+
this.executor = asyncRequestBodyFromInputStreamConfiguration.executor;
169+
this.maxReadLimit = asyncRequestBodyFromInputStreamConfiguration.maxReadLimit;
170+
}
171+
172+
private DefaultBuilder() {
173+
174+
}
175+
176+
public Builder inputStream(InputStream inputStream) {
177+
this.inputStream = inputStream;
178+
return this;
179+
}
180+
181+
public Builder contentLength(Long contentLength) {
182+
this.contentLength = contentLength;
183+
return this;
184+
}
185+
186+
public Builder executor(ExecutorService executor) {
187+
this.executor = executor;
188+
return this;
189+
}
190+
191+
public Builder maxReadLimit(Integer maxReadLimit) {
192+
this.maxReadLimit = maxReadLimit;
193+
return this;
194+
}
195+
196+
@Override
197+
public AsyncRequestBodyFromInputStreamConfiguration build() {
198+
return new AsyncRequestBodyFromInputStreamConfiguration(this);
199+
}
200+
}
201+
}

core/sdk-core/src/main/java/software/amazon/awssdk/core/internal/async/InputStreamWithExecutorAsyncRequestBody.java

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import software.amazon.awssdk.annotations.SdkInternalApi;
3030
import software.amazon.awssdk.annotations.SdkTestInternalApi;
3131
import software.amazon.awssdk.core.async.AsyncRequestBody;
32+
import software.amazon.awssdk.core.async.AsyncRequestBodyFromInputStreamConfiguration;
3233
import software.amazon.awssdk.core.async.BlockingInputStreamAsyncRequestBody;
3334
import software.amazon.awssdk.core.exception.NonRetryableException;
3435
import software.amazon.awssdk.core.internal.util.NoopSubscription;
@@ -52,13 +53,11 @@ public class InputStreamWithExecutorAsyncRequestBody implements AsyncRequestBody
5253

5354
private Future<?> writeFuture;
5455

55-
public InputStreamWithExecutorAsyncRequestBody(InputStream inputStream,
56-
Long contentLength,
57-
ExecutorService executor) {
58-
this.inputStream = inputStream;
59-
this.contentLength = contentLength;
60-
this.executor = executor;
61-
IoUtils.markStreamWithMaxReadLimit(inputStream);
56+
public InputStreamWithExecutorAsyncRequestBody(AsyncRequestBodyFromInputStreamConfiguration configuration) {
57+
this.inputStream = configuration.inputStream();
58+
this.contentLength = configuration.contentLength();
59+
this.executor = configuration.executor();
60+
IoUtils.markStreamWithMaxReadLimit(inputStream, configuration.maxReadLimit());
6261
}
6362

6463
@Override
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,92 @@
1+
/*
2+
* Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License").
5+
* You may not use this file except in compliance with the License.
6+
* A copy of the License is located at
7+
*
8+
* http://aws.amazon.com/apache2.0
9+
*
10+
* or in the "license" file accompanying this file. This file is distributed
11+
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
12+
* express or implied. See the License for the specific language governing
13+
* permissions and limitations under the License.
14+
*/
15+
16+
package software.amazon.awssdk.core.async;
17+
18+
import static org.assertj.core.api.Assertions.assertThat;
19+
import static org.assertj.core.api.Assertions.assertThatThrownBy;
20+
import static org.mockito.Mockito.mock;
21+
22+
import java.io.InputStream;
23+
import java.util.concurrent.ExecutorService;
24+
import nl.jqno.equalsverifier.EqualsVerifier;
25+
import org.junit.jupiter.api.Test;
26+
import org.junit.jupiter.params.ParameterizedTest;
27+
import org.junit.jupiter.params.provider.ValueSource;
28+
29+
public class AsyncRequestBodyFromInputStreamConfigurationTest {
30+
31+
@Test
32+
void equalsHashcode() {
33+
EqualsVerifier.forClass(AsyncRequestBodyFromInputStreamConfiguration.class)
34+
.verify();
35+
}
36+
37+
@Test
38+
void toBuilder_shouldCopyProperties() {
39+
InputStream inputStream = mock(InputStream.class);
40+
ExecutorService executorService = mock(ExecutorService.class);
41+
AsyncRequestBodyFromInputStreamConfiguration configuration = AsyncRequestBodyFromInputStreamConfiguration.builder()
42+
.inputStream(inputStream)
43+
.contentLength(10L)
44+
.executor(executorService)
45+
.maxReadLimit(10)
46+
.build();
47+
assertThat(configuration.toBuilder().build()).isEqualTo(configuration);
48+
49+
}
50+
51+
@Test
52+
void inputStreamIsNull_shouldThrowException() {
53+
assertThatThrownBy(() ->
54+
AsyncRequestBodyFromInputStreamConfiguration.builder()
55+
.executor(mock(ExecutorService.class))
56+
.build())
57+
.isInstanceOf(NullPointerException.class).hasMessageContaining("inputStream");
58+
}
59+
60+
61+
@Test
62+
void executorIsNull_shouldThrowException() {
63+
assertThatThrownBy(() ->
64+
AsyncRequestBodyFromInputStreamConfiguration.builder()
65+
.inputStream(mock(InputStream.class))
66+
.build())
67+
.isInstanceOf(NullPointerException.class).hasMessageContaining("executor");
68+
}
69+
70+
@ParameterizedTest
71+
@ValueSource(ints = {0, -1})
72+
void readLimitNotPositive_shouldThrowException(int value) {
73+
assertThatThrownBy(() ->
74+
AsyncRequestBodyFromInputStreamConfiguration.builder()
75+
.inputStream(mock(InputStream.class))
76+
.executor(mock(ExecutorService.class))
77+
.maxReadLimit(value)
78+
.build())
79+
.isInstanceOf(IllegalArgumentException.class).hasMessageContaining("maxReadLimit");
80+
}
81+
82+
@Test
83+
void contentLengthNegative_shouldThrowException() {
84+
assertThatThrownBy(() ->
85+
AsyncRequestBodyFromInputStreamConfiguration.builder()
86+
.inputStream(mock(InputStream.class))
87+
.executor(mock(ExecutorService.class))
88+
.contentLength(-1L)
89+
.build())
90+
.isInstanceOf(IllegalArgumentException.class).hasMessageContaining("contentLength");
91+
}
92+
}

core/sdk-core/src/test/java/software/amazon/awssdk/core/internal/async/InputStreamWithExecutorAsyncRequestBodyTest.java

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import java.util.concurrent.Executors;
2727
import org.junit.jupiter.api.Test;
2828
import org.junit.jupiter.api.Timeout;
29+
import software.amazon.awssdk.core.async.AsyncRequestBody;
2930
import software.amazon.awssdk.utils.async.ByteBufferStoringSubscriber;
3031
import software.amazon.awssdk.utils.async.ByteBufferStoringSubscriber.TransferResult;
3132

@@ -39,7 +40,7 @@ public void dataFromInputStreamIsCopied() throws Exception {
3940
PipedInputStream is = new PipedInputStream(os);
4041

4142
InputStreamWithExecutorAsyncRequestBody asyncRequestBody =
42-
new InputStreamWithExecutorAsyncRequestBody(is, 4L, executor);
43+
(InputStreamWithExecutorAsyncRequestBody) AsyncRequestBody.fromInputStream(b -> b.inputStream(is).executor(executor).contentLength(4L));
4344

4445
ByteBufferStoringSubscriber subscriber = new ByteBufferStoringSubscriber(8);
4546
asyncRequestBody.subscribe(subscriber);
@@ -75,8 +76,10 @@ public void errorsReadingInputStreamAreForwardedToSubscriber() throws Exception
7576
PipedInputStream is = new PipedInputStream(os);
7677

7778
is.close();
79+
7880
InputStreamWithExecutorAsyncRequestBody asyncRequestBody =
79-
new InputStreamWithExecutorAsyncRequestBody(is, 4L, executor);
81+
(InputStreamWithExecutorAsyncRequestBody) AsyncRequestBody.fromInputStream(b -> b.inputStream(is).executor(executor).contentLength(4L));
82+
8083

8184
ByteBufferStoringSubscriber subscriber = new ByteBufferStoringSubscriber(8);
8285
asyncRequestBody.subscribe(subscriber);

0 commit comments

Comments
 (0)