From 21f8be09a36727e8ea0c3b56af5c9639b0f54069 Mon Sep 17 00:00:00 2001 From: "slav.babanin" Date: Fri, 26 Apr 2024 18:40:13 -0700 Subject: [PATCH 1/4] Fix regression in timeout handling. - Resolve regression where CSOT exception is exposed despite CSOT being disabled. - Correct premature decrease in connect timeout before connection initiation. - Refactor timeout management by encapsulating logic within TimeoutContext to enhance code cleanliness and encapsulation. JAVA-5439 --- .../com/mongodb/internal/TimeoutContext.java | 43 ++++++------ .../internal/connection/CommandMessage.java | 9 +-- .../internal/connection/SocketStream.java | 17 ++--- .../connection/SocketStreamHelper.java | 7 +- .../connection/netty/NettyStream.java | 7 +- .../mongodb/internal/TimeoutContextTest.java | 66 +++++++++++++++++-- .../connection/CommandMessageTest.java | 2 +- 7 files changed, 91 insertions(+), 60 deletions(-) diff --git a/driver-core/src/main/com/mongodb/internal/TimeoutContext.java b/driver-core/src/main/com/mongodb/internal/TimeoutContext.java index ab03d22e467..4afe8e4c0c2 100644 --- a/driver-core/src/main/com/mongodb/internal/TimeoutContext.java +++ b/driver-core/src/main/com/mongodb/internal/TimeoutContext.java @@ -185,26 +185,20 @@ public long getMaxAwaitTimeMS() { return timeoutSettings.getMaxAwaitTimeMS(); } - public void runMaxTimeMSTimeout(final Runnable onInfinite, final LongConsumer onRemaining, - final Runnable onExpired) { + public void runMaxTimeMS(final LongConsumer onRemaining) { if (maxTimeSupplier != null) { - runWithFixedTimout(maxTimeSupplier.get(), onInfinite, onRemaining); - return; + onRemaining.accept(maxTimeSupplier.get()); } - if (timeout != null) { timeout.shortenBy(minRoundTripTimeMS, MILLISECONDS) - .run(MILLISECONDS, onInfinite, onRemaining, onExpired); + .run(MILLISECONDS, + () -> {}, + onRemaining, + () -> { + throw createMongoRoundTripTimeoutException(); + }); } else { - runWithFixedTimout(timeoutSettings.getMaxTimeMS(), onInfinite, onRemaining); - } - } - - private static void runWithFixedTimout(final long ms, final Runnable onInfinite, final LongConsumer onRemaining) { - if (ms == 0) { - onInfinite.run(); - } else { - onRemaining.accept(ms); + onRemaining.accept(timeoutSettings.getMaxTimeMS()); } } @@ -214,7 +208,10 @@ public void resetToDefaultMaxTime() { /** * The override will be provided as the remaining value in - * {@link #runMaxTimeMSTimeout}, where 0 will invoke the onExpired path + * {@link #runMaxTimeMS}, where 0 is ignored. + *

+ * NOTE: Suitable for static user-defined values only (i.e MaxAwaitTimeMS), + * not for running timeouts that adjust dynamically. */ public void setMaxTimeOverride(final long maxTimeMS) { this.maxTimeSupplier = () -> maxTimeMS; @@ -222,7 +219,7 @@ public void setMaxTimeOverride(final long maxTimeMS) { /** * The override will be provided as the remaining value in - * {@link #runMaxTimeMSTimeout}, where 0 will invoke the onExpired path + * {@link #runMaxTimeMS}, where 0 is ignored. */ public void setMaxTimeOverrideToMaxCommitTime() { this.maxTimeSupplier = () -> getMaxCommitTimeMS(); @@ -242,12 +239,12 @@ public long getWriteTimeoutMS() { return timeoutOrAlternative(0); } - public Timeout createConnectTimeoutMs() { - // null timeout treated as infinite will be later than the other - - return Timeout.earliest( - Timeout.expiresIn(getTimeoutSettings().getConnectTimeoutMS(), MILLISECONDS, ZERO_DURATION_MEANS_INFINITE), - Timeout.nullAsInfinite(timeout)); + public int getConnectTimeoutMs() { + final long connectTimeoutMS = getTimeoutSettings().getConnectTimeoutMS(); + return Math.toIntExact(Timeout.nullAsInfinite(timeout).call(MILLISECONDS, + () -> connectTimeoutMS, + (ms) -> connectTimeoutMS == 0 ? ms : Math.min(ms, connectTimeoutMS), + () -> throwMongoTimeoutException("The operation timeout has expired."))); } public void resetTimeout() { diff --git a/driver-core/src/main/com/mongodb/internal/connection/CommandMessage.java b/driver-core/src/main/com/mongodb/internal/connection/CommandMessage.java index 7c4eda58a85..c282f8d580b 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/CommandMessage.java +++ b/driver-core/src/main/com/mongodb/internal/connection/CommandMessage.java @@ -222,12 +222,9 @@ private List getExtraElements(final OperationContext operationConte List extraElements = new ArrayList<>(); if (!getSettings().isCryptd()) { - timeoutContext.runMaxTimeMSTimeout( - () -> {}, - (ms) -> extraElements.add(new BsonElement("maxTimeMS", new BsonInt64(ms))), - () -> { - throw TimeoutContext.createMongoRoundTripTimeoutException(); - }); + timeoutContext.runMaxTimeMS(maxTimeMS -> + extraElements.add(new BsonElement("maxTimeMS", new BsonInt64(maxTimeMS))) + ); } extraElements.add(new BsonElement("$db", new BsonString(new MongoNamespace(getCollectionName()).getDatabaseName()))); if (sessionContext.getClusterTime() != null) { diff --git a/driver-core/src/main/com/mongodb/internal/connection/SocketStream.java b/driver-core/src/main/com/mongodb/internal/connection/SocketStream.java index f0cf6b02dad..8f11c610a06 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/SocketStream.java +++ b/driver-core/src/main/com/mongodb/internal/connection/SocketStream.java @@ -46,7 +46,6 @@ import static com.mongodb.internal.connection.SocketStreamHelper.configureSocket; import static com.mongodb.internal.connection.SslHelper.configureSslSocket; import static com.mongodb.internal.thread.InterruptionUtil.translateInterruptedException; -import static java.util.concurrent.TimeUnit.MILLISECONDS; /** *

This class is not part of the public API and may be removed or changed at any time

@@ -122,10 +121,7 @@ private SSLSocket initializeSslSocketOverSocksProxy(final OperationContext opera SocksSocket socksProxy = new SocksSocket(settings.getProxySettings()); configureSocket(socksProxy, operationContext, settings); InetSocketAddress inetSocketAddress = toSocketAddress(serverHost, serverPort); - operationContext.getTimeoutContext().createConnectTimeoutMs().checkedRun(MILLISECONDS, - () -> socksProxy.connect(inetSocketAddress, 0), - (ms) -> socksProxy.connect(inetSocketAddress, Math.toIntExact(ms)), - () -> throwMongoTimeoutException("The operation timeout has expired.")); + socksProxy.connect(inetSocketAddress, operationContext.getTimeoutContext().getConnectTimeoutMs()); SSLSocket sslSocket = (SSLSocket) sslSocketFactory.createSocket(socksProxy, serverHost, serverPort, true); //Even though Socks proxy connection is already established, TLS handshake has not been performed yet. @@ -153,11 +149,8 @@ private Socket initializeSocketOverSocksProxy(final OperationContext operationCo */ SocksSocket socksProxy = new SocksSocket(createdSocket, settings.getProxySettings()); - InetSocketAddress inetSocketAddress = toSocketAddress(address.getHost(), address.getPort()); - operationContext.getTimeoutContext().createConnectTimeoutMs().checkedRun(MILLISECONDS, - () -> socksProxy.connect(inetSocketAddress, 0), - (ms) -> socksProxy.connect(inetSocketAddress, Math.toIntExact(ms)), - () -> throwMongoTimeoutException("The operation timeout has expired.")); + socksProxy.connect(toSocketAddress(address.getHost(), address.getPort()), + operationContext.getTimeoutContext().getConnectTimeoutMs()); return socksProxy; } @@ -185,9 +178,7 @@ public ByteBuf read(final int numBytes, final OperationContext operationContext) byte[] bytes = buffer.array(); while (totalBytesRead < buffer.limit()) { int readTimeoutMS = (int) operationContext.getTimeoutContext().getReadTimeoutMS(); - if (readTimeoutMS > 0) { - socket.setSoTimeout(readTimeoutMS); - } + socket.setSoTimeout(readTimeoutMS); int bytesRead = inputStream.read(bytes, totalBytesRead, buffer.limit() - totalBytesRead); if (bytesRead == -1) { throw new MongoSocketReadException("Prematurely reached end of stream", getAddress()); diff --git a/driver-core/src/main/com/mongodb/internal/connection/SocketStreamHelper.java b/driver-core/src/main/com/mongodb/internal/connection/SocketStreamHelper.java index 6868760234c..74098c4ede6 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/SocketStreamHelper.java +++ b/driver-core/src/main/com/mongodb/internal/connection/SocketStreamHelper.java @@ -27,9 +27,7 @@ import java.net.SocketException; import java.net.SocketOption; -import static com.mongodb.internal.TimeoutContext.throwMongoTimeoutException; import static com.mongodb.internal.connection.SslHelper.configureSslSocket; -import static java.util.concurrent.TimeUnit.MILLISECONDS; @SuppressWarnings({"unchecked", "rawtypes"}) final class SocketStreamHelper { @@ -75,10 +73,7 @@ static void initialize(final OperationContext operationContext, final Socket soc final SslSettings sslSettings) throws IOException { configureSocket(socket, operationContext, settings); configureSslSocket(socket, sslSettings, inetSocketAddress); - operationContext.getTimeoutContext().createConnectTimeoutMs().checkedRun(MILLISECONDS, - () -> socket.connect(inetSocketAddress, 0), - (ms) -> socket.connect(inetSocketAddress, Math.toIntExact(ms)), - () -> throwMongoTimeoutException("The operation timeout has expired.")); + socket.connect(inetSocketAddress, operationContext.getTimeoutContext().getConnectTimeoutMs()); } static void configureSocket(final Socket socket, final OperationContext operationContext, final SocketSettings settings) throws SocketException { diff --git a/driver-core/src/main/com/mongodb/internal/connection/netty/NettyStream.java b/driver-core/src/main/com/mongodb/internal/connection/netty/NettyStream.java index adfc12c94c5..04989451596 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/netty/NettyStream.java +++ b/driver-core/src/main/com/mongodb/internal/connection/netty/NettyStream.java @@ -71,7 +71,6 @@ import static com.mongodb.assertions.Assertions.assertNotNull; import static com.mongodb.internal.Locks.withLock; -import static com.mongodb.internal.TimeoutContext.throwMongoTimeoutException; import static com.mongodb.internal.connection.ServerAddressHelper.getSocketAddresses; import static com.mongodb.internal.connection.SslHelper.enableHostNameVerification; import static com.mongodb.internal.connection.SslHelper.enableSni; @@ -192,10 +191,8 @@ private void initializeChannel(final OperationContext operationContext, final As Bootstrap bootstrap = new Bootstrap(); bootstrap.group(workerGroup); bootstrap.channel(socketChannelClass); - operationContext.getTimeoutContext().createConnectTimeoutMs().checkedRun(MILLISECONDS, - () -> bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, 0), - (ms) -> bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, Math.toIntExact(ms)), - () -> throwMongoTimeoutException("The operation timeout has expired.")); + bootstrap.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, + operationContext.getTimeoutContext().getConnectTimeoutMs()); bootstrap.option(ChannelOption.TCP_NODELAY, true); bootstrap.option(ChannelOption.SO_KEEPALIVE, true); diff --git a/driver-core/src/test/unit/com/mongodb/internal/TimeoutContextTest.java b/driver-core/src/test/unit/com/mongodb/internal/TimeoutContextTest.java index 753f7227abb..3316f88a36f 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/TimeoutContextTest.java +++ b/driver-core/src/test/unit/com/mongodb/internal/TimeoutContextTest.java @@ -19,9 +19,13 @@ import com.mongodb.session.ClientSession; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import org.mockito.Mockito; import java.util.function.Supplier; +import java.util.stream.Stream; import static com.mongodb.ClusterFixture.TIMEOUT_SETTINGS; import static com.mongodb.ClusterFixture.TIMEOUT_SETTINGS_WITH_INFINITE_TIMEOUT; @@ -42,12 +46,7 @@ final class TimeoutContextTest { public static long getMaxTimeMS(final TimeoutContext timeoutContext) { long[] result = {0L}; - timeoutContext.runMaxTimeMSTimeout( - () -> {}, - (ms) -> result[0] = ms, - () -> { - throw TimeoutContext.createMongoRoundTripTimeoutException(); - }); + timeoutContext.runMaxTimeMS((ms) -> result[0] = ms); return result[0]; } @@ -284,6 +283,61 @@ void shouldResetMaximeMS() { assertTrue(getMaxTimeMS(timeoutContext) > 1); } + static Stream shouldChooseConnectTimeoutWhenItIsLessThenTimeoutMs() { + return Stream.of( + //connectTimeoutMS, timeoutMS, expected + Arguments.of(500L, 1000L, 500L), + Arguments.of(0L, null, 0L), + Arguments.of(1000L, null, 1000L), + Arguments.of(1000L, 0L, 1000L), + Arguments.of(0L, 0L, 0L) + ); + } + + @ParameterizedTest + @MethodSource + @DisplayName("should choose connectTimeoutMS when connectTimeoutMS is less than timeoutMS") + void shouldChooseConnectTimeoutWhenItIsLessThenTimeoutMs(final Long connectTimeoutMS, + final Long timeoutMS, + final long expected) { + TimeoutContext timeoutContext = new TimeoutContext( + new TimeoutSettings(0, + connectTimeoutMS, + 0, + timeoutMS, + 0)); + + long calculatedTimeoutMS = timeoutContext.getConnectTimeoutMs(); + assertEquals(expected, calculatedTimeoutMS); + } + + + static Stream shouldChooseTimeoutMsWhenItIsLessThenConnectTimeoutMS() { + return Stream.of( + //connectTimeoutMS, timeoutMS, expected + Arguments.of(1000L, 1000L, 999), + Arguments.of(1000L, 500L, 499L), + Arguments.of(0L, 1000L, 999L) + ); + } + + @ParameterizedTest + @MethodSource + @DisplayName("should choose timeoutMS when timeoutMS is less than connectTimeoutMS") + void shouldChooseTimeoutMsWhenItIsLessThenConnectTimeoutMS(final Long connectTimeoutMS, + final Long timeoutMS, + final long expected) { + TimeoutContext timeoutContext = new TimeoutContext( + new TimeoutSettings(0, + connectTimeoutMS, + 0, + timeoutMS, + 0)); + + long calculatedTimeoutMS = timeoutContext.getConnectTimeoutMs(); + assertTrue(expected - calculatedTimeoutMS <= 1); + } + private TimeoutContextTest() { } } diff --git a/driver-core/src/test/unit/com/mongodb/internal/connection/CommandMessageTest.java b/driver-core/src/test/unit/com/mongodb/internal/connection/CommandMessageTest.java index 99c14211071..ca5b2d70d7e 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/connection/CommandMessageTest.java +++ b/driver-core/src/test/unit/com/mongodb/internal/connection/CommandMessageTest.java @@ -60,7 +60,7 @@ void encodeShouldThrowTimeoutExceptionWhenTimeoutContextIsCalled() { BasicOutputBuffer bsonOutput = new BasicOutputBuffer(); SessionContext sessionContext = mock(SessionContext.class); TimeoutContext timeoutContext = mock(TimeoutContext.class, mock -> { - doThrow(new MongoOperationTimeoutException("test")).when(mock).runMaxTimeMSTimeout(any(), any(), any()); + doThrow(new MongoOperationTimeoutException("test")).when(mock).runMaxTimeMS(any()); }); OperationContext operationContext = mock(OperationContext.class, mock -> { when(mock.getSessionContext()).thenReturn(sessionContext); From 269eeb8ca0daeec2ec4c916a71c9f8b97ea954a7 Mon Sep 17 00:00:00 2001 From: "slav.babanin" Date: Fri, 26 Apr 2024 18:44:15 -0700 Subject: [PATCH 2/4] Revert runWithFixedTimeout. JAVA-5439 --- .../src/main/com/mongodb/internal/TimeoutContext.java | 10 ++++++++-- 1 file changed, 8 insertions(+), 2 deletions(-) diff --git a/driver-core/src/main/com/mongodb/internal/TimeoutContext.java b/driver-core/src/main/com/mongodb/internal/TimeoutContext.java index 4afe8e4c0c2..18a43030fca 100644 --- a/driver-core/src/main/com/mongodb/internal/TimeoutContext.java +++ b/driver-core/src/main/com/mongodb/internal/TimeoutContext.java @@ -187,7 +187,7 @@ public long getMaxAwaitTimeMS() { public void runMaxTimeMS(final LongConsumer onRemaining) { if (maxTimeSupplier != null) { - onRemaining.accept(maxTimeSupplier.get()); + runWithFixedTimout(maxTimeSupplier.get(), onRemaining); } if (timeout != null) { timeout.shortenBy(minRoundTripTimeMS, MILLISECONDS) @@ -198,7 +198,13 @@ public void runMaxTimeMS(final LongConsumer onRemaining) { throw createMongoRoundTripTimeoutException(); }); } else { - onRemaining.accept(timeoutSettings.getMaxTimeMS()); + runWithFixedTimout(timeoutSettings.getMaxTimeMS(), onRemaining); + } + } + + private static void runWithFixedTimout(final long ms, final LongConsumer onRemaining) { + if (ms != 0) { + onRemaining.accept(ms); } } From 22a2e6cc53a0cc0f2dbea2331add0ab0796c342b Mon Sep 17 00:00:00 2001 From: "slav.babanin" Date: Fri, 26 Apr 2024 18:58:27 -0700 Subject: [PATCH 3/4] Add more tests. JAVA-5439 --- .../com/mongodb/internal/TimeoutContext.java | 20 ++++++++++--------- .../mongodb/internal/TimeoutContextTest.java | 3 +++ 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/driver-core/src/main/com/mongodb/internal/TimeoutContext.java b/driver-core/src/main/com/mongodb/internal/TimeoutContext.java index 18a43030fca..17c4fdda922 100644 --- a/driver-core/src/main/com/mongodb/internal/TimeoutContext.java +++ b/driver-core/src/main/com/mongodb/internal/TimeoutContext.java @@ -188,18 +188,20 @@ public long getMaxAwaitTimeMS() { public void runMaxTimeMS(final LongConsumer onRemaining) { if (maxTimeSupplier != null) { runWithFixedTimout(maxTimeSupplier.get(), onRemaining); + return; } - if (timeout != null) { - timeout.shortenBy(minRoundTripTimeMS, MILLISECONDS) - .run(MILLISECONDS, - () -> {}, - onRemaining, - () -> { - throw createMongoRoundTripTimeoutException(); - }); - } else { + if (timeout == null) { runWithFixedTimout(timeoutSettings.getMaxTimeMS(), onRemaining); + return; } + timeout.shortenBy(minRoundTripTimeMS, MILLISECONDS) + .run(MILLISECONDS, + () -> {}, + onRemaining, + () -> { + throw createMongoRoundTripTimeoutException(); + }); + } private static void runWithFixedTimout(final long ms, final LongConsumer onRemaining) { diff --git a/driver-core/src/test/unit/com/mongodb/internal/TimeoutContextTest.java b/driver-core/src/test/unit/com/mongodb/internal/TimeoutContextTest.java index 3316f88a36f..d8ba97d2f5e 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/TimeoutContextTest.java +++ b/driver-core/src/test/unit/com/mongodb/internal/TimeoutContextTest.java @@ -197,16 +197,19 @@ void testThrowsWhenExpired() { assertThrows(MongoOperationTimeoutException.class, smallTimeout::getReadTimeoutMS); assertThrows(MongoOperationTimeoutException.class, smallTimeout::getWriteTimeoutMS); + assertThrows(MongoOperationTimeoutException.class, smallTimeout::getConnectTimeoutMs); assertThrows(MongoOperationTimeoutException.class, () -> getMaxTimeMS(smallTimeout)); assertThrows(MongoOperationTimeoutException.class, smallTimeout::getMaxCommitTimeMS); assertThrows(MongoOperationTimeoutException.class, () -> smallTimeout.timeoutOrAlternative(1)); assertDoesNotThrow(longTimeout::getReadTimeoutMS); assertDoesNotThrow(longTimeout::getWriteTimeoutMS); + assertDoesNotThrow(longTimeout::getConnectTimeoutMs); assertDoesNotThrow(() -> getMaxTimeMS(longTimeout)); assertDoesNotThrow(longTimeout::getMaxCommitTimeMS); assertDoesNotThrow(() -> longTimeout.timeoutOrAlternative(1)); assertDoesNotThrow(noTimeout::getReadTimeoutMS); assertDoesNotThrow(noTimeout::getWriteTimeoutMS); + assertDoesNotThrow(noTimeout::getConnectTimeoutMs); assertDoesNotThrow(() -> getMaxTimeMS(noTimeout)); assertDoesNotThrow(noTimeout::getMaxCommitTimeMS); assertDoesNotThrow(() -> noTimeout.timeoutOrAlternative(1)); From 2650e8729eb238abfaa35a1bb11656ae1b779489 Mon Sep 17 00:00:00 2001 From: "slav.babanin" Date: Mon, 29 Apr 2024 08:48:14 -0700 Subject: [PATCH 4/4] Correct method naming. JAVA-5439 --- .../src/main/com/mongodb/internal/TimeoutContext.java | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/driver-core/src/main/com/mongodb/internal/TimeoutContext.java b/driver-core/src/main/com/mongodb/internal/TimeoutContext.java index 17c4fdda922..6a67ee52859 100644 --- a/driver-core/src/main/com/mongodb/internal/TimeoutContext.java +++ b/driver-core/src/main/com/mongodb/internal/TimeoutContext.java @@ -187,11 +187,11 @@ public long getMaxAwaitTimeMS() { public void runMaxTimeMS(final LongConsumer onRemaining) { if (maxTimeSupplier != null) { - runWithFixedTimout(maxTimeSupplier.get(), onRemaining); + runWithFixedTimeout(maxTimeSupplier.get(), onRemaining); return; } if (timeout == null) { - runWithFixedTimout(timeoutSettings.getMaxTimeMS(), onRemaining); + runWithFixedTimeout(timeoutSettings.getMaxTimeMS(), onRemaining); return; } timeout.shortenBy(minRoundTripTimeMS, MILLISECONDS) @@ -204,7 +204,7 @@ public void runMaxTimeMS(final LongConsumer onRemaining) { } - private static void runWithFixedTimout(final long ms, final LongConsumer onRemaining) { + private static void runWithFixedTimeout(final long ms, final LongConsumer onRemaining) { if (ms != 0) { onRemaining.accept(ms); }