Skip to content

Commit b40be6b

Browse files
authored
CSOT: Network failure handling and session management (#1396)
- Mark session as dirty when MongoSocketException occurs. - Skip killCursors command execution when getMore fails in LOAD_BALANCER mode. JAVA-5474 JAVA-5476
1 parent b1971b7 commit b40be6b

File tree

13 files changed

+655
-15
lines changed

13 files changed

+655
-15
lines changed

driver-core/src/main/com/mongodb/internal/ExceptionUtils.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,8 @@
1717
package com.mongodb.internal;
1818

1919
import com.mongodb.MongoCommandException;
20+
import com.mongodb.MongoOperationTimeoutException;
21+
import com.mongodb.MongoSocketException;
2022
import org.bson.BsonArray;
2123
import org.bson.BsonDocument;
2224
import org.bson.BsonInt32;
@@ -35,6 +37,15 @@
3537
* <p>This class is not part of the public API and may be removed or changed at any time</p>
3638
*/
3739
public final class ExceptionUtils {
40+
41+
public static boolean isMongoSocketException(final Throwable e) {
42+
return e instanceof MongoSocketException;
43+
}
44+
45+
public static boolean isOperationTimeoutFromSocketException(final Throwable e) {
46+
return e instanceof MongoOperationTimeoutException && e.getCause() instanceof MongoSocketException;
47+
}
48+
3849
public static final class MongoCommandExceptionUtils {
3950
public static int extractErrorCode(final BsonDocument response) {
4051
return extractErrorCodeAsBson(response).intValue();
Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Copyright 2008-present MongoDB, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.mongodb.internal.connection;
18+
19+
import com.mongodb.internal.session.SessionContext;
20+
21+
import static com.mongodb.internal.ExceptionUtils.isMongoSocketException;
22+
import static com.mongodb.internal.ExceptionUtils.isOperationTimeoutFromSocketException;
23+
24+
/**
25+
* <p>This class is not part of the public API and may be removed or changed at any time</p>
26+
*/
27+
public abstract class AbstractProtocolExecutor implements ProtocolExecutor {
28+
29+
protected boolean shouldMarkSessionDirty(final Throwable e, final SessionContext sessionContext) {
30+
if (!sessionContext.hasSession()) {
31+
return false;
32+
}
33+
return isMongoSocketException(e) || isOperationTimeoutFromSocketException(e);
34+
}
35+
}

driver-core/src/main/com/mongodb/internal/connection/DefaultServer.java

Lines changed: 4 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
import com.mongodb.MongoException;
2020
import com.mongodb.MongoServerUnavailableException;
21-
import com.mongodb.MongoSocketException;
2221
import com.mongodb.ReadPreference;
2322
import com.mongodb.connection.ClusterConnectionMode;
2423
import com.mongodb.connection.ConnectionDescription;
@@ -197,7 +196,7 @@ ServerId serverId() {
197196
return serverId;
198197
}
199198

200-
private class DefaultServerProtocolExecutor implements ProtocolExecutor {
199+
private class DefaultServerProtocolExecutor extends AbstractProtocolExecutor {
201200

202201
@SuppressWarnings("unchecked")
203202
@Override
@@ -216,9 +215,9 @@ public <T> T execute(final CommandProtocol<T> protocol, final InternalConnection
216215
if (e instanceof MongoWriteConcernWithResponseException) {
217216
return (T) ((MongoWriteConcernWithResponseException) e).getResponse();
218217
} else {
219-
if (e instanceof MongoSocketException && sessionContext.hasSession()) {
218+
if (shouldMarkSessionDirty(e, sessionContext)) {
220219
sessionContext.markSessionDirty();
221-
}
220+
}
222221
throw e;
223222
}
224223
}
@@ -239,7 +238,7 @@ public <T> void executeAsync(final CommandProtocol<T> protocol, final InternalCo
239238
if (t instanceof MongoWriteConcernWithResponseException) {
240239
callback.onResult((T) ((MongoWriteConcernWithResponseException) t).getResponse(), null);
241240
} else {
242-
if (t instanceof MongoSocketException && sessionContext.hasSession()) {
241+
if (shouldMarkSessionDirty(t, sessionContext)) {
243242
sessionContext.markSessionDirty();
244243
}
245244
callback.onResult(null, t);

driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@
7676
import static com.mongodb.assertions.Assertions.isTrue;
7777
import static com.mongodb.assertions.Assertions.notNull;
7878
import static com.mongodb.internal.async.AsyncRunnable.beginAsync;
79+
import static com.mongodb.internal.TimeoutContext.createMongoTimeoutException;
7980
import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback;
8081
import static com.mongodb.internal.connection.Authenticator.shouldAuthenticate;
8182
import static com.mongodb.internal.connection.CommandHelper.HELLO;
@@ -775,7 +776,7 @@ private void throwTranslatedWriteException(final Throwable e, final OperationCon
775776

776777
private MongoException translateWriteException(final Throwable e, final OperationContext operationContext) {
777778
if (e instanceof MongoSocketWriteTimeoutException && operationContext.getTimeoutContext().hasExpired()) {
778-
return TimeoutContext.createMongoTimeoutException(e);
779+
return createMongoTimeoutException(e);
779780
}
780781

781782
if (e instanceof MongoException) {
@@ -792,9 +793,12 @@ private MongoException translateWriteException(final Throwable e, final Operatio
792793
}
793794

794795
private MongoException translateReadException(final Throwable e, final OperationContext operationContext) {
795-
if (operationContext.getTimeoutContext().hasTimeoutMS()
796-
&& (e instanceof SocketTimeoutException || e instanceof MongoSocketReadTimeoutException)) {
797-
return TimeoutContext.createMongoTimeoutException(e);
796+
if (operationContext.getTimeoutContext().hasTimeoutMS()) {
797+
if (e instanceof SocketTimeoutException) {
798+
return createMongoTimeoutException(createReadTimeoutException((SocketTimeoutException) e));
799+
} else if (e instanceof MongoSocketReadTimeoutException) {
800+
return createMongoTimeoutException((e));
801+
}
798802
}
799803

800804
if (e instanceof MongoException) {
@@ -804,7 +808,7 @@ private MongoException translateReadException(final Throwable e, final Operation
804808
if (interruptedException.isPresent()) {
805809
return interruptedException.get();
806810
} else if (e instanceof SocketTimeoutException) {
807-
return new MongoSocketReadTimeoutException("Timeout while receiving message", getServerAddress(), e);
811+
return createReadTimeoutException((SocketTimeoutException) e);
808812
} else if (e instanceof IOException) {
809813
return new MongoSocketReadException("Exception receiving message", getServerAddress(), e);
810814
} else if (e instanceof RuntimeException) {
@@ -814,6 +818,11 @@ private MongoException translateReadException(final Throwable e, final Operation
814818
}
815819
}
816820

821+
private MongoSocketReadTimeoutException createReadTimeoutException(final SocketTimeoutException e) {
822+
return new MongoSocketReadTimeoutException("Timeout while receiving message",
823+
getServerAddress(), e);
824+
}
825+
817826
private ResponseBuffers receiveResponseBuffers(final OperationContext operationContext) {
818827
try {
819828
ByteBuf messageHeaderBuffer = stream.read(MESSAGE_HEADER_LENGTH, operationContext);

driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnectionInitializer.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
* <p>This class is not part of the public API and may be removed or changed at any time</p>
5151
*/
5252
public class InternalStreamConnectionInitializer implements InternalConnectionInitializer {
53+
private static final int INITIAL_MIN_RTT = 0;
5354
private final ClusterConnectionMode clusterConnectionMode;
5455
private final Authenticator authenticator;
5556
private final BsonDocument clientMetadataDocument;
@@ -160,7 +161,7 @@ private InternalConnectionInitializationDescription createInitializationDescript
160161
helloResult);
161162
ServerDescription serverDescription =
162163
createServerDescription(internalConnection.getDescription().getServerAddress(), helloResult,
163-
System.nanoTime() - startTime, 0);
164+
System.nanoTime() - startTime, INITIAL_MIN_RTT);
164165
return new InternalConnectionInitializationDescription(connectionDescription, serverDescription);
165166
}
166167

driver-core/src/main/com/mongodb/internal/connection/LoadBalancedServer.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -154,7 +154,7 @@ ConnectionPool getConnectionPool() {
154154
return connectionPool;
155155
}
156156

157-
private class LoadBalancedServerProtocolExecutor implements ProtocolExecutor {
157+
private class LoadBalancedServerProtocolExecutor extends AbstractProtocolExecutor {
158158
@SuppressWarnings("unchecked")
159159
@Override
160160
public <T> T execute(final CommandProtocol<T> protocol, final InternalConnection connection, final SessionContext sessionContext) {
@@ -191,7 +191,7 @@ public <T> void executeAsync(final CommandProtocol<T> protocol, final InternalCo
191191
private void handleExecutionException(final InternalConnection connection, final SessionContext sessionContext,
192192
final Throwable t) {
193193
invalidate(t, connection.getDescription().getServiceId(), connection.getGeneration());
194-
if (t instanceof MongoSocketException && sessionContext.hasSession()) {
194+
if (shouldMarkSessionDirty(t, sessionContext)) {
195195
sessionContext.markSessionDirty();
196196
}
197197
}

driver-core/src/main/com/mongodb/internal/operation/AsyncCommandBatchCursor.java

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import com.mongodb.MongoCommandException;
2020
import com.mongodb.MongoNamespace;
21+
import com.mongodb.MongoOperationTimeoutException;
2122
import com.mongodb.MongoSocketException;
2223
import com.mongodb.ReadPreference;
2324
import com.mongodb.ServerAddress;
@@ -286,15 +287,23 @@ <R> void executeWithConnection(final AsyncCallableConnectionWithCallback<R> call
286287
return;
287288
}
288289
callable.call(assertNotNull(connection), (result, t1) -> {
289-
if (t1 instanceof MongoSocketException) {
290-
onCorruptedConnection(connection, (MongoSocketException) t1);
290+
if (t1 != null) {
291+
handleException(connection, t1);
291292
}
292293
connection.release();
293294
callback.onResult(result, t1);
294295
});
295296
});
296297
}
297298

299+
private void handleException(final AsyncConnection connection, final Throwable exception) {
300+
if (exception instanceof MongoOperationTimeoutException && exception.getCause() instanceof MongoSocketException) {
301+
onCorruptedConnection(connection, (MongoSocketException) exception.getCause());
302+
} else if (exception instanceof MongoSocketException) {
303+
onCorruptedConnection(connection, (MongoSocketException) exception);
304+
}
305+
}
306+
298307
private void getConnection(final SingleResultCallback<AsyncConnection> callback) {
299308
assertTrue(getState() != State.IDLE);
300309
AsyncConnection pinnedConnection = getPinnedConnection();

driver-core/src/main/com/mongodb/internal/operation/CommandBatchCursor.java

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import com.mongodb.MongoCommandException;
2020
import com.mongodb.MongoException;
2121
import com.mongodb.MongoNamespace;
22+
import com.mongodb.MongoOperationTimeoutException;
2223
import com.mongodb.MongoSocketException;
2324
import com.mongodb.ReadPreference;
2425
import com.mongodb.ServerAddress;
@@ -334,6 +335,12 @@ void executeWithConnection(final Consumer<Connection> action) {
334335
} catch (MongoSocketException e) {
335336
onCorruptedConnection(connection, e);
336337
throw e;
338+
} catch (MongoOperationTimeoutException e) {
339+
Throwable cause = e.getCause();
340+
if (cause instanceof MongoSocketException) {
341+
onCorruptedConnection(connection, (MongoSocketException) cause);
342+
}
343+
throw e;
337344
} finally {
338345
connection.release();
339346
}

0 commit comments

Comments
 (0)