Skip to content

Implementing string and byte[] async response handlers. Porting over … #161

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Sep 15, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
package software.amazon.awssdk.async;

import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.nio.file.Path;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscription;
Expand Down Expand Up @@ -91,4 +93,35 @@ static <ResponseT> AsyncResponseHandler<ResponseT, ResponseT> toFile(Path path)
return new FileAsyncResponseHandler<>(path);
}

/**
* Creates an {@link AsyncResponseHandler} that writes all content to a byte array.
*
* @param <ResponseT> Pojo response type.
* @return AsyncResponseHandler instance.
*/
static <ResponseT> AsyncResponseHandler<ResponseT, byte[]> toByteArray() {
return new ByteArrayAsyncResponseHandler<>();
}

/**
* Creates an {@link AsyncResponseHandler} that writes all content to a string using the specified encoding.
*
* @param charset {@link Charset} to use when constructing the string.
* @param <ResponseT> Pojo response type.
* @return AsyncResponseHandler instance.
Copy link
Contributor

@millems millems Sep 15, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@param charset + other javadoc changes

*/
static <ResponseT> AsyncResponseHandler<ResponseT, String> toString(Charset charset) {
return new StringAsyncResponseHandler<>(toByteArray(), charset);
}

/**
* Creates an {@link AsyncResponseHandler} that writes all content to UTF8 encoded string.
*
* @param <ResponseT> Pojo response type.
* @return AsyncResponseHandler instance.
*/
static <ResponseT> AsyncResponseHandler<ResponseT, String> toUtf8String() {
return toString(StandardCharsets.UTF_8);
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright 2010-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.async;

import static software.amazon.awssdk.utils.FunctionalUtils.invokeSafely;

import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;
import org.reactivestreams.Publisher;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import software.amazon.awssdk.annotation.SdkInternalApi;
import software.amazon.awssdk.utils.BinaryUtils;

/**
* Implementation of {@link AsyncResponseHandler} that dumps content into a byte array.
*
* @param <ResponseT> Pojo response type.
*/
@SdkInternalApi
class ByteArrayAsyncResponseHandler<ResponseT> implements AsyncResponseHandler<ResponseT, byte[]> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

toString()?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's actually the subscriber we call toString on. Since the byte subscriber didn't have any identifying state I just left it as the default implementation. I can change it to whatever though, just let me know.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nah, it's aight.


private ByteArrayOutputStream baos;

@Override
public void responseReceived(ResponseT response) {
}

@Override
public void onStream(Publisher<ByteBuffer> publisher) {
baos = new ByteArrayOutputStream();
publisher.subscribe(new BaosSubscriber());
}

@Override
public void exceptionOccurred(Throwable throwable) {
baos = null;
}

@Override
public byte[] complete() {
try {
return baos.toByteArray();
} finally {
baos = null;
}
}

/**
* Requests chunks sequentially and dumps them into a {@link ByteArrayOutputStream}.
*/
private class BaosSubscriber implements Subscriber<ByteBuffer> {

private Subscription subscription;

@Override
public void onSubscribe(Subscription s) {
this.subscription = s;
subscription.request(1);
}

@Override
public void onNext(ByteBuffer byteBuffer) {
invokeSafely(() -> baos.write(BinaryUtils.copyBytesFrom(byteBuffer)));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I must be going crazy because I can't find that this method actually exists. http://docs.oracle.com/javase/8/docs/api/?java/io/ByteArrayOutputStream.html What am I missing?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Found it!

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need to copyBytesFrom, or can we just use array since the output stream copies it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure if we can assume it's backed by an array or that array constitutes the entire contents of the byte buffer.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I guess we would need to check if it hasArray and preserve the indices if we were to do it.

subscription.request(1);
}

@Override
public void onError(Throwable throwable) {
// Handled by response handler
}

@Override
public void onComplete() {
// Handled by response handler
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -123,5 +123,10 @@ public void onError(Throwable t) {
public void onComplete() {
// Completion handled by response handler
}

@Override
public String toString() {
return getClass() + ":" + path.toString();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
/*
* Copyright 2010-2017 Amazon.com, Inc. or its affiliates. All Rights Reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License").
* You may not use this file except in compliance with the License.
* A copy of the License is located at
*
* http://aws.amazon.com/apache2.0
*
* or in the "license" file accompanying this file. This file is distributed
* on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either
* express or implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package software.amazon.awssdk.async;

import java.nio.ByteBuffer;
import java.nio.charset.Charset;
import org.reactivestreams.Publisher;
import software.amazon.awssdk.annotation.SdkInternalApi;

/**
* Implementation of {@link AsyncResponseHandler} that dumps content into a string using the specified {@link Charset}.
*
* @param <ResponseT> Pojo response type.
*/
@SdkInternalApi
class StringAsyncResponseHandler<ResponseT> implements AsyncResponseHandler<ResponseT, String> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

toString()?


private final AsyncResponseHandler<ResponseT, byte[]> byteArrayResponseHandler;
private final Charset charset;

/**
* @param byteArrayResponseHandler {@link AsyncResponseHandler} implementation that dumps data into a byte array.
* @param charset Charset to use for String.
*/
StringAsyncResponseHandler(AsyncResponseHandler<ResponseT, byte[]> byteArrayResponseHandler,
Charset charset) {
this.byteArrayResponseHandler = byteArrayResponseHandler;
this.charset = charset;
}

@Override
public void responseReceived(ResponseT response) {
byteArrayResponseHandler.responseReceived(response);
}

@Override
public void onStream(Publisher<ByteBuffer> publisher) {
byteArrayResponseHandler.onStream(publisher);
}

@Override
public void exceptionOccurred(Throwable throwable) {
byteArrayResponseHandler.exceptionOccurred(throwable);
}

@Override
public String complete() {
return new String(byteArrayResponseHandler.complete(), charset);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
package software.amazon.awssdk.http.nio.netty.internal;

import io.netty.util.AttributeKey;
import java.nio.ByteBuffer;
import org.reactivestreams.Subscriber;

/**
* Keys for attributes attached via {@link io.netty.channel.Channel#attr(AttributeKey)}.
Expand All @@ -27,6 +29,8 @@ class ChannelAttributeKeys {
*/
static final AttributeKey<RequestContext> REQUEST_CONTEXT_KEY = AttributeKey.newInstance("requestContext");

static final AttributeKey<Subscriber<? super ByteBuffer>> SUBSCRIBER_KEY = AttributeKey.newInstance("subscriber");

private ChannelAttributeKeys() {
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,17 @@
import static java.util.stream.Collectors.mapping;
import static software.amazon.awssdk.http.nio.netty.internal.ChannelAttributeKeys.REQUEST_CONTEXT_KEY;

import com.typesafe.netty.http.HttpStreamsClientHandler;
import com.typesafe.netty.http.StreamedHttpResponse;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.handler.codec.http.FullHttpResponse;
import io.netty.handler.codec.http.HttpContent;
import io.netty.handler.codec.http.HttpHeaders;
import io.netty.handler.codec.http.HttpObject;
import io.netty.handler.codec.http.HttpResponse;
import io.netty.handler.codec.http.HttpUtil;
import io.netty.handler.codec.http.LastHttpContent;
import io.netty.util.AttributeKey;
import java.nio.ByteBuffer;
import java.util.List;
Expand Down Expand Up @@ -58,31 +60,65 @@ protected void channelRead0(ChannelHandlerContext channelContext, HttpObject msg
RequestContext requestContext = channelContext.channel().attr(REQUEST_CONTEXT_KEY).get();


if (msg instanceof StreamedHttpResponse) {
StreamedHttpResponse response = (StreamedHttpResponse) msg;
if (msg instanceof HttpResponse) {
HttpResponse response = (HttpResponse) msg;
SdkHttpResponse sdkResponse = SdkHttpFullResponse.builder()
.headers(fromNettyHeaders(response.headers()))
.statusCode(response.status().code())
.statusText(response.status().reasonPhrase())
.build();
channelContext.channel().attr(KEEP_ALIVE).set(HttpUtil.isKeepAlive(response));
requestContext.handler().headersReceived(sdkResponse);
requestContext.handler().onStream(new PublisherAdapter(response, channelContext, requestContext));
}

if (msg instanceof StreamedHttpResponse) {
requestContext.handler().onStream(new PublisherAdapter((StreamedHttpResponse) msg, channelContext, requestContext));
} else if (msg instanceof FullHttpResponse) {
requestContext.handler().onStream(new EmptyRequestPublisher(msg, channelContext));
}

if (msg instanceof LastHttpContent) {
Subscriber<? super ByteBuffer> subscriber = channelContext.channel().attr(ChannelAttributeKeys.SUBSCRIBER_KEY).get();
try {
subscriber.onComplete();
requestContext.handler().complete();
} catch (RuntimeException e) {
subscriber.onError(e);
requestContext.handler().exceptionOccurred(e);
throw e;
} finally {
finalizeRequest(requestContext, channelContext);
}
}
}

private static void finalizeRequest(RequestContext requestContext, ChannelHandlerContext channelContext) {
if (!channelContext.channel().attr(KEEP_ALIVE).get()) {
closeAndRelease(channelContext);
} else {
requestContext.channelPool().release(channelContext.channel());
}
}

/**
* Close the channel and release it back into the pool.
*
* @param ctx Context for channel
*/
private static void closeAndRelease(ChannelHandlerContext ctx) {
RequestContext requestContext = ctx.channel().attr(REQUEST_CONTEXT_KEY).get();
ctx.channel().close()
.addListener(channelFuture -> requestContext.channelPool().release(ctx.channel()));
}

@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
RequestContext requestContext = ctx.channel().attr(REQUEST_CONTEXT_KEY).get();
try {
log.error("Exception processing request: {}", requestContext.sdkRequest(), cause);
ctx.fireExceptionCaught(cause);
} finally {
runAndLogError("SdkHttpResponseHandler threw an exception",
() -> requestContext.handler().exceptionOccurred(cause));
runAndLogError("Could not release channel back to the pool",
() -> requestContext.channelPool().release(ctx.channel()));
}
log.error("Exception processing request: {}", requestContext.sdkRequest(), cause);

runAndLogError("SdkHttpResponseHandler threw an exception",
() -> requestContext.handler().exceptionOccurred(cause));
runAndLogError("Could not release channel back to the pool", () -> closeAndRelease(ctx));
}

/**
Expand Down Expand Up @@ -134,22 +170,39 @@ public void onNext(HttpContent httpContent) {

@Override
public void onError(Throwable t) {
subscriber.onError(t);
runAndLogError(String.format("Subscriber %s threw an exception in onError.", subscriber.toString()),
() -> subscriber.onError(t));
requestContext.handler().exceptionOccurred(t);
}

@Override
public void onComplete() {
subscriber.onComplete();
requestContext.handler().complete();
if (!channelContext.channel().attr(KEEP_ALIVE).get()) {
channelContext.channel().close();
try {
runAndLogError(String.format("Subscriber %s threw an exception in onComplete.", subscriber.toString()),
subscriber::onComplete);
requestContext.handler().complete();
} finally {
finalizeRequest(requestContext, channelContext);
}
channelContext.pipeline().remove(HttpStreamsClientHandler.class);
channelContext.pipeline().remove(ResponseHandler.class);
requestContext.channelPool().release(channelContext.channel());
}
});
}
}

private static class EmptyRequestPublisher implements Publisher<ByteBuffer> {
private final HttpObject msg;
private final ChannelHandlerContext channelContext;

private EmptyRequestPublisher(HttpObject msg, ChannelHandlerContext channelContext) {
this.msg = msg;
this.channelContext = channelContext;
}

@Override
public void subscribe(Subscriber<? super ByteBuffer> subscriber) {
subscriber.onNext(((FullHttpResponse) msg).content().nioBuffer());
channelContext.channel().attr(ChannelAttributeKeys.SUBSCRIBER_KEY)
.set(subscriber);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,8 @@
"describeEnvironmentManagedActionHistory",
"describeEnvironmentManagedActions",
"describeInstancesHealth",
"describeEnvironmentHealth"
"describeEnvironmentHealth",
"describeConfigurationOptions"
],
"verifiedSimpleMethods" : ["createStorageLocation"]
}
Loading