Skip to content

Commit 2effae6

Browse files
okhttp: tsan socket data race bug fix (#10279)
replaced use of bareSocket with a synchronized socket, added additional lock to synchronize initialization with shutdown() to fix a Java bug
1 parent f1de820 commit 2effae6

File tree

1 file changed

+23
-16
lines changed

1 file changed

+23
-16
lines changed

okhttp/src/main/java/io/grpc/okhttp/OkHttpServerTransport.java

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -92,10 +92,10 @@ final class OkHttpServerTransport implements ServerTransport,
9292
private static final ByteString CONTENT_LENGTH = ByteString.encodeUtf8("content-length");
9393

9494
private final Config config;
95-
private final Socket bareSocket;
9695
private final Variant variant = new Http2();
9796
private final TransportTracer tracer;
9897
private final InternalLogId logId;
98+
private Socket socket;
9999
private ServerTransportListener listener;
100100
private Executor transportExecutor;
101101
private ScheduledExecutorService scheduledExecutorService;
@@ -141,11 +141,11 @@ final class OkHttpServerTransport implements ServerTransport,
141141

142142
public OkHttpServerTransport(Config config, Socket bareSocket) {
143143
this.config = Preconditions.checkNotNull(config, "config");
144-
this.bareSocket = Preconditions.checkNotNull(bareSocket, "bareSocket");
144+
this.socket = Preconditions.checkNotNull(bareSocket, "bareSocket");
145145

146146
tracer = config.transportTracerFactory.create();
147147
tracer.setFlowControlWindowReader(this::readFlowControlWindow);
148-
logId = InternalLogId.allocate(getClass(), bareSocket.getRemoteSocketAddress().toString());
148+
logId = InternalLogId.allocate(getClass(), socket.getRemoteSocketAddress().toString());
149149
transportExecutor = config.transportExecutorPool.getObject();
150150
scheduledExecutorService = config.scheduledExecutorServicePool.getObject();
151151
keepAliveEnforcer = new KeepAliveEnforcer(config.permitKeepAliveWithoutCalls,
@@ -161,10 +161,17 @@ public void start(ServerTransportListener listener) {
161161

162162
private void startIo(SerializingExecutor serializingExecutor) {
163163
try {
164-
bareSocket.setTcpNoDelay(true);
164+
// The socket implementation is lazily initialized, but had broken thread-safety
165+
// for that laziness https://bugs.openjdk.org/browse/JDK-8278326.
166+
// As a workaround, we lock to synchronize initialization with shutdown().
167+
synchronized (lock) {
168+
socket.setTcpNoDelay(true);
169+
}
165170
HandshakerSocketFactory.HandshakeResult result =
166-
config.handshakerSocketFactory.handshake(bareSocket, Attributes.EMPTY);
167-
Socket socket = result.socket;
171+
config.handshakerSocketFactory.handshake(socket, Attributes.EMPTY);
172+
synchronized (lock) {
173+
this.socket = result.socket;
174+
}
168175
this.attributes = result.attributes;
169176

170177
int maxQueuedControlFrames = 10000;
@@ -249,7 +256,7 @@ public void data(boolean outFinished, int streamId, Buffer source, int byteCount
249256
log.log(Level.INFO, "Socket failed to handshake", ex);
250257
}
251258
}
252-
GrpcUtil.closeQuietly(bareSocket);
259+
GrpcUtil.closeQuietly(socket);
253260
terminated();
254261
}
255262
}
@@ -268,7 +275,7 @@ private void shutdown(@Nullable Long gracefulShutdownPeriod) {
268275
this.gracefulShutdownPeriod = gracefulShutdownPeriod;
269276
if (frameWriter == null) {
270277
handshakeShutdown = true;
271-
GrpcUtil.closeQuietly(bareSocket);
278+
GrpcUtil.closeQuietly(socket);
272279
} else {
273280
// RFC7540 §6.8. Begin double-GOAWAY graceful shutdown. To wait one RTT we use a PING, but
274281
// we also set a timer to limit the upper bound in case the PING is excessively stalled or
@@ -309,7 +316,7 @@ public void shutdownNow(Status reason) {
309316
synchronized (lock) {
310317
if (frameWriter == null) {
311318
handshakeShutdown = true;
312-
GrpcUtil.closeQuietly(bareSocket);
319+
GrpcUtil.closeQuietly(socket);
313320
return;
314321
}
315322
}
@@ -360,7 +367,7 @@ private void abruptShutdown(
360367

361368
private void triggerForcefulClose() {
362369
// Safe to do unconditionally; no need to check if timer cancellation raced
363-
GrpcUtil.closeQuietly(bareSocket);
370+
GrpcUtil.closeQuietly(socket);
364371
}
365372

366373
private void terminated() {
@@ -396,9 +403,9 @@ public ListenableFuture<InternalChannelz.SocketStats> getStats() {
396403
synchronized (lock) {
397404
return Futures.immediateFuture(new InternalChannelz.SocketStats(
398405
tracer.getStats(),
399-
bareSocket.getLocalSocketAddress(),
400-
bareSocket.getRemoteSocketAddress(),
401-
Utils.getSocketOptions(bareSocket),
406+
socket.getLocalSocketAddress(),
407+
socket.getRemoteSocketAddress(),
408+
Utils.getSocketOptions(socket),
402409
securityInfo));
403410
}
404411
}
@@ -593,12 +600,12 @@ public void run() {
593600
} finally {
594601
// Wait for the abrupt shutdown to be processed by AsyncSink and close the socket
595602
try {
596-
GrpcUtil.exhaust(bareSocket.getInputStream());
603+
GrpcUtil.exhaust(socket.getInputStream());
597604
} catch (IOException ex) {
598605
// Unable to wait, so just proceed to tear-down. The socket is probably already closed so
599606
// the GOAWAY can't be sent anyway.
600607
}
601-
GrpcUtil.closeQuietly(bareSocket);
608+
GrpcUtil.closeQuietly(socket);
602609
terminated();
603610
Thread.currentThread().setName(threadName);
604611
}
@@ -1108,7 +1115,7 @@ public void onPingTimeout() {
11081115
synchronized (lock) {
11091116
goAwayStatus = Status.UNAVAILABLE
11101117
.withDescription("Keepalive failed. Considering connection dead");
1111-
GrpcUtil.closeQuietly(bareSocket);
1118+
GrpcUtil.closeQuietly(socket);
11121119
}
11131120
}
11141121
}

0 commit comments

Comments
 (0)