Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 23 additions & 16 deletions okhttp/src/main/java/io/grpc/okhttp/OkHttpServerTransport.java
Original file line number Diff line number Diff line change
Expand Up @@ -92,10 +92,10 @@ final class OkHttpServerTransport implements ServerTransport,
private static final ByteString CONTENT_LENGTH = ByteString.encodeUtf8("content-length");

private final Config config;
private final Socket bareSocket;
private final Variant variant = new Http2();
private final TransportTracer tracer;
private final InternalLogId logId;
private Socket socket;
private ServerTransportListener listener;
private Executor transportExecutor;
private ScheduledExecutorService scheduledExecutorService;
Expand Down Expand Up @@ -141,11 +141,11 @@ final class OkHttpServerTransport implements ServerTransport,

public OkHttpServerTransport(Config config, Socket bareSocket) {
this.config = Preconditions.checkNotNull(config, "config");
this.bareSocket = Preconditions.checkNotNull(bareSocket, "bareSocket");
this.socket = Preconditions.checkNotNull(bareSocket, "bareSocket");

tracer = config.transportTracerFactory.create();
tracer.setFlowControlWindowReader(this::readFlowControlWindow);
logId = InternalLogId.allocate(getClass(), bareSocket.getRemoteSocketAddress().toString());
logId = InternalLogId.allocate(getClass(), socket.getRemoteSocketAddress().toString());
transportExecutor = config.transportExecutorPool.getObject();
scheduledExecutorService = config.scheduledExecutorServicePool.getObject();
keepAliveEnforcer = new KeepAliveEnforcer(config.permitKeepAliveWithoutCalls,
Expand All @@ -161,10 +161,17 @@ public void start(ServerTransportListener listener) {

private void startIo(SerializingExecutor serializingExecutor) {
try {
bareSocket.setTcpNoDelay(true);
// The socket implementation is lazily initialized, but had broken thread-safety
// for that laziness https://bugs.openjdk.org/browse/JDK-8278326.
// As a workaround, we lock to synchronize initialization with shutdown().
synchronized (lock) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we still need this synchronized? If we do, we're going to need a comment as to why (probably a link to the Java bug).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes--and just added the comment.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Let's explain a bit more.

The socket implementation is lazily initialized, but had broken thread-safety for that laziness https://bugs.openjdk.org/browse/JDK-8278326 . As a workaround, we lock to synchronize initialization with shutdown().

socket.setTcpNoDelay(true);
}
HandshakerSocketFactory.HandshakeResult result =
config.handshakerSocketFactory.handshake(bareSocket, Attributes.EMPTY);
Socket socket = result.socket;
config.handshakerSocketFactory.handshake(socket, Attributes.EMPTY);
synchronized (lock) {
this.socket = result.socket;
}
this.attributes = result.attributes;

int maxQueuedControlFrames = 10000;
Expand Down Expand Up @@ -249,7 +256,7 @@ public void data(boolean outFinished, int streamId, Buffer source, int byteCount
log.log(Level.INFO, "Socket failed to handshake", ex);
}
}
GrpcUtil.closeQuietly(bareSocket);
GrpcUtil.closeQuietly(socket);
terminated();
}
}
Expand All @@ -268,7 +275,7 @@ private void shutdown(@Nullable Long gracefulShutdownPeriod) {
this.gracefulShutdownPeriod = gracefulShutdownPeriod;
if (frameWriter == null) {
handshakeShutdown = true;
GrpcUtil.closeQuietly(bareSocket);
GrpcUtil.closeQuietly(socket);
} else {
// RFC7540 §6.8. Begin double-GOAWAY graceful shutdown. To wait one RTT we use a PING, but
// we also set a timer to limit the upper bound in case the PING is excessively stalled or
Expand Down Expand Up @@ -309,7 +316,7 @@ public void shutdownNow(Status reason) {
synchronized (lock) {
if (frameWriter == null) {
handshakeShutdown = true;
GrpcUtil.closeQuietly(bareSocket);
GrpcUtil.closeQuietly(socket);
return;
}
}
Expand Down Expand Up @@ -360,7 +367,7 @@ private void abruptShutdown(

private void triggerForcefulClose() {
// Safe to do unconditionally; no need to check if timer cancellation raced
GrpcUtil.closeQuietly(bareSocket);
GrpcUtil.closeQuietly(socket);
}

private void terminated() {
Expand Down Expand Up @@ -396,9 +403,9 @@ public ListenableFuture<InternalChannelz.SocketStats> getStats() {
synchronized (lock) {
return Futures.immediateFuture(new InternalChannelz.SocketStats(
tracer.getStats(),
bareSocket.getLocalSocketAddress(),
bareSocket.getRemoteSocketAddress(),
Utils.getSocketOptions(bareSocket),
socket.getLocalSocketAddress(),
socket.getRemoteSocketAddress(),
Utils.getSocketOptions(socket),
securityInfo));
}
}
Expand Down Expand Up @@ -593,12 +600,12 @@ public void run() {
} finally {
// Wait for the abrupt shutdown to be processed by AsyncSink and close the socket
try {
GrpcUtil.exhaust(bareSocket.getInputStream());
GrpcUtil.exhaust(socket.getInputStream());
} catch (IOException ex) {
// Unable to wait, so just proceed to tear-down. The socket is probably already closed so
// the GOAWAY can't be sent anyway.
}
GrpcUtil.closeQuietly(bareSocket);
GrpcUtil.closeQuietly(socket);
terminated();
Thread.currentThread().setName(threadName);
}
Expand Down Expand Up @@ -1108,7 +1115,7 @@ public void onPingTimeout() {
synchronized (lock) {
goAwayStatus = Status.UNAVAILABLE
.withDescription("Keepalive failed. Considering connection dead");
GrpcUtil.closeQuietly(bareSocket);
GrpcUtil.closeQuietly(socket);
}
}
}
Expand Down