Skip to content

Commit dfd6e7c

Browse files
authored
Do not perform server selection to determine sessions support (#1092)
Instead, check for session support during operation execution after the connection is checked out.
1 parent e9a4bd8 commit dfd6e7c

File tree

21 files changed

+281
-249
lines changed

21 files changed

+281
-249
lines changed

driver-core/src/main/com/mongodb/connection/ConnectionDescription.java

Lines changed: 51 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@ public class ConnectionDescription {
4646
private final int maxMessageSize;
4747
private final List<String> compressors;
4848
private final BsonArray saslSupportedMechanisms;
49+
private final Integer logicalSessionTimeoutMinutes;
4950

5051
private static final int DEFAULT_MAX_MESSAGE_SIZE = 0x2000000; // 32MB
5152
private static final int DEFAULT_MAX_WRITE_BATCH_SIZE = 512;
@@ -99,6 +100,29 @@ public ConnectionDescription(final ConnectionId connectionId, final int maxWireV
99100
saslSupportedMechanisms);
100101
}
101102

103+
/**
104+
* Construct an instance.
105+
*
106+
* @param connectionId the connection id
107+
* @param maxWireVersion the max wire version
108+
* @param serverType the server type
109+
* @param maxBatchCount the max batch count
110+
* @param maxDocumentSize the max document size in bytes
111+
* @param maxMessageSize the max message size in bytes
112+
* @param compressors the available compressors on the connection
113+
* @param saslSupportedMechanisms the supported SASL mechanisms
114+
* @param logicalSessionTimeoutMinutes the logical session timeout, in minutes
115+
* @since 4.10
116+
*/
117+
public ConnectionDescription(final ConnectionId connectionId, final int maxWireVersion,
118+
final ServerType serverType, final int maxBatchCount, final int maxDocumentSize,
119+
final int maxMessageSize, final List<String> compressors,
120+
@Nullable final BsonArray saslSupportedMechanisms,
121+
@Nullable final Integer logicalSessionTimeoutMinutes) {
122+
this(null, connectionId, maxWireVersion, serverType, maxBatchCount, maxDocumentSize, maxMessageSize, compressors,
123+
saslSupportedMechanisms, logicalSessionTimeoutMinutes);
124+
}
125+
102126
/**
103127
* Construct an instance.
104128
*
@@ -117,6 +141,14 @@ public ConnectionDescription(@Nullable final ObjectId serviceId, final Connectio
117141
final ServerType serverType, final int maxBatchCount, final int maxDocumentSize,
118142
final int maxMessageSize, final List<String> compressors,
119143
@Nullable final BsonArray saslSupportedMechanisms) {
144+
this(serviceId, connectionId, maxWireVersion, serverType, maxBatchCount, maxDocumentSize, maxMessageSize, compressors,
145+
saslSupportedMechanisms, null);
146+
}
147+
148+
private ConnectionDescription(@Nullable final ObjectId serviceId, final ConnectionId connectionId, final int maxWireVersion,
149+
final ServerType serverType, final int maxBatchCount, final int maxDocumentSize,
150+
final int maxMessageSize, final List<String> compressors,
151+
@Nullable final BsonArray saslSupportedMechanisms, @Nullable final Integer logicalSessionTimeoutMinutes) {
120152
this.serviceId = serviceId;
121153
this.connectionId = connectionId;
122154
this.serverType = serverType;
@@ -126,6 +158,7 @@ public ConnectionDescription(@Nullable final ObjectId serviceId, final Connectio
126158
this.maxWireVersion = maxWireVersion;
127159
this.compressors = notNull("compressors", Collections.unmodifiableList(new ArrayList<>(compressors)));
128160
this.saslSupportedMechanisms = saslSupportedMechanisms;
161+
this.logicalSessionTimeoutMinutes = logicalSessionTimeoutMinutes;
129162
}
130163
/**
131164
* Creates a new connection description with the set connection id
@@ -137,7 +170,7 @@ public ConnectionDescription(@Nullable final ObjectId serviceId, final Connectio
137170
public ConnectionDescription withConnectionId(final ConnectionId connectionId) {
138171
notNull("connectionId", connectionId);
139172
return new ConnectionDescription(serviceId, connectionId, maxWireVersion, serverType, maxBatchCount, maxDocumentSize,
140-
maxMessageSize, compressors, saslSupportedMechanisms);
173+
maxMessageSize, compressors, saslSupportedMechanisms, logicalSessionTimeoutMinutes);
141174
}
142175

143176
/**
@@ -150,7 +183,7 @@ public ConnectionDescription withConnectionId(final ConnectionId connectionId) {
150183
public ConnectionDescription withServiceId(final ObjectId serviceId) {
151184
notNull("serviceId", serviceId);
152185
return new ConnectionDescription(serviceId, connectionId, maxWireVersion, serverType, maxBatchCount, maxDocumentSize,
153-
maxMessageSize, compressors, saslSupportedMechanisms);
186+
maxMessageSize, compressors, saslSupportedMechanisms, logicalSessionTimeoutMinutes);
154187
}
155188

156189
/**
@@ -248,6 +281,17 @@ public BsonArray getSaslSupportedMechanisms() {
248281
return saslSupportedMechanisms;
249282
}
250283

284+
/**
285+
* Gets the session timeout in minutes.
286+
*
287+
* @return the session timeout in minutes, or null if sessions are not supported by this connection
288+
* @mongodb.server.release 3.6
289+
* @since 4.10
290+
*/
291+
@Nullable
292+
public Integer getLogicalSessionTimeoutMinutes() {
293+
return logicalSessionTimeoutMinutes;
294+
}
251295
/**
252296
* Get the default maximum message size.
253297
*
@@ -302,6 +346,9 @@ public boolean equals(final Object o) {
302346
if (!compressors.equals(that.compressors)) {
303347
return false;
304348
}
349+
if (!Objects.equals(logicalSessionTimeoutMinutes, that.logicalSessionTimeoutMinutes)) {
350+
return false;
351+
}
305352
return Objects.equals(saslSupportedMechanisms, that.saslSupportedMechanisms);
306353
}
307354

@@ -316,6 +363,7 @@ public int hashCode() {
316363
result = 31 * result + compressors.hashCode();
317364
result = 31 * result + (serviceId != null ? serviceId.hashCode() : 0);
318365
result = 31 * result + (saslSupportedMechanisms != null ? saslSupportedMechanisms.hashCode() : 0);
366+
result = 31 * result + (logicalSessionTimeoutMinutes != null ? logicalSessionTimeoutMinutes.hashCode() : 0);
319367
return result;
320368
}
321369

@@ -329,6 +377,7 @@ public String toString() {
329377
+ ", maxDocumentSize=" + maxDocumentSize
330378
+ ", maxMessageSize=" + maxMessageSize
331379
+ ", compressors=" + compressors
380+
+ ", logicialSessionTimeoutMinutes=" + logicalSessionTimeoutMinutes
332381
+ ", serviceId=" + serviceId
333382
+ '}';
334383
}

driver-core/src/main/com/mongodb/internal/connection/CommandMessage.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -221,8 +221,14 @@ private List<BsonElement> getExtraElements(final SessionContext sessionContext)
221221
if (sessionContext.getClusterTime() != null) {
222222
extraElements.add(new BsonElement("$clusterTime", sessionContext.getClusterTime()));
223223
}
224-
if (sessionContext.hasSession() && responseExpected) {
225-
extraElements.add(new BsonElement("lsid", sessionContext.getSessionId()));
224+
if (sessionContext.hasSession()) {
225+
if (!sessionContext.isImplicitSession() && !getSettings().isSessionSupported()) {
226+
throw new MongoClientException("Attempting to use a ClientSession while connected to a server that doesn't support "
227+
+ "sessions");
228+
}
229+
if (getSettings().isSessionSupported() && responseExpected) {
230+
extraElements.add(new BsonElement("lsid", sessionContext.getSessionId()));
231+
}
226232
}
227233
boolean firstMessageInTransaction = sessionContext.notifyMessageSent();
228234

driver-core/src/main/com/mongodb/internal/connection/DescriptionHelper.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ static ConnectionDescription createConnectionDescription(final ClusterConnection
6868
ConnectionDescription connectionDescription = new ConnectionDescription(connectionId,
6969
getMaxWireVersion(helloResult), getServerType(helloResult), getMaxWriteBatchSize(helloResult),
7070
getMaxBsonObjectSize(helloResult), getMaxMessageSizeBytes(helloResult), getCompressors(helloResult),
71-
helloResult.getArray("saslSupportedMechs", null));
71+
helloResult.getArray("saslSupportedMechs", null), getLogicalSessionTimeoutMinutes(helloResult));
7272
if (helloResult.containsKey("connectionId")) {
7373
ConnectionId newConnectionId =
7474
connectionDescription.getConnectionId().withServerValue(helloResult.getNumber("connectionId").intValue());

driver-core/src/main/com/mongodb/internal/connection/MessageSettings.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@ public final class MessageSettings {
3636
private final int maxBatchCount;
3737
private final int maxWireVersion;
3838
private final ServerType serverType;
39+
private final boolean sessionSupported;
3940

4041
/**
4142
* Gets the builder
@@ -56,6 +57,7 @@ public static final class Builder {
5657
private int maxBatchCount = DEFAULT_MAX_BATCH_COUNT;
5758
private int maxWireVersion;
5859
private ServerType serverType;
60+
private boolean sessionSupported;
5961

6062
/**
6163
* Build it.
@@ -108,6 +110,11 @@ public Builder serverType(final ServerType serverType) {
108110
this.serverType = serverType;
109111
return this;
110112
}
113+
114+
public Builder sessionSupported(final boolean sessionSupported) {
115+
this.sessionSupported = sessionSupported;
116+
return this;
117+
}
111118
}
112119

113120
/**
@@ -145,11 +152,17 @@ public ServerType getServerType() {
145152
return serverType;
146153
}
147154

155+
public boolean isSessionSupported() {
156+
return sessionSupported;
157+
}
158+
159+
148160
private MessageSettings(final Builder builder) {
149161
this.maxDocumentSize = builder.maxDocumentSize;
150162
this.maxMessageSize = builder.maxMessageSize;
151163
this.maxBatchCount = builder.maxBatchCount;
152164
this.maxWireVersion = builder.maxWireVersion;
153165
this.serverType = builder.serverType;
166+
this.sessionSupported = builder.sessionSupported;
154167
}
155168
}

driver-core/src/main/com/mongodb/internal/connection/ProtocolHelper.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -228,6 +228,7 @@ static MessageSettings getMessageSettings(final ConnectionDescription connection
228228
.maxBatchCount(connectionDescription.getMaxBatchCount())
229229
.maxWireVersion(connectionDescription.getMaxWireVersion())
230230
.serverType(connectionDescription.getServerType())
231+
.sessionSupported(connectionDescription.getLogicalSessionTimeoutMinutes() != null)
231232
.build();
232233
}
233234

driver-core/src/main/com/mongodb/internal/operation/BaseFindAndModifyOperation.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -224,7 +224,7 @@ private CommandCreator getCommandCreator(final SessionContext sessionContext) {
224224
putIfNotNull(commandDocument, "comment", getComment());
225225
putIfNotNull(commandDocument, "let", getLet());
226226

227-
if (isRetryableWrite(isRetryWrites(), getWriteConcern(), serverDescription, connectionDescription, sessionContext)) {
227+
if (isRetryableWrite(isRetryWrites(), getWriteConcern(), connectionDescription, sessionContext)) {
228228
commandDocument.put("txnNumber", new BsonInt64(sessionContext.advanceTransactionNumber()));
229229
}
230230
return commandDocument;

driver-core/src/main/com/mongodb/internal/operation/BulkWriteBatch.java

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@
2727
import com.mongodb.bulk.BulkWriteUpsert;
2828
import com.mongodb.bulk.WriteConcernError;
2929
import com.mongodb.connection.ConnectionDescription;
30-
import com.mongodb.connection.ServerDescription;
3130
import com.mongodb.internal.bulk.DeleteRequest;
3231
import com.mongodb.internal.bulk.UpdateRequest;
3332
import com.mongodb.internal.bulk.WriteRequest;
@@ -96,8 +95,7 @@ public final class BulkWriteBatch {
9695
private final BsonDocument variables;
9796

9897
static BulkWriteBatch createBulkWriteBatch(final MongoNamespace namespace,
99-
final ServerDescription serverDescription,
100-
final ConnectionDescription connectionDescription,
98+
final ConnectionDescription connectionDescription,
10199
final boolean ordered, final WriteConcern writeConcern,
102100
final Boolean bypassDocumentValidation, final boolean retryWrites,
103101
final List<? extends WriteRequest> writeRequests,
@@ -107,7 +105,7 @@ static BulkWriteBatch createBulkWriteBatch(final MongoNamespace namespace,
107105
&& !writeConcern.isAcknowledged()) {
108106
throw new MongoClientException("Unacknowledged writes are not supported when using an explicit session");
109107
}
110-
boolean canRetryWrites = isRetryableWrite(retryWrites, writeConcern, serverDescription, connectionDescription, sessionContext);
108+
boolean canRetryWrites = isRetryableWrite(retryWrites, writeConcern, connectionDescription, sessionContext);
111109
List<WriteRequestWithIndex> writeRequestsWithIndex = new ArrayList<>();
112110
boolean writeRequestsAreRetryable = true;
113111
for (int i = 0; i < writeRequests.size(); i++) {

driver-core/src/main/com/mongodb/internal/operation/CommandOperationHelper.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -407,8 +407,7 @@ static <T, R> R executeRetryableWrite(
407407
return withSourceAndConnection(binding::getWriteConnectionSource, true, (source, connection) -> {
408408
int maxWireVersion = connection.getDescription().getMaxWireVersion();
409409
try {
410-
retryState.breakAndThrowIfRetryAnd(() -> !canRetryWrite(source.getServerDescription(), connection.getDescription(),
411-
binding.getSessionContext()));
410+
retryState.breakAndThrowIfRetryAnd(() -> !canRetryWrite(connection.getDescription(), binding.getSessionContext()));
412411
BsonDocument command = retryState.attachment(AttachmentKeys.command())
413412
.map(previousAttemptCommand -> {
414413
assertFalse(firstAttempt);
@@ -462,8 +461,8 @@ static <T, R> void executeRetryableWriteAsync(
462461
SingleResultCallback<R> addingRetryableLabelCallback = firstAttempt
463462
? releasingCallback
464463
: addingRetryableLabelCallback(releasingCallback, maxWireVersion);
465-
if (retryState.breakAndCompleteIfRetryAnd(() -> !canRetryWrite(source.getServerDescription(), connection.getDescription(),
466-
binding.getSessionContext()), addingRetryableLabelCallback)) {
464+
if (retryState.breakAndCompleteIfRetryAnd(() -> !canRetryWrite(connection.getDescription(), binding.getSessionContext()),
465+
addingRetryableLabelCallback)) {
467466
return;
468467
}
469468
BsonDocument command;

driver-core/src/main/com/mongodb/internal/operation/MixedBulkWriteOperation.java

Lines changed: 4 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -184,14 +184,13 @@ public BulkWriteResult execute(final WriteBinding binding) {
184184
retryState.attach(AttachmentKeys.maxWireVersion(), connectionDescription.getMaxWireVersion(), true);
185185
SessionContext sessionContext = binding.getSessionContext();
186186
WriteConcern writeConcern = getAppliedWriteConcern(sessionContext);
187-
if (!isRetryableWrite(retryWrites, getAppliedWriteConcern(sessionContext),
188-
source.getServerDescription(), connectionDescription, sessionContext)) {
187+
if (!isRetryableWrite(retryWrites, getAppliedWriteConcern(sessionContext), connectionDescription, sessionContext)) {
189188
handleMongoWriteConcernWithResponseException(retryState, true);
190189
}
191190
validateWriteRequests(connectionDescription, bypassDocumentValidation, writeRequests, writeConcern);
192191
if (!retryState.attachment(AttachmentKeys.bulkWriteTracker()).orElseThrow(Assertions::fail).batch().isPresent()) {
193192
BulkWriteTracker.attachNew(retryState, BulkWriteBatch.createBulkWriteBatch(namespace,
194-
source.getServerDescription(), connectionDescription, ordered, writeConcern,
193+
connectionDescription, ordered, writeConcern,
195194
bypassDocumentValidation, retryWrites, writeRequests, sessionContext, comment, variables));
196195
}
197196
logRetryExecute(retryState);
@@ -220,9 +219,7 @@ public void executeAsync(final AsyncWriteBinding binding, final SingleResultCall
220219
retryState.attach(AttachmentKeys.maxWireVersion(), connectionDescription.getMaxWireVersion(), true);
221220
SessionContext sessionContext = binding.getSessionContext();
222221
WriteConcern writeConcern = getAppliedWriteConcern(sessionContext);
223-
if (!isRetryableWrite(retryWrites, getAppliedWriteConcern(sessionContext),
224-
source.getServerDescription(),
225-
connectionDescription, sessionContext)
222+
if (!isRetryableWrite(retryWrites, getAppliedWriteConcern(sessionContext), connectionDescription, sessionContext)
226223
&& handleMongoWriteConcernWithResponseExceptionAsync(retryState, releasingCallback)) {
227224
return;
228225
}
@@ -233,7 +230,7 @@ && handleMongoWriteConcernWithResponseExceptionAsync(retryState, releasingCallba
233230
try {
234231
if (!retryState.attachment(AttachmentKeys.bulkWriteTracker()).orElseThrow(Assertions::fail).batch().isPresent()) {
235232
BulkWriteTracker.attachNew(retryState, BulkWriteBatch.createBulkWriteBatch(namespace,
236-
source.getServerDescription(), connectionDescription, ordered, writeConcern,
233+
connectionDescription, ordered, writeConcern,
237234
bypassDocumentValidation, retryWrites, writeRequests, sessionContext, comment, variables));
238235
}
239236
} catch (Throwable t) {

0 commit comments

Comments
 (0)