diff --git a/build.gradle b/build.gradle
index 10add6b07ab..6a5dd7517d0 100644
--- a/build.gradle
+++ b/build.gradle
@@ -157,6 +157,7 @@ configure(scalaProjects) {
"-unchecked",
"-language:reflectiveCalls",
"-Wconf:cat=deprecation:ws,any:e",
+ "-Wconf:msg=While parsing annotations in:silent",
"-Xlint:strict-unsealed-patmat"
]
}
diff --git a/driver-core/src/main/com/mongodb/internal/ClientSideOperationTimeout.java b/driver-core/src/main/com/mongodb/internal/ClientSideOperationTimeout.java
new file mode 100644
index 00000000000..add7074131d
--- /dev/null
+++ b/driver-core/src/main/com/mongodb/internal/ClientSideOperationTimeout.java
@@ -0,0 +1,158 @@
+/*
+ * Copyright 2008-present MongoDB, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.mongodb.internal;
+
+import com.mongodb.lang.Nullable;
+
+import java.util.Objects;
+
+import static com.mongodb.assertions.Assertions.assertNotNull;
+import static com.mongodb.assertions.Assertions.isTrueArgument;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
+
+/**
+ * Client Side Operation Timeout.
+ *
+ *
Includes support for the deprecated {@code maxTimeMS} and {@code maxCommitTimeMS} operation configurations
+ */
+public class ClientSideOperationTimeout {
+
+ private final Long timeoutMS;
+ private final long maxAwaitTimeMS;
+
+ // Deprecated operation based operation timeouts
+ private final long maxTimeMS;
+ private final long maxCommitTimeMS;
+
+ @Nullable
+ private Timeout timeout;
+
+ public ClientSideOperationTimeout(@Nullable final Long timeoutMS,
+ final long maxAwaitTimeMS,
+ final long maxTimeMS,
+ final long maxCommitTimeMS) {
+ isTrueArgument("timeoutMS must be >= 0", timeoutMS == null || timeoutMS >= 0);
+ this.timeoutMS = timeoutMS;
+ this.maxAwaitTimeMS = maxAwaitTimeMS;
+ this.maxTimeMS = timeoutMS == null ? maxTimeMS : 0;
+ this.maxCommitTimeMS = timeoutMS == null ? maxCommitTimeMS : 0;
+
+ if (timeoutMS != null) {
+ if (timeoutMS == 0) {
+ this.timeout = Timeout.infinite();
+ } else {
+ this.timeout = Timeout.startNow(timeoutMS, MILLISECONDS);
+ }
+ }
+ }
+
+ /**
+ * Allows for the differentiation between users explicitly setting a global operation timeout via {@code timeoutMS}.
+ *
+ * @return true if a timeout has been set.
+ */
+ public boolean hasTimeoutMS() {
+ return timeoutMS != null;
+ }
+
+ /**
+ * Checks the expiry of the timeout.
+ *
+ * @return true if the timeout has been set and it has expired
+ */
+ public boolean expired() {
+ return timeout != null && timeout.expired();
+ }
+
+ /**
+ * Returns the remaining {@code timeoutMS} if set or the {@code alternativeTimeoutMS}.
+ *
+ * @param alternativeTimeoutMS the alternative timeout.
+ * @return timeout to use.
+ */
+ public long timeoutOrAlternative(final long alternativeTimeoutMS) {
+ if (timeoutMS == null) {
+ return alternativeTimeoutMS;
+ } else if (timeoutMS == 0) {
+ return timeoutMS;
+ } else {
+ return timeoutRemainingMS();
+ }
+ }
+
+ /**
+ * Calculates the minimum timeout value between two possible timeouts.
+ *
+ * @param alternativeTimeoutMS the alternative timeout
+ * @return the minimum value to use.
+ */
+ public long calculateMin(final long alternativeTimeoutMS) {
+ if (timeoutMS == null) {
+ return alternativeTimeoutMS;
+ } else if (timeoutMS == 0) {
+ return alternativeTimeoutMS;
+ } else if (alternativeTimeoutMS == 0) {
+ return timeoutRemainingMS();
+ } else {
+ return Math.min(timeoutRemainingMS(), alternativeTimeoutMS);
+ }
+ }
+
+ public long getMaxAwaitTimeMS() {
+ return maxAwaitTimeMS;
+ }
+
+ public long getMaxTimeMS() {
+ return timeoutOrAlternative(maxTimeMS);
+ }
+
+ public long getMaxCommitTimeMS() {
+ return timeoutOrAlternative(maxCommitTimeMS);
+ }
+
+ private long timeoutRemainingMS() {
+ assertNotNull(timeout);
+ return timeout.isInfinite() ? 0 : timeout.remaining(MILLISECONDS);
+ }
+
+ @Override
+ public String toString() {
+ return "ClientSideOperationTimeout{"
+ + "timeoutMS=" + timeoutMS
+ + ", maxAwaitTimeMS=" + maxAwaitTimeMS
+ + ", maxTimeMS=" + maxTimeMS
+ + ", maxCommitTimeMS=" + maxCommitTimeMS
+ + '}';
+ }
+
+ @Override
+ public boolean equals(final Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ final ClientSideOperationTimeout that = (ClientSideOperationTimeout) o;
+ return maxAwaitTimeMS == that.maxAwaitTimeMS && maxTimeMS == that.maxTimeMS && maxCommitTimeMS == that.maxCommitTimeMS
+ && Objects.equals(timeoutMS, that.timeoutMS);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(timeoutMS, maxAwaitTimeMS, maxTimeMS, maxCommitTimeMS);
+ }
+}
diff --git a/driver-core/src/main/com/mongodb/internal/ClientSideOperationTimeouts.java b/driver-core/src/main/com/mongodb/internal/ClientSideOperationTimeouts.java
new file mode 100644
index 00000000000..c0c655abcff
--- /dev/null
+++ b/driver-core/src/main/com/mongodb/internal/ClientSideOperationTimeouts.java
@@ -0,0 +1,55 @@
+/*
+ * Copyright 2008-present MongoDB, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.mongodb.internal;
+
+import com.mongodb.lang.Nullable;
+
+/**
+ * A factory for creating {@link ClientSideOperationTimeout} instances
+ */
+public final class ClientSideOperationTimeouts {
+
+ public static final ClientSideOperationTimeout NO_TIMEOUT = create(null, 0, 0, 0);
+
+ public static ClientSideOperationTimeout create(@Nullable final Long timeoutMS) {
+ return create(timeoutMS, 0);
+ }
+
+ public static ClientSideOperationTimeout create(@Nullable final Long timeoutMS, final long maxTimeMS) {
+ return create(timeoutMS, maxTimeMS, 0);
+ }
+
+ public static ClientSideOperationTimeout create(@Nullable final Long timeoutMS,
+ final long maxTimeMS,
+ final long maxAwaitTimeMS) {
+ return new ClientSideOperationTimeout(timeoutMS, maxAwaitTimeMS, maxTimeMS, 0);
+ }
+
+ public static ClientSideOperationTimeout create(@Nullable final Long timeoutMS,
+ final long maxTimeMS,
+ final long maxAwaitTimeMS,
+ final long maxCommitMS) {
+ return new ClientSideOperationTimeout(timeoutMS, maxAwaitTimeMS, maxTimeMS, maxCommitMS);
+ }
+
+ public static ClientSideOperationTimeout withMaxCommitMS(@Nullable final Long timeoutMS,
+ @Nullable final Long maxCommitMS) {
+ return create(timeoutMS, 0, 0, maxCommitMS != null ? maxCommitMS : 0);
+ }
+
+ private ClientSideOperationTimeouts() {
+ }
+}
diff --git a/driver-core/src/main/com/mongodb/internal/operation/AbortTransactionOperation.java b/driver-core/src/main/com/mongodb/internal/operation/AbortTransactionOperation.java
index 13166eb53ab..9dd63007b74 100644
--- a/driver-core/src/main/com/mongodb/internal/operation/AbortTransactionOperation.java
+++ b/driver-core/src/main/com/mongodb/internal/operation/AbortTransactionOperation.java
@@ -18,6 +18,7 @@
import com.mongodb.Function;
import com.mongodb.WriteConcern;
+import com.mongodb.internal.ClientSideOperationTimeout;
import com.mongodb.lang.Nullable;
import org.bson.BsonDocument;
@@ -31,8 +32,8 @@
public class AbortTransactionOperation extends TransactionOperation {
private BsonDocument recoveryToken;
- public AbortTransactionOperation(final WriteConcern writeConcern) {
- super(writeConcern);
+ public AbortTransactionOperation(final ClientSideOperationTimeout clientSideOperationTimeout, final WriteConcern writeConcern) {
+ super(clientSideOperationTimeout, writeConcern);
}
public AbortTransactionOperation recoveryToken(@Nullable final BsonDocument recoveryToken) {
@@ -49,7 +50,9 @@ protected String getCommandName() {
CommandCreator getCommandCreator() {
CommandCreator creator = super.getCommandCreator();
if (recoveryToken != null) {
- return (serverDescription, connectionDescription) -> creator.create(serverDescription, connectionDescription).append("recoveryToken", recoveryToken);
+ return (clientSideOperationTimeout, serverDescription, connectionDescription) ->
+ creator.create(clientSideOperationTimeout, serverDescription, connectionDescription)
+ .append("recoveryToken", recoveryToken);
}
return creator;
}
diff --git a/driver-core/src/main/com/mongodb/internal/operation/AggregateOperation.java b/driver-core/src/main/com/mongodb/internal/operation/AggregateOperation.java
index 857c14b857c..5af8841f343 100644
--- a/driver-core/src/main/com/mongodb/internal/operation/AggregateOperation.java
+++ b/driver-core/src/main/com/mongodb/internal/operation/AggregateOperation.java
@@ -19,6 +19,7 @@
import com.mongodb.ExplainVerbosity;
import com.mongodb.MongoNamespace;
import com.mongodb.client.model.Collation;
+import com.mongodb.internal.ClientSideOperationTimeout;
import com.mongodb.internal.async.AsyncBatchCursor;
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.internal.binding.AsyncReadBinding;
@@ -31,7 +32,6 @@
import org.bson.codecs.Decoder;
import java.util.List;
-import java.util.concurrent.TimeUnit;
import static com.mongodb.internal.operation.ExplainHelper.asExplainCommand;
import static com.mongodb.internal.operation.ServerVersionHelper.MIN_WIRE_VERSION;
@@ -44,13 +44,14 @@
public class AggregateOperation implements AsyncExplainableReadOperation>, ExplainableReadOperation> {
private final AggregateOperationImpl wrapped;
- public AggregateOperation(final MongoNamespace namespace, final List pipeline, final Decoder decoder) {
- this(namespace, pipeline, decoder, AggregationLevel.COLLECTION);
+ public AggregateOperation(final ClientSideOperationTimeout clientSideOperationTimeout, final MongoNamespace namespace,
+ final List pipeline, final Decoder decoder) {
+ this(clientSideOperationTimeout, namespace, pipeline, decoder, AggregationLevel.COLLECTION);
}
- public AggregateOperation(final MongoNamespace namespace, final List pipeline, final Decoder decoder,
- final AggregationLevel aggregationLevel) {
- this.wrapped = new AggregateOperationImpl<>(namespace, pipeline, decoder, aggregationLevel);
+ public AggregateOperation(final ClientSideOperationTimeout clientSideOperationTimeout, final MongoNamespace namespace,
+ final List pipeline, final Decoder decoder, final AggregationLevel aggregationLevel) {
+ this.wrapped = new AggregateOperationImpl<>(clientSideOperationTimeout, namespace, pipeline, decoder, aggregationLevel);
}
public List getPipeline() {
@@ -75,24 +76,6 @@ public AggregateOperation batchSize(@Nullable final Integer batchSize) {
return this;
}
- public long getMaxAwaitTime(final TimeUnit timeUnit) {
- return wrapped.getMaxAwaitTime(timeUnit);
- }
-
- public AggregateOperation maxAwaitTime(final long maxAwaitTime, final TimeUnit timeUnit) {
- wrapped.maxAwaitTime(maxAwaitTime, timeUnit);
- return this;
- }
-
- public long getMaxTime(final TimeUnit timeUnit) {
- return wrapped.getMaxTime(timeUnit);
- }
-
- public AggregateOperation maxTime(final long maxTime, final TimeUnit timeUnit) {
- wrapped.maxTime(maxTime, timeUnit);
- return this;
- }
-
public Collation getCollation() {
return wrapped.getCollation();
}
@@ -159,24 +142,20 @@ public void executeAsync(final AsyncReadBinding binding, final SingleResultCallb
}
public ReadOperation asExplainableOperation(@Nullable final ExplainVerbosity verbosity, final Decoder resultDecoder) {
- return new CommandReadOperation<>(getNamespace().getDatabaseName(),
- asExplainCommand(wrapped.getCommand(NoOpSessionContext.INSTANCE, MIN_WIRE_VERSION), verbosity),
- resultDecoder);
+ return new CommandReadOperation<>(wrapped.getClientSideOperationTimeout(), getNamespace().getDatabaseName(),
+ asExplainCommand(wrapped.getCommand(wrapped.getClientSideOperationTimeout(), NoOpSessionContext.INSTANCE, MIN_WIRE_VERSION),
+ verbosity), resultDecoder);
}
public AsyncReadOperation asAsyncExplainableOperation(@Nullable final ExplainVerbosity verbosity,
final Decoder resultDecoder) {
- return new CommandReadOperation<>(getNamespace().getDatabaseName(),
- asExplainCommand(wrapped.getCommand(NoOpSessionContext.INSTANCE, MIN_WIRE_VERSION), verbosity),
- resultDecoder);
+ return new CommandReadOperation<>(wrapped.getClientSideOperationTimeout(), getNamespace().getDatabaseName(),
+ asExplainCommand(wrapped.getCommand(wrapped.getClientSideOperationTimeout(), NoOpSessionContext.INSTANCE, MIN_WIRE_VERSION),
+ verbosity), resultDecoder);
}
-
MongoNamespace getNamespace() {
return wrapped.getNamespace();
}
- Decoder getDecoder() {
- return wrapped.getDecoder();
- }
}
diff --git a/driver-core/src/main/com/mongodb/internal/operation/AggregateOperationImpl.java b/driver-core/src/main/com/mongodb/internal/operation/AggregateOperationImpl.java
index 4379845bdd1..8add07ace70 100644
--- a/driver-core/src/main/com/mongodb/internal/operation/AggregateOperationImpl.java
+++ b/driver-core/src/main/com/mongodb/internal/operation/AggregateOperationImpl.java
@@ -19,6 +19,7 @@
import com.mongodb.MongoNamespace;
import com.mongodb.client.model.Collation;
import com.mongodb.connection.ConnectionDescription;
+import com.mongodb.internal.ClientSideOperationTimeout;
import com.mongodb.internal.async.AsyncBatchCursor;
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.internal.binding.AsyncReadBinding;
@@ -38,7 +39,6 @@
import java.util.Arrays;
import java.util.List;
-import java.util.concurrent.TimeUnit;
import static com.mongodb.assertions.Assertions.assertNotNull;
import static com.mongodb.assertions.Assertions.isTrueArgument;
@@ -59,6 +59,7 @@ class AggregateOperationImpl implements AsyncReadOperation FIELD_NAMES_WITH_RESULT = Arrays.asList(RESULT, FIRST_BATCH);
+ private final ClientSideOperationTimeout clientSideOperationTimeout;
private final MongoNamespace namespace;
private final List pipeline;
private final Decoder decoder;
@@ -71,18 +72,20 @@ class AggregateOperationImpl implements AsyncReadOperation pipeline, final Decoder decoder,
- final AggregationLevel aggregationLevel) {
- this(namespace, pipeline, decoder, defaultAggregateTarget(notNull("aggregationLevel", aggregationLevel),
- notNull("namespace", namespace).getCollectionName()), defaultPipelineCreator(pipeline));
+ AggregateOperationImpl(final ClientSideOperationTimeout clientSideOperationTimeout, final MongoNamespace namespace,
+ final List pipeline, final Decoder decoder, final AggregationLevel aggregationLevel) {
+ this(clientSideOperationTimeout, namespace, pipeline, decoder,
+ defaultAggregateTarget(notNull("aggregationLevel", aggregationLevel),
+ notNull("namespace", namespace).getCollectionName()),
+ defaultPipelineCreator(pipeline));
}
- AggregateOperationImpl(final MongoNamespace namespace, final List pipeline, final Decoder decoder,
- final AggregateTarget aggregateTarget, final PipelineCreator pipelineCreator) {
+ AggregateOperationImpl(final ClientSideOperationTimeout clientSideOperationTimeout, final MongoNamespace namespace,
+ final List pipeline, final Decoder decoder, final AggregateTarget aggregateTarget,
+ final PipelineCreator pipelineCreator) {
+ this.clientSideOperationTimeout = notNull("clientSideOperationTimeout", clientSideOperationTimeout);
this.namespace = notNull("namespace", namespace);
this.pipeline = notNull("pipeline", pipeline);
this.decoder = notNull("decoder", decoder);
@@ -120,30 +123,6 @@ AggregateOperationImpl batchSize(@Nullable final Integer batchSize) {
return this;
}
- long getMaxAwaitTime(final TimeUnit timeUnit) {
- notNull("timeUnit", timeUnit);
- return timeUnit.convert(maxAwaitTimeMS, TimeUnit.MILLISECONDS);
- }
-
- AggregateOperationImpl maxAwaitTime(final long maxAwaitTime, final TimeUnit timeUnit) {
- notNull("timeUnit", timeUnit);
- isTrueArgument("maxAwaitTime >= 0", maxAwaitTime >= 0);
- this.maxAwaitTimeMS = TimeUnit.MILLISECONDS.convert(maxAwaitTime, timeUnit);
- return this;
- }
-
- long getMaxTime(final TimeUnit timeUnit) {
- notNull("timeUnit", timeUnit);
- return timeUnit.convert(maxTimeMS, TimeUnit.MILLISECONDS);
- }
-
- AggregateOperationImpl maxTime(final long maxTime, final TimeUnit timeUnit) {
- notNull("timeUnit", timeUnit);
- isTrueArgument("maxTime >= 0", maxTime >= 0);
- this.maxTimeMS = TimeUnit.MILLISECONDS.convert(maxTime, timeUnit);
- return this;
- }
-
Collation getCollation() {
return collation;
}
@@ -182,6 +161,10 @@ BsonValue getHint() {
return hint;
}
+ public ClientSideOperationTimeout getClientSideOperationTimeout() {
+ return clientSideOperationTimeout;
+ }
+
AggregateOperationImpl hint(@Nullable final BsonValue hint) {
isTrueArgument("BsonString or BsonDocument", hint == null || hint.isDocument() || hint.isString());
this.hint = hint;
@@ -190,30 +173,34 @@ AggregateOperationImpl hint(@Nullable final BsonValue hint) {
@Override
public BatchCursor execute(final ReadBinding binding) {
- return executeRetryableRead(binding, namespace.getDatabaseName(), getCommandCreator(binding.getSessionContext()),
- CommandResultDocumentCodec.create(decoder, FIELD_NAMES_WITH_RESULT), transformer(), retryReads);
+ return executeRetryableRead(clientSideOperationTimeout, binding, namespace.getDatabaseName(),
+ getCommandCreator(binding.getSessionContext()), CommandResultDocumentCodec.create(decoder, FIELD_NAMES_WITH_RESULT),
+ transformer(), retryReads);
}
@Override
public void executeAsync(final AsyncReadBinding binding, final SingleResultCallback> callback) {
SingleResultCallback> errHandlingCallback = errorHandlingCallback(callback, LOGGER);
- executeRetryableReadAsync(binding, namespace.getDatabaseName(), getCommandCreator(binding.getSessionContext()),
- CommandResultDocumentCodec.create(this.decoder, FIELD_NAMES_WITH_RESULT), asyncTransformer(), retryReads,
+ executeRetryableReadAsync(clientSideOperationTimeout, binding, namespace.getDatabaseName(),
+ getCommandCreator(binding.getSessionContext()), CommandResultDocumentCodec.create(this.decoder, FIELD_NAMES_WITH_RESULT),
+ asyncTransformer(), retryReads,
errHandlingCallback);
}
private CommandCreator getCommandCreator(final SessionContext sessionContext) {
- return (serverDescription, connectionDescription) -> getCommand(sessionContext, connectionDescription.getMaxWireVersion());
+ return (clientSideOperationTimeout, serverDescription, connectionDescription) ->
+ getCommand(clientSideOperationTimeout, sessionContext, connectionDescription.getMaxWireVersion());
}
- BsonDocument getCommand(final SessionContext sessionContext, final int maxWireVersion) {
+ BsonDocument getCommand(final ClientSideOperationTimeout clientSideOperationTimeout, final SessionContext sessionContext,
+ final int maxWireVersion) {
BsonDocument commandDocument = new BsonDocument("aggregate", aggregateTarget.create());
appendReadConcernToCommand(sessionContext, maxWireVersion, commandDocument);
commandDocument.put("pipeline", pipelineCreator.create());
+ long maxTimeMS = clientSideOperationTimeout.getMaxTimeMS();
if (maxTimeMS > 0) {
- commandDocument.put("maxTimeMS", maxTimeMS > Integer.MAX_VALUE
- ? new BsonInt64(maxTimeMS) : new BsonInt32((int) maxTimeMS));
+ commandDocument.put("maxTimeMS", new BsonInt64(maxTimeMS));
}
BsonDocument cursor = new BsonDocument();
if (batchSize != null) {
@@ -247,6 +234,7 @@ private QueryResult createQueryResult(final BsonDocument result, final Connec
private CommandReadTransformer> transformer() {
return (result, source, connection) -> {
QueryResult queryResult = createQueryResult(result, connection.getDescription());
+ long maxAwaitTimeMS = clientSideOperationTimeout.getMaxAwaitTimeMS();
return new QueryBatchCursor<>(queryResult, 0, batchSize != null ? batchSize : 0, maxAwaitTimeMS, decoder, comment,
source, connection, result);
};
@@ -255,6 +243,7 @@ private CommandReadTransformer> transformer()
private CommandReadTransformerAsync> asyncTransformer() {
return (result, source, connection) -> {
QueryResult queryResult = createQueryResult(result, connection.getDescription());
+ long maxAwaitTimeMS = clientSideOperationTimeout.getMaxAwaitTimeMS();
return new AsyncQueryBatchCursor<>(queryResult, 0, batchSize != null ? batchSize : 0, maxAwaitTimeMS, decoder,
comment, source, connection, result);
};
diff --git a/driver-core/src/main/com/mongodb/internal/operation/AggregateToCollectionOperation.java b/driver-core/src/main/com/mongodb/internal/operation/AggregateToCollectionOperation.java
index f41d0e4a462..aef625125fa 100644
--- a/driver-core/src/main/com/mongodb/internal/operation/AggregateToCollectionOperation.java
+++ b/driver-core/src/main/com/mongodb/internal/operation/AggregateToCollectionOperation.java
@@ -21,6 +21,7 @@
import com.mongodb.ReadPreference;
import com.mongodb.WriteConcern;
import com.mongodb.client.model.Collation;
+import com.mongodb.internal.ClientSideOperationTimeout;
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.internal.binding.AsyncReadBinding;
import com.mongodb.internal.binding.ReadBinding;
@@ -36,7 +37,6 @@
import org.bson.codecs.BsonDocumentCodec;
import java.util.List;
-import java.util.concurrent.TimeUnit;
import static com.mongodb.assertions.Assertions.isTrueArgument;
import static com.mongodb.assertions.Assertions.notNull;
@@ -56,6 +56,7 @@
* This class is not part of the public API and may be removed or changed at any time
*/
public class AggregateToCollectionOperation implements AsyncReadOperation, ReadOperation {
+ private final ClientSideOperationTimeout clientSideOperationTimeout;
private final MongoNamespace namespace;
private final List pipeline;
private final WriteConcern writeConcern;
@@ -63,35 +64,21 @@ public class AggregateToCollectionOperation implements AsyncReadOperation,
private final AggregationLevel aggregationLevel;
private Boolean allowDiskUse;
- private long maxTimeMS;
private Boolean bypassDocumentValidation;
private Collation collation;
private BsonValue comment;
private BsonValue hint;
private BsonDocument variables;
- public AggregateToCollectionOperation(final MongoNamespace namespace, final List pipeline) {
- this(namespace, pipeline, null, null, AggregationLevel.COLLECTION);
+ public AggregateToCollectionOperation(final ClientSideOperationTimeout clientSideOperationTimeout, final MongoNamespace namespace,
+ final List pipeline, final ReadConcern readConcern, final WriteConcern writeConcern) {
+ this(clientSideOperationTimeout, namespace, pipeline, readConcern, writeConcern, AggregationLevel.COLLECTION);
}
- public AggregateToCollectionOperation(final MongoNamespace namespace, final List pipeline,
- final WriteConcern writeConcern) {
- this(namespace, pipeline, null, writeConcern, AggregationLevel.COLLECTION);
- }
-
- public AggregateToCollectionOperation(final MongoNamespace namespace, final List pipeline,
- final ReadConcern readConcern) {
- this(namespace, pipeline, readConcern, null, AggregationLevel.COLLECTION);
- }
-
- public AggregateToCollectionOperation(final MongoNamespace namespace, final List pipeline,
- final ReadConcern readConcern, final WriteConcern writeConcern) {
- this(namespace, pipeline, readConcern, writeConcern, AggregationLevel.COLLECTION);
- }
-
- public AggregateToCollectionOperation(final MongoNamespace namespace, final List pipeline,
- @Nullable final ReadConcern readConcern, @Nullable final WriteConcern writeConcern,
- final AggregationLevel aggregationLevel) {
+ public AggregateToCollectionOperation(final ClientSideOperationTimeout clientSideOperationTimeout, final MongoNamespace namespace,
+ final List pipeline, @Nullable final ReadConcern readConcern, @Nullable final WriteConcern writeConcern,
+ final AggregationLevel aggregationLevel) {
+ this.clientSideOperationTimeout = notNull("clientSideOperationTimeout", clientSideOperationTimeout);
this.namespace = notNull("namespace", namespace);
this.pipeline = notNull("pipeline", pipeline);
this.writeConcern = writeConcern;
@@ -122,17 +109,6 @@ public AggregateToCollectionOperation allowDiskUse(@Nullable final Boolean allow
return this;
}
- public long getMaxTime(final TimeUnit timeUnit) {
- notNull("timeUnit", timeUnit);
- return timeUnit.convert(maxTimeMS, TimeUnit.MILLISECONDS);
- }
-
- public AggregateToCollectionOperation maxTime(final long maxTime, final TimeUnit timeUnit) {
- notNull("timeUnit", timeUnit);
- this.maxTimeMS = TimeUnit.MILLISECONDS.convert(maxTime, timeUnit);
- return this;
- }
-
public Boolean getBypassDocumentValidation() {
return bypassDocumentValidation;
}
@@ -176,10 +152,10 @@ public AggregateToCollectionOperation hint(@Nullable final BsonValue hint) {
@Override
public Void execute(final ReadBinding binding) {
- return executeRetryableRead(binding,
+ return executeRetryableRead(clientSideOperationTimeout, binding,
() -> binding.getReadConnectionSource(FIVE_DOT_ZERO_WIRE_VERSION, ReadPreference.primary()),
namespace.getDatabaseName(),
- (serverDescription, connectionDescription) -> getCommand(),
+ (clientSideOperationTimeout, serverDescription, connectionDescription) -> getCommand(),
new BsonDocumentCodec(), (result, source, connection) -> {
throwOnWriteConcernError(result, connection.getDescription().getServerAddress(),
connection.getDescription().getMaxWireVersion());
@@ -189,12 +165,11 @@ public Void execute(final ReadBinding binding) {
@Override
public void executeAsync(final AsyncReadBinding binding, final SingleResultCallback callback) {
- executeRetryableReadAsync(binding,
- (connectionSourceCallback) -> {
- binding.getReadConnectionSource(FIVE_DOT_ZERO_WIRE_VERSION, ReadPreference.primary(), connectionSourceCallback);
- },
+ executeRetryableReadAsync(clientSideOperationTimeout, binding,
+ (connectionSourceCallback) ->
+ binding.getReadConnectionSource(FIVE_DOT_ZERO_WIRE_VERSION, ReadPreference.primary(), connectionSourceCallback),
namespace.getDatabaseName(),
- (serverDescription, connectionDescription) -> getCommand(),
+ (clientSideOperationTimeout, serverDescription, connectionDescription) -> getCommand(),
new BsonDocumentCodec(), (result, source, connection) -> {
throwOnWriteConcernError(result, connection.getDescription().getServerAddress(),
connection.getDescription().getMaxWireVersion());
@@ -208,6 +183,7 @@ private BsonDocument getCommand() {
BsonDocument commandDocument = new BsonDocument("aggregate", aggregationTarget);
commandDocument.put("pipeline", new BsonArray(pipeline));
+ long maxTimeMS = clientSideOperationTimeout.getMaxTimeMS();
if (maxTimeMS > 0) {
commandDocument.put("maxTimeMS", new BsonInt64(maxTimeMS));
}
diff --git a/driver-core/src/main/com/mongodb/internal/operation/AsyncOperationHelper.java b/driver-core/src/main/com/mongodb/internal/operation/AsyncOperationHelper.java
index 21b10cdff08..ff706753885 100644
--- a/driver-core/src/main/com/mongodb/internal/operation/AsyncOperationHelper.java
+++ b/driver-core/src/main/com/mongodb/internal/operation/AsyncOperationHelper.java
@@ -22,6 +22,7 @@
import com.mongodb.ReadPreference;
import com.mongodb.ServerAddress;
import com.mongodb.assertions.Assertions;
+import com.mongodb.internal.ClientSideOperationTimeout;
import com.mongodb.internal.async.AsyncBatchCursor;
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.internal.async.function.AsyncCallbackBiFunction;
@@ -153,6 +154,7 @@ static void withAsyncConnectionSource(final AsyncConnectionSource source, final
}
static void executeRetryableReadAsync(
+ final ClientSideOperationTimeout clientSideOperationTimeout,
final AsyncReadBinding binding,
final String database,
final CommandCreator commandCreator,
@@ -160,11 +162,12 @@ static void executeRetryableReadAsync(
final CommandReadTransformerAsync transformer,
final boolean retryReads,
final SingleResultCallback callback) {
- executeRetryableReadAsync(binding, binding::getReadConnectionSource, database, commandCreator, decoder, transformer, retryReads,
- callback);
+ executeRetryableReadAsync(clientSideOperationTimeout, binding, binding::getReadConnectionSource, database, commandCreator,
+ decoder, transformer, retryReads, callback);
}
static void executeRetryableReadAsync(
+ final ClientSideOperationTimeout clientSideOperationTimeout,
final AsyncReadBinding binding,
final AsyncCallbackSupplier sourceAsyncSupplier,
final String database,
@@ -185,11 +188,8 @@ static void executeRetryableReadAsync(
releasingCallback)) {
return;
}
- createReadCommandAndExecuteAsync(retryState, binding, source,
- database, commandCreator,
- decoder, transformer,
- connection,
- releasingCallback);
+ createReadCommandAndExecuteAsync(clientSideOperationTimeout, retryState, binding, source, database,
+ commandCreator, decoder, transformer, connection, releasingCallback);
})
).whenComplete(binding::release);
asyncRead.get(errorHandlingCallback(callback, OperationHelper.LOGGER));
@@ -209,6 +209,7 @@ static void executeCommandAsync(final AsyncWriteBinding binding,
}
static void executeRetryableWriteAsync(
+ final ClientSideOperationTimeout clientSideOperationTimeout,
final AsyncWriteBinding binding,
final String database,
@Nullable final ReadPreference readPreference,
@@ -243,7 +244,7 @@ static void executeRetryableWriteAsync(
.map(previousAttemptCommand -> {
Assertions.assertFalse(firstAttempt);
return retryCommandModifier.apply(previousAttemptCommand);
- }).orElseGet(() -> commandCreator.create(source.getServerDescription(), connection.getDescription()));
+ }).orElseGet(() -> commandCreator.create(clientSideOperationTimeout, source.getServerDescription(), connection.getDescription()));
// attach `maxWireVersion`, `retryableCommandFlag` ASAP because they are used to check whether we should retry
retryState.attach(AttachmentKeys.maxWireVersion(), maxWireVersion, true)
.attach(AttachmentKeys.retryableCommandFlag(), isRetryWritesEnabled(command), true)
@@ -262,6 +263,7 @@ static void executeRetryableWriteAsync(
}
static void createReadCommandAndExecuteAsync(
+ final ClientSideOperationTimeout clientSideOperationTimeout,
final RetryState retryState,
final AsyncReadBinding binding,
final AsyncConnectionSource source,
@@ -273,7 +275,7 @@ static void createReadCommandAndExecuteAsync(
final SingleResultCallback callback) {
BsonDocument command;
try {
- command = commandCreator.create(source.getServerDescription(), connection.getDescription());
+ command = commandCreator.create(clientSideOperationTimeout, source.getServerDescription(), connection.getDescription());
retryState.attach(AttachmentKeys.commandDescriptionSupplier(), command::getFirstKey, false);
} catch (IllegalArgumentException e) {
callback.onResult(null, e);
diff --git a/driver-core/src/main/com/mongodb/internal/operation/AsyncOperations.java b/driver-core/src/main/com/mongodb/internal/operation/AsyncOperations.java
index 81b5fb513f2..38193bf106e 100644
--- a/driver-core/src/main/com/mongodb/internal/operation/AsyncOperations.java
+++ b/driver-core/src/main/com/mongodb/internal/operation/AsyncOperations.java
@@ -69,9 +69,9 @@ public final class AsyncOperations {
public AsyncOperations(final MongoNamespace namespace, final Class documentClass, final ReadPreference readPreference,
final CodecRegistry codecRegistry, final ReadConcern readConcern, final WriteConcern writeConcern,
- final boolean retryWrites, final boolean retryReads) {
+ final boolean retryWrites, final boolean retryReads, @Nullable final Long timeoutMS) {
this.operations = new Operations<>(namespace, documentClass, readPreference, codecRegistry, readConcern, writeConcern,
- retryWrites, retryReads);
+ retryWrites, retryReads, timeoutMS);
}
public MongoNamespace getNamespace() {
@@ -106,6 +106,11 @@ public boolean isRetryReads() {
return operations.isRetryReads();
}
+ @Nullable
+ public Long getTimeoutMS() {
+ return operations.getTimeoutMS();
+ }
+
public AsyncReadOperation countDocuments(final Bson filter, final CountOptions options) {
return operations.countDocuments(filter, options);
}
diff --git a/driver-core/src/main/com/mongodb/internal/operation/AsyncQueryBatchCursor.java b/driver-core/src/main/com/mongodb/internal/operation/AsyncQueryBatchCursor.java
index b7f721b5fc4..d97b80da764 100644
--- a/driver-core/src/main/com/mongodb/internal/operation/AsyncQueryBatchCursor.java
+++ b/driver-core/src/main/com/mongodb/internal/operation/AsyncQueryBatchCursor.java
@@ -55,9 +55,9 @@
import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback;
import static com.mongodb.internal.operation.CursorHelper.getNumberToReturn;
import static com.mongodb.internal.operation.DocumentHelper.putIfNotNull;
-import static com.mongodb.internal.operation.SyncOperationHelper.getMoreCursorDocumentToQueryResult;
import static com.mongodb.internal.operation.QueryHelper.translateCommandException;
import static com.mongodb.internal.operation.ServerVersionHelper.serverIsAtLeastVersionFourDotFour;
+import static com.mongodb.internal.operation.SyncOperationHelper.getMoreCursorDocumentToQueryResult;
import static java.lang.String.format;
import static java.util.Collections.singletonList;
diff --git a/driver-core/src/main/com/mongodb/internal/operation/BaseFindAndModifyOperation.java b/driver-core/src/main/com/mongodb/internal/operation/BaseFindAndModifyOperation.java
index e3ae79fa589..6603799276f 100644
--- a/driver-core/src/main/com/mongodb/internal/operation/BaseFindAndModifyOperation.java
+++ b/driver-core/src/main/com/mongodb/internal/operation/BaseFindAndModifyOperation.java
@@ -20,6 +20,7 @@
import com.mongodb.WriteConcern;
import com.mongodb.client.model.Collation;
import com.mongodb.connection.ConnectionDescription;
+import com.mongodb.internal.ClientSideOperationTimeout;
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.internal.binding.AsyncWriteBinding;
import com.mongodb.internal.binding.WriteBinding;
@@ -33,8 +34,6 @@
import org.bson.codecs.Decoder;
import org.bson.conversions.Bson;
-import java.util.concurrent.TimeUnit;
-
import static com.mongodb.assertions.Assertions.notNull;
import static com.mongodb.internal.operation.AsyncOperationHelper.executeRetryableWriteAsync;
import static com.mongodb.internal.operation.CommandOperationHelper.CommandCreator;
@@ -43,7 +42,6 @@
import static com.mongodb.internal.operation.OperationHelper.isRetryableWrite;
import static com.mongodb.internal.operation.OperationHelper.validateHintForFindAndModify;
import static com.mongodb.internal.operation.SyncOperationHelper.executeRetryableWrite;
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
/**
* Abstract base class for findAndModify-based operations
@@ -51,7 +49,7 @@
* This class is not part of the public API and may be removed or changed at any time
*/
public abstract class BaseFindAndModifyOperation implements AsyncWriteOperation, WriteOperation {
-
+ private final ClientSideOperationTimeout clientSideOperationTimeout;
private final MongoNamespace namespace;
private final WriteConcern writeConcern;
private final boolean retryWrites;
@@ -60,15 +58,15 @@ public abstract class BaseFindAndModifyOperation implements AsyncWriteOperati
private BsonDocument filter;
private BsonDocument projection;
private BsonDocument sort;
- private long maxTimeMS;
private Collation collation;
private Bson hint;
private String hintString;
private BsonValue comment;
private BsonDocument variables;
- protected BaseFindAndModifyOperation(final MongoNamespace namespace, final WriteConcern writeConcern,
- final boolean retryWrites, final Decoder decoder) {
+ protected BaseFindAndModifyOperation(final ClientSideOperationTimeout clientSideOperationTimeout, final MongoNamespace namespace,
+ final WriteConcern writeConcern, final boolean retryWrites, final Decoder decoder) {
+ this.clientSideOperationTimeout = notNull("clientSideOperationTimeout", clientSideOperationTimeout);
this.namespace = notNull("namespace", namespace);
this.writeConcern = notNull("writeConcern", writeConcern);
this.retryWrites = retryWrites;
@@ -77,7 +75,7 @@ protected BaseFindAndModifyOperation(final MongoNamespace namespace, final Write
@Override
public T execute(final WriteBinding binding) {
- return executeRetryableWrite(binding, getDatabaseName(), null, getFieldNameValidator(),
+ return executeRetryableWrite(clientSideOperationTimeout, binding, getDatabaseName(), null, getFieldNameValidator(),
CommandResultDocumentCodec.create(getDecoder(), "value"),
getCommandCreator(binding.getSessionContext()),
FindAndModifyHelper.transformer(),
@@ -86,7 +84,7 @@ public T execute(final WriteBinding binding) {
@Override
public void executeAsync(final AsyncWriteBinding binding, final SingleResultCallback callback) {
- executeRetryableWriteAsync(binding, getDatabaseName(), null, getFieldNameValidator(),
+ executeRetryableWriteAsync(clientSideOperationTimeout, binding, getDatabaseName(), null, getFieldNameValidator(),
CommandResultDocumentCodec.create(getDecoder(), "value"),
getCommandCreator(binding.getSessionContext()), FindAndModifyHelper.asyncTransformer(), cmd -> cmd, callback);
}
@@ -125,17 +123,6 @@ public BaseFindAndModifyOperation projection(@Nullable final BsonDocument pro
return this;
}
- public long getMaxTime(final TimeUnit timeUnit) {
- notNull("timeUnit", timeUnit);
- return timeUnit.convert(maxTimeMS, MILLISECONDS);
- }
-
- public BaseFindAndModifyOperation maxTime(final long maxTime, final TimeUnit timeUnit) {
- notNull("timeUnit", timeUnit);
- this.maxTimeMS = MILLISECONDS.convert(maxTime, timeUnit);
- return this;
- }
-
public BsonDocument getSort() {
return sort;
}
@@ -198,7 +185,7 @@ public BaseFindAndModifyOperation let(@Nullable final BsonDocument variables)
protected abstract void specializeCommand(BsonDocument initialCommand, ConnectionDescription connectionDescription);
private CommandCreator getCommandCreator(final SessionContext sessionContext) {
- return (serverDescription, connectionDescription) -> {
+ return (clientSideOperationTimeout, serverDescription, connectionDescription) -> {
BsonDocument commandDocument = new BsonDocument("findAndModify", new BsonString(getNamespace().getCollectionName()));
putIfNotNull(commandDocument, "query", getFilter());
putIfNotNull(commandDocument, "fields", getProjection());
@@ -206,7 +193,7 @@ private CommandCreator getCommandCreator(final SessionContext sessionContext) {
specializeCommand(commandDocument, connectionDescription);
- putIfNotZero(commandDocument, "maxTimeMS", getMaxTime(MILLISECONDS));
+ putIfNotZero(commandDocument, "maxTimeMS", clientSideOperationTimeout.getMaxTimeMS());
if (getWriteConcern().isAcknowledged() && !getWriteConcern().isServerDefault() && !sessionContext.hasActiveTransaction()) {
commandDocument.put("writeConcern", getWriteConcern().asDocument());
}
diff --git a/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamOperation.java b/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamOperation.java
index a2ba029eb56..14a1ae974ac 100644
--- a/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamOperation.java
+++ b/driver-core/src/main/com/mongodb/internal/operation/ChangeStreamOperation.java
@@ -20,6 +20,7 @@
import com.mongodb.client.model.Collation;
import com.mongodb.client.model.changestream.FullDocument;
import com.mongodb.client.model.changestream.FullDocumentBeforeChange;
+import com.mongodb.internal.ClientSideOperationTimeout;
import com.mongodb.internal.async.AsyncAggregateResponseBatchCursor;
import com.mongodb.internal.async.AsyncBatchCursor;
import com.mongodb.internal.async.SingleResultCallback;
@@ -40,7 +41,6 @@
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.TimeUnit;
import static com.mongodb.assertions.Assertions.notNull;
import static com.mongodb.internal.operation.AsyncOperationHelper.withAsyncReadConnectionSource;
@@ -65,15 +65,16 @@ public class ChangeStreamOperation implements AsyncReadOperation pipeline, final Decoder decoder) {
- this(namespace, fullDocument, fullDocumentBeforeChange, pipeline, decoder, ChangeStreamLevel.COLLECTION);
+ public ChangeStreamOperation(final ClientSideOperationTimeout clientSideOperationTimeout, final MongoNamespace namespace,
+ final FullDocument fullDocument, final FullDocumentBeforeChange fullDocumentBeforeChange,
+ final List pipeline, final Decoder decoder) {
+ this(clientSideOperationTimeout, namespace, fullDocument, fullDocumentBeforeChange, pipeline, decoder, ChangeStreamLevel.COLLECTION);
}
- public ChangeStreamOperation(final MongoNamespace namespace, final FullDocument fullDocument,
- final FullDocumentBeforeChange fullDocumentBeforeChange, final List pipeline,
+ public ChangeStreamOperation(final ClientSideOperationTimeout clientSideOperationTimeout, final MongoNamespace namespace,
+ final FullDocument fullDocument, final FullDocumentBeforeChange fullDocumentBeforeChange, final List pipeline,
final Decoder decoder, final ChangeStreamLevel changeStreamLevel) {
- this.wrapped = new AggregateOperationImpl<>(namespace, pipeline, RAW_BSON_DOCUMENT_CODEC,
+ this.wrapped = new AggregateOperationImpl<>(clientSideOperationTimeout, namespace, pipeline, RAW_BSON_DOCUMENT_CODEC,
getAggregateTarget(), getPipelineCreator());
this.fullDocument = notNull("fullDocument", fullDocument);
this.fullDocumentBeforeChange = notNull("fullDocumentBeforeChange", fullDocumentBeforeChange);
@@ -124,15 +125,6 @@ public ChangeStreamOperation batchSize(@Nullable final Integer batchSize) {
return this;
}
- public long getMaxAwaitTime(final TimeUnit timeUnit) {
- return wrapped.getMaxAwaitTime(timeUnit);
- }
-
- public ChangeStreamOperation maxAwaitTime(final long maxAwaitTime, final TimeUnit timeUnit) {
- wrapped.maxAwaitTime(maxAwaitTime, timeUnit);
- return this;
- }
-
public Collation getCollation() {
return wrapped.getCollation();
}
diff --git a/driver-core/src/main/com/mongodb/internal/operation/CommandOperationHelper.java b/driver-core/src/main/com/mongodb/internal/operation/CommandOperationHelper.java
index fb1cc3c2da2..0414db9221c 100644
--- a/driver-core/src/main/com/mongodb/internal/operation/CommandOperationHelper.java
+++ b/driver-core/src/main/com/mongodb/internal/operation/CommandOperationHelper.java
@@ -28,6 +28,7 @@
import com.mongodb.assertions.Assertions;
import com.mongodb.connection.ConnectionDescription;
import com.mongodb.connection.ServerDescription;
+import com.mongodb.internal.ClientSideOperationTimeout;
import com.mongodb.internal.async.function.RetryState;
import com.mongodb.internal.connection.OperationContext;
import com.mongodb.internal.operation.OperationHelper.ResourceSupplierInternalException;
@@ -48,7 +49,8 @@ final class CommandOperationHelper {
interface CommandCreator {
- BsonDocument create(ServerDescription serverDescription, ConnectionDescription connectionDescription);
+ BsonDocument create(ClientSideOperationTimeout clientSideOperationTimeout, ServerDescription serverDescription,
+ ConnectionDescription connectionDescription);
}
diff --git a/driver-core/src/main/com/mongodb/internal/operation/CommandReadOperation.java b/driver-core/src/main/com/mongodb/internal/operation/CommandReadOperation.java
index 47b807f91ec..eb39c2d1504 100644
--- a/driver-core/src/main/com/mongodb/internal/operation/CommandReadOperation.java
+++ b/driver-core/src/main/com/mongodb/internal/operation/CommandReadOperation.java
@@ -16,6 +16,7 @@
package com.mongodb.internal.operation;
+import com.mongodb.internal.ClientSideOperationTimeout;
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.internal.binding.AsyncReadBinding;
import com.mongodb.internal.binding.ReadBinding;
@@ -33,11 +34,14 @@
* This class is not part of the public API and may be removed or changed at any time
*/
public class CommandReadOperation implements AsyncReadOperation, ReadOperation {
+ private final ClientSideOperationTimeout clientSideOperationTimeout;
private final String databaseName;
private final BsonDocument command;
private final Decoder decoder;
- public CommandReadOperation(final String databaseName, final BsonDocument command, final Decoder decoder) {
+ public CommandReadOperation(final ClientSideOperationTimeout clientSideOperationTimeout, final String databaseName,
+ final BsonDocument command, final Decoder decoder) {
+ this.clientSideOperationTimeout = notNull("clientSideOperationTimeout", clientSideOperationTimeout);
this.databaseName = notNull("databaseName", databaseName);
this.command = notNull("command", command);
this.decoder = notNull("decoder", decoder);
@@ -45,16 +49,18 @@ public CommandReadOperation(final String databaseName, final BsonDocument comman
@Override
public T execute(final ReadBinding binding) {
- return executeRetryableRead(binding, databaseName, getCommandCreator(), decoder, (result, source, connection) -> result, false);
+ return executeRetryableRead(clientSideOperationTimeout, binding, databaseName, getCommandCreator(), decoder,
+ (result, source, connection) -> result, false);
}
@Override
public void executeAsync(final AsyncReadBinding binding, final SingleResultCallback callback) {
- executeRetryableReadAsync(binding, databaseName, getCommandCreator(), decoder, (result, source, connection) -> result,
- false, callback);
+ executeRetryableReadAsync(clientSideOperationTimeout, binding, databaseName, getCommandCreator(), decoder,
+ (result, source, connection) -> result, false, callback);
}
+ // TODO (CSOT) - JAVA-5098 - should the command be modified for CSOT?
private CommandCreator getCommandCreator() {
- return (serverDescription, connectionDescription) -> command;
+ return (clientSideOperationTimeout, serverDescription, connectionDescription) -> command;
}
}
diff --git a/driver-core/src/main/com/mongodb/internal/operation/CommitTransactionOperation.java b/driver-core/src/main/com/mongodb/internal/operation/CommitTransactionOperation.java
index 92779bc61ae..5127160e3f5 100644
--- a/driver-core/src/main/com/mongodb/internal/operation/CommitTransactionOperation.java
+++ b/driver-core/src/main/com/mongodb/internal/operation/CommitTransactionOperation.java
@@ -25,22 +25,19 @@
import com.mongodb.MongoTimeoutException;
import com.mongodb.MongoWriteConcernException;
import com.mongodb.WriteConcern;
+import com.mongodb.internal.ClientSideOperationTimeout;
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.internal.binding.AsyncWriteBinding;
import com.mongodb.internal.binding.WriteBinding;
import com.mongodb.lang.Nullable;
import org.bson.BsonDocument;
-import org.bson.BsonInt32;
-import org.bson.BsonInt64;
import java.util.List;
-import java.util.concurrent.TimeUnit;
import static com.mongodb.MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL;
-import static com.mongodb.assertions.Assertions.isTrueArgument;
-import static com.mongodb.assertions.Assertions.notNull;
import static com.mongodb.internal.operation.CommandOperationHelper.CommandCreator;
import static com.mongodb.internal.operation.CommandOperationHelper.RETRYABLE_WRITE_ERROR_LABEL;
+import static com.mongodb.internal.operation.DocumentHelper.putIfNotZero;
import static java.util.Arrays.asList;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
@@ -52,14 +49,14 @@
public class CommitTransactionOperation extends TransactionOperation {
private final boolean alreadyCommitted;
private BsonDocument recoveryToken;
- private Long maxCommitTimeMS;
- public CommitTransactionOperation(final WriteConcern writeConcern) {
- this(writeConcern, false);
+ public CommitTransactionOperation(final ClientSideOperationTimeout clientSideOperationTimeout, final WriteConcern writeConcern) {
+ this(clientSideOperationTimeout, writeConcern, false);
}
- public CommitTransactionOperation(final WriteConcern writeConcern, final boolean alreadyCommitted) {
- super(writeConcern);
+ public CommitTransactionOperation(final ClientSideOperationTimeout clientSideOperationTimeout, final WriteConcern writeConcern,
+ final boolean alreadyCommitted) {
+ super(clientSideOperationTimeout, writeConcern);
this.alreadyCommitted = alreadyCommitted;
}
@@ -68,26 +65,6 @@ public CommitTransactionOperation recoveryToken(@Nullable final BsonDocument rec
return this;
}
- public CommitTransactionOperation maxCommitTime(@Nullable final Long maxCommitTime, final TimeUnit timeUnit) {
- if (maxCommitTime == null) {
- this.maxCommitTimeMS = null;
- } else {
- notNull("timeUnit", timeUnit);
- isTrueArgument("maxCommitTime > 0", maxCommitTime > 0);
- this.maxCommitTimeMS = MILLISECONDS.convert(maxCommitTime, timeUnit);
- }
- return this;
- }
-
- @Nullable
- public Long getMaxCommitTime(final TimeUnit timeUnit) {
- notNull("timeUnit", timeUnit);
- if (maxCommitTimeMS == null) {
- return null;
- }
- return timeUnit.convert(maxCommitTimeMS, MILLISECONDS);
- }
-
@Override
public Void execute(final WriteBinding binding) {
try {
@@ -143,20 +120,20 @@ protected String getCommandName() {
@Override
CommandCreator getCommandCreator() {
- CommandCreator creator = (serverDescription, connectionDescription) -> {
- BsonDocument command = CommitTransactionOperation.super.getCommandCreator().create(serverDescription,
- connectionDescription);
- if (maxCommitTimeMS != null) {
- command.append("maxTimeMS",
- maxCommitTimeMS > Integer.MAX_VALUE
- ? new BsonInt64(maxCommitTimeMS) : new BsonInt32(maxCommitTimeMS.intValue()));
- }
+ CommandCreator creator = (clientSideOperationTimeout, serverDescription, connectionDescription) -> {
+ BsonDocument command = CommitTransactionOperation.super.getCommandCreator()
+ .create(clientSideOperationTimeout, serverDescription, connectionDescription);
+ long maxCommitTimeMS = clientSideOperationTimeout.getMaxCommitTimeMS();
+ putIfNotZero(command, "maxTimeMS", maxCommitTimeMS);
return command;
};
if (alreadyCommitted) {
- return (serverDescription, connectionDescription) -> getRetryCommandModifier().apply(creator.create(serverDescription, connectionDescription));
+ return (clientSideOperationTimeout, serverDescription, connectionDescription) ->
+ getRetryCommandModifier().apply(creator.create(clientSideOperationTimeout, serverDescription, connectionDescription));
} else if (recoveryToken != null) {
- return (serverDescription, connectionDescription) -> creator.create(serverDescription, connectionDescription).append("recoveryToken", recoveryToken);
+ return (clientSideOperationTimeout, serverDescription, connectionDescription) ->
+ creator.create(clientSideOperationTimeout, serverDescription, connectionDescription)
+ .append("recoveryToken", recoveryToken);
}
return creator;
}
diff --git a/driver-core/src/main/com/mongodb/internal/operation/CountDocumentsOperation.java b/driver-core/src/main/com/mongodb/internal/operation/CountDocumentsOperation.java
index 5cdb974b7c0..453597723cb 100644
--- a/driver-core/src/main/com/mongodb/internal/operation/CountDocumentsOperation.java
+++ b/driver-core/src/main/com/mongodb/internal/operation/CountDocumentsOperation.java
@@ -18,6 +18,7 @@
import com.mongodb.MongoNamespace;
import com.mongodb.client.model.Collation;
+import com.mongodb.internal.ClientSideOperationTimeout;
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.internal.binding.AsyncReadBinding;
import com.mongodb.internal.binding.ReadBinding;
@@ -31,7 +32,6 @@
import java.util.ArrayList;
import java.util.List;
-import java.util.concurrent.TimeUnit;
import static com.mongodb.assertions.Assertions.notNull;
@@ -40,6 +40,7 @@
*/
public class CountDocumentsOperation implements AsyncReadOperation, ReadOperation {
private static final Decoder DECODER = new BsonDocumentCodec();
+ private final ClientSideOperationTimeout clientSideOperationTimeout;
private final MongoNamespace namespace;
private boolean retryReads;
private BsonDocument filter;
@@ -47,13 +48,14 @@ public class CountDocumentsOperation implements AsyncReadOperation, ReadOp
private BsonValue comment;
private long skip;
private long limit;
- private long maxTimeMS;
private Collation collation;
- public CountDocumentsOperation(final MongoNamespace namespace) {
+ public CountDocumentsOperation(final ClientSideOperationTimeout clientSideOperationTimeout, final MongoNamespace namespace) {
+ this.clientSideOperationTimeout = notNull("clientSideOperationTimeout", clientSideOperationTimeout);
this.namespace = notNull("namespace", namespace);
}
+ @Nullable
public BsonDocument getFilter() {
return filter;
}
@@ -72,6 +74,7 @@ public boolean getRetryReads() {
return retryReads;
}
+ @Nullable
public BsonValue getHint() {
return hint;
}
@@ -99,17 +102,7 @@ public CountDocumentsOperation skip(final long skip) {
return this;
}
- public long getMaxTime(final TimeUnit timeUnit) {
- notNull("timeUnit", timeUnit);
- return timeUnit.convert(maxTimeMS, TimeUnit.MILLISECONDS);
- }
-
- public CountDocumentsOperation maxTime(final long maxTime, final TimeUnit timeUnit) {
- notNull("timeUnit", timeUnit);
- this.maxTimeMS = TimeUnit.MILLISECONDS.convert(maxTime, timeUnit);
- return this;
- }
-
+ @Nullable
public Collation getCollation() {
return collation;
}
@@ -153,12 +146,11 @@ public void executeAsync(final AsyncReadBinding binding, final SingleResultCallb
}
private AggregateOperation getAggregateOperation() {
- return new AggregateOperation<>(namespace, getPipeline(), DECODER)
+ return new AggregateOperation<>(clientSideOperationTimeout, namespace, getPipeline(), DECODER)
.retryReads(retryReads)
.collation(collation)
.comment(comment)
- .hint(hint)
- .maxTime(maxTimeMS, TimeUnit.MILLISECONDS);
+ .hint(hint);
}
private List getPipeline() {
diff --git a/driver-core/src/main/com/mongodb/internal/operation/CountOperation.java b/driver-core/src/main/com/mongodb/internal/operation/CountOperation.java
index 43298bae4bf..fde5991360c 100644
--- a/driver-core/src/main/com/mongodb/internal/operation/CountOperation.java
+++ b/driver-core/src/main/com/mongodb/internal/operation/CountOperation.java
@@ -18,7 +18,7 @@
import com.mongodb.MongoNamespace;
import com.mongodb.client.model.Collation;
-import com.mongodb.connection.ConnectionDescription;
+import com.mongodb.internal.ClientSideOperationTimeout;
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.internal.binding.AsyncReadBinding;
import com.mongodb.internal.binding.ReadBinding;
@@ -30,8 +30,6 @@
import org.bson.codecs.BsonDocumentCodec;
import org.bson.codecs.Decoder;
-import java.util.concurrent.TimeUnit;
-
import static com.mongodb.assertions.Assertions.notNull;
import static com.mongodb.internal.operation.AsyncOperationHelper.CommandReadTransformerAsync;
import static com.mongodb.internal.operation.AsyncOperationHelper.executeRetryableReadAsync;
@@ -47,16 +45,17 @@
*/
public class CountOperation implements AsyncReadOperation, ReadOperation {
private static final Decoder DECODER = new BsonDocumentCodec();
+ private final ClientSideOperationTimeout clientSideOperationTimeout;
private final MongoNamespace namespace;
private boolean retryReads;
private BsonDocument filter;
private BsonValue hint;
private long skip;
private long limit;
- private long maxTimeMS;
private Collation collation;
- public CountOperation(final MongoNamespace namespace) {
+ public CountOperation(final ClientSideOperationTimeout clientSideOperationTimeout, final MongoNamespace namespace) {
+ this.clientSideOperationTimeout = notNull("clientSideOperationTimeout", clientSideOperationTimeout);
this.namespace = notNull("namespace", namespace);
}
@@ -105,17 +104,6 @@ public CountOperation skip(final long skip) {
return this;
}
- public long getMaxTime(final TimeUnit timeUnit) {
- notNull("timeUnit", timeUnit);
- return timeUnit.convert(maxTimeMS, TimeUnit.MILLISECONDS);
- }
-
- public CountOperation maxTime(final long maxTime, final TimeUnit timeUnit) {
- notNull("timeUnit", timeUnit);
- this.maxTimeMS = TimeUnit.MILLISECONDS.convert(maxTime, timeUnit);
- return this;
- }
-
public Collation getCollation() {
return collation;
}
@@ -127,14 +115,14 @@ public CountOperation collation(@Nullable final Collation collation) {
@Override
public Long execute(final ReadBinding binding) {
- return executeRetryableRead(binding, namespace.getDatabaseName(),
+ return executeRetryableRead(clientSideOperationTimeout, binding, namespace.getDatabaseName(),
getCommandCreator(binding.getSessionContext()), DECODER, transformer(), retryReads);
}
@Override
public void executeAsync(final AsyncReadBinding binding, final SingleResultCallback callback) {
- executeRetryableReadAsync(binding, namespace.getDatabaseName(), getCommandCreator(binding.getSessionContext()), DECODER,
- asyncTransformer(), retryReads, callback);
+ executeRetryableReadAsync(clientSideOperationTimeout, binding, namespace.getDatabaseName(),
+ getCommandCreator(binding.getSessionContext()), DECODER, asyncTransformer(), retryReads, callback);
}
private CommandReadTransformer transformer() {
@@ -146,23 +134,21 @@ private CommandReadTransformerAsync asyncTransformer() {
}
private CommandCreator getCommandCreator(final SessionContext sessionContext) {
- return (serverDescription, connectionDescription) -> getCommand(sessionContext, connectionDescription);
- }
-
- private BsonDocument getCommand(final SessionContext sessionContext, final ConnectionDescription connectionDescription) {
- BsonDocument document = new BsonDocument("count", new BsonString(namespace.getCollectionName()));
-
- appendReadConcernToCommand(sessionContext, connectionDescription.getMaxWireVersion(), document);
-
- putIfNotNull(document, "query", filter);
- putIfNotZero(document, "limit", limit);
- putIfNotZero(document, "skip", skip);
- putIfNotNull(document, "hint", hint);
- putIfNotZero(document, "maxTimeMS", maxTimeMS);
-
- if (collation != null) {
- document.put("collation", collation.asDocument());
- }
- return document;
+ return (clientSideOperationTimeout, serverDescription, connectionDescription) -> {
+ BsonDocument document = new BsonDocument("count", new BsonString(namespace.getCollectionName()));
+
+ appendReadConcernToCommand(sessionContext, connectionDescription.getMaxWireVersion(), document);
+
+ putIfNotNull(document, "query", filter);
+ putIfNotZero(document, "limit", limit);
+ putIfNotZero(document, "skip", skip);
+ putIfNotNull(document, "hint", hint);
+ putIfNotZero(document, "maxTimeMS", clientSideOperationTimeout.getMaxTimeMS());
+
+ if (collation != null) {
+ document.put("collation", collation.asDocument());
+ }
+ return document;
+ };
}
}
diff --git a/driver-core/src/main/com/mongodb/internal/operation/CreateCollectionOperation.java b/driver-core/src/main/com/mongodb/internal/operation/CreateCollectionOperation.java
index c78fee6838e..85ea7e52fca 100644
--- a/driver-core/src/main/com/mongodb/internal/operation/CreateCollectionOperation.java
+++ b/driver-core/src/main/com/mongodb/internal/operation/CreateCollectionOperation.java
@@ -26,6 +26,7 @@
import com.mongodb.client.model.ValidationAction;
import com.mongodb.client.model.ValidationLevel;
import com.mongodb.connection.ConnectionDescription;
+import com.mongodb.internal.ClientSideOperationTimeout;
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.internal.binding.AsyncWriteBinding;
import com.mongodb.internal.binding.WriteBinding;
@@ -71,6 +72,7 @@ public class CreateCollectionOperation implements AsyncWriteOperation, Wri
private static final BsonDocument ENCRYPT_CLUSTERED_INDEX = BsonDocument.parse("{key: {_id: 1}, unique: true}");
private static final BsonArray SAFE_CONTENT_ARRAY = new BsonArray(
singletonList(BsonDocument.parse("{key: {__safeContent__: 1}, name: '__safeContent___1'}")));
+ private final ClientSideOperationTimeout clientSideOperationTimeout;
private final String databaseName;
private final String collectionName;
private final WriteConcern writeConcern;
@@ -92,11 +94,9 @@ public class CreateCollectionOperation implements AsyncWriteOperation, Wri
private String clusteredIndexName;
private BsonDocument encryptedFields;
- public CreateCollectionOperation(final String databaseName, final String collectionName) {
- this(databaseName, collectionName, null);
- }
-
- public CreateCollectionOperation(final String databaseName, final String collectionName, @Nullable final WriteConcern writeConcern) {
+ public CreateCollectionOperation(final ClientSideOperationTimeout clientSideOperationTimeout, final String databaseName,
+ final String collectionName, @Nullable final WriteConcern writeConcern) {
+ this.clientSideOperationTimeout = notNull("clientSideOperationTimeout", clientSideOperationTimeout);
this.databaseName = notNull("databaseName", databaseName);
this.collectionName = notNull("collectionName", collectionName);
this.writeConcern = writeConcern;
diff --git a/driver-core/src/main/com/mongodb/internal/operation/CreateIndexesOperation.java b/driver-core/src/main/com/mongodb/internal/operation/CreateIndexesOperation.java
index b47b45a5eee..eca30e18dc3 100644
--- a/driver-core/src/main/com/mongodb/internal/operation/CreateIndexesOperation.java
+++ b/driver-core/src/main/com/mongodb/internal/operation/CreateIndexesOperation.java
@@ -26,6 +26,7 @@
import com.mongodb.WriteConcern;
import com.mongodb.WriteConcernResult;
import com.mongodb.connection.ConnectionDescription;
+import com.mongodb.internal.ClientSideOperationTimeout;
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.internal.binding.AsyncWriteBinding;
import com.mongodb.internal.binding.WriteBinding;
@@ -44,7 +45,6 @@
import java.util.concurrent.TimeUnit;
import static com.mongodb.assertions.Assertions.assertNotNull;
-import static com.mongodb.assertions.Assertions.isTrueArgument;
import static com.mongodb.assertions.Assertions.notNull;
import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback;
import static com.mongodb.internal.operation.AsyncOperationHelper.executeCommandAsync;
@@ -66,18 +66,15 @@
* This class is not part of the public API and may be removed or changed at any time
*/
public class CreateIndexesOperation implements AsyncWriteOperation, WriteOperation {
+ private final ClientSideOperationTimeout clientSideOperationTimeout;
private final MongoNamespace namespace;
private final List requests;
private final WriteConcern writeConcern;
- private long maxTimeMS;
private CreateIndexCommitQuorum commitQuorum;
- public CreateIndexesOperation(final MongoNamespace namespace, final List requests) {
- this(namespace, requests, null);
- }
-
- public CreateIndexesOperation(final MongoNamespace namespace, final List requests,
- @Nullable final WriteConcern writeConcern) {
+ public CreateIndexesOperation(final ClientSideOperationTimeout clientSideOperationTimeout, final MongoNamespace namespace,
+ final List requests, @Nullable final WriteConcern writeConcern) {
+ this.clientSideOperationTimeout = notNull("clientSideOperationTimeout", clientSideOperationTimeout);
this.namespace = notNull("namespace", namespace);
this.requests = notNull("indexRequests", requests);
this.writeConcern = writeConcern;
@@ -103,18 +100,6 @@ public List getIndexNames() {
return indexNames;
}
- public long getMaxTime(final TimeUnit timeUnit) {
- notNull("timeUnit", timeUnit);
- return timeUnit.convert(maxTimeMS, TimeUnit.MILLISECONDS);
- }
-
- public CreateIndexesOperation maxTime(final long maxTime, final TimeUnit timeUnit) {
- notNull("timeUnit", timeUnit);
- isTrueArgument("maxTime >= 0", maxTime >= 0);
- this.maxTimeMS = TimeUnit.MILLISECONDS.convert(maxTime, timeUnit);
- return this;
- }
-
public CreateIndexCommitQuorum getCommitQuorum() {
return commitQuorum;
}
@@ -231,7 +216,7 @@ private BsonDocument getCommand(final ConnectionDescription description) {
values.add(getIndex(request));
}
command.put("indexes", new BsonArray(values));
- putIfNotZero(command, "maxTimeMS", maxTimeMS);
+ putIfNotZero(command, "maxTimeMS", clientSideOperationTimeout.getMaxTimeMS());
appendWriteConcernToCommand(writeConcern, command);
if (commitQuorum != null) {
if (serverIsAtLeastVersionFourDotFour(description)) {
diff --git a/driver-core/src/main/com/mongodb/internal/operation/CreateViewOperation.java b/driver-core/src/main/com/mongodb/internal/operation/CreateViewOperation.java
index 8d1e98de6b8..cbb1b360095 100644
--- a/driver-core/src/main/com/mongodb/internal/operation/CreateViewOperation.java
+++ b/driver-core/src/main/com/mongodb/internal/operation/CreateViewOperation.java
@@ -18,6 +18,7 @@
import com.mongodb.WriteConcern;
import com.mongodb.client.model.Collation;
+import com.mongodb.internal.ClientSideOperationTimeout;
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.internal.binding.AsyncWriteBinding;
import com.mongodb.internal.binding.WriteBinding;
@@ -47,6 +48,7 @@
* This class is not part of the public API and may be removed or changed at any time
*/
public class CreateViewOperation implements AsyncWriteOperation, WriteOperation {
+ private final ClientSideOperationTimeout clientSideOperationTimeout;
private final String databaseName;
private final String viewName;
private final String viewOn;
@@ -54,8 +56,9 @@ public class CreateViewOperation implements AsyncWriteOperation, WriteOper
private final WriteConcern writeConcern;
private Collation collation;
- public CreateViewOperation(final String databaseName, final String viewName, final String viewOn, final List pipeline,
- final WriteConcern writeConcern) {
+ public CreateViewOperation(final ClientSideOperationTimeout clientSideOperationTimeout, final String databaseName,
+ final String viewName, final String viewOn, final List pipeline, final WriteConcern writeConcern) {
+ this.clientSideOperationTimeout = notNull("clientSideOperationTimeout", clientSideOperationTimeout);
this.databaseName = notNull("databaseName", databaseName);
this.viewName = notNull("viewName", viewName);
this.viewOn = notNull("viewOn", viewOn);
diff --git a/driver-core/src/main/com/mongodb/internal/operation/DistinctOperation.java b/driver-core/src/main/com/mongodb/internal/operation/DistinctOperation.java
index a64c4cbfadd..50850f2789e 100644
--- a/driver-core/src/main/com/mongodb/internal/operation/DistinctOperation.java
+++ b/driver-core/src/main/com/mongodb/internal/operation/DistinctOperation.java
@@ -19,6 +19,7 @@
import com.mongodb.MongoNamespace;
import com.mongodb.client.model.Collation;
import com.mongodb.connection.ConnectionDescription;
+import com.mongodb.internal.ClientSideOperationTimeout;
import com.mongodb.internal.async.AsyncBatchCursor;
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.internal.binding.AsyncReadBinding;
@@ -32,8 +33,6 @@
import org.bson.codecs.Codec;
import org.bson.codecs.Decoder;
-import java.util.concurrent.TimeUnit;
-
import static com.mongodb.assertions.Assertions.notNull;
import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback;
import static com.mongodb.internal.operation.AsyncOperationHelper.CommandReadTransformerAsync;
@@ -54,16 +53,18 @@
public class DistinctOperation implements AsyncReadOperation>, ReadOperation> {
private static final String VALUES = "values";
+ private final ClientSideOperationTimeout clientSideOperationTimeout;
private final MongoNamespace namespace;
private final String fieldName;
private final Decoder decoder;
private boolean retryReads;
private BsonDocument filter;
- private long maxTimeMS;
private Collation collation;
private BsonValue comment;
- public DistinctOperation(final MongoNamespace namespace, final String fieldName, final Decoder decoder) {
+ public DistinctOperation(final ClientSideOperationTimeout clientSideOperationTimeout, final MongoNamespace namespace,
+ final String fieldName, final Decoder decoder) {
+ this.clientSideOperationTimeout = notNull("clientSideOperationTimeout", clientSideOperationTimeout);
this.namespace = notNull("namespace", namespace);
this.fieldName = notNull("fieldName", fieldName);
this.decoder = notNull("decoder", decoder);
@@ -87,17 +88,6 @@ public boolean getRetryReads() {
return retryReads;
}
- public long getMaxTime(final TimeUnit timeUnit) {
- notNull("timeUnit", timeUnit);
- return timeUnit.convert(maxTimeMS, TimeUnit.MILLISECONDS);
- }
-
- public DistinctOperation maxTime(final long maxTime, final TimeUnit timeUnit) {
- notNull("timeUnit", timeUnit);
- this.maxTimeMS = TimeUnit.MILLISECONDS.convert(maxTime, timeUnit);
- return this;
- }
-
public Collation getCollation() {
return collation;
}
@@ -119,14 +109,15 @@ public DistinctOperation comment(final BsonValue comment) {
@Override
public BatchCursor execute(final ReadBinding binding) {
- return executeRetryableRead(binding, namespace.getDatabaseName(), getCommandCreator(binding.getSessionContext()),
- createCommandDecoder(), transformer(), retryReads);
+ return executeRetryableRead(clientSideOperationTimeout, binding, namespace.getDatabaseName(),
+ getCommandCreator(binding.getSessionContext()), createCommandDecoder(), transformer(), retryReads);
}
@Override
public void executeAsync(final AsyncReadBinding binding, final SingleResultCallback> callback) {
- executeRetryableReadAsync(binding, namespace.getDatabaseName(), getCommandCreator(binding.getSessionContext()),
- createCommandDecoder(), asyncTransformer(), retryReads, errorHandlingCallback(callback, LOGGER));
+ executeRetryableReadAsync(clientSideOperationTimeout, binding, namespace.getDatabaseName(),
+ getCommandCreator(binding.getSessionContext()), createCommandDecoder(), asyncTransformer(), retryReads,
+ errorHandlingCallback(callback, LOGGER));
}
private Codec createCommandDecoder() {
@@ -153,19 +144,17 @@ private CommandReadTransformerAsync> asyncTran
}
private CommandCreator getCommandCreator(final SessionContext sessionContext) {
- return (serverDescription, connectionDescription) -> getCommand(sessionContext, connectionDescription);
- }
-
- private BsonDocument getCommand(final SessionContext sessionContext, final ConnectionDescription connectionDescription) {
- BsonDocument commandDocument = new BsonDocument("distinct", new BsonString(namespace.getCollectionName()));
- appendReadConcernToCommand(sessionContext, connectionDescription.getMaxWireVersion(), commandDocument);
- commandDocument.put("key", new BsonString(fieldName));
- putIfNotNull(commandDocument, "query", filter);
- putIfNotZero(commandDocument, "maxTimeMS", maxTimeMS);
- if (collation != null) {
- commandDocument.put("collation", collation.asDocument());
- }
- putIfNotNull(commandDocument, "comment", comment);
- return commandDocument;
+ return (clientSideOperationTimeout, serverDescription, connectionDescription) -> {
+ BsonDocument commandDocument = new BsonDocument("distinct", new BsonString(namespace.getCollectionName()));
+ appendReadConcernToCommand(sessionContext, connectionDescription.getMaxWireVersion(), commandDocument);
+ commandDocument.put("key", new BsonString(fieldName));
+ putIfNotNull(commandDocument, "query", filter);
+ putIfNotZero(commandDocument, "maxTimeMS", clientSideOperationTimeout.getMaxTimeMS());
+ if (collation != null) {
+ commandDocument.put("collation", collation.asDocument());
+ }
+ putIfNotNull(commandDocument, "comment", comment);
+ return commandDocument;
+ };
}
}
diff --git a/driver-core/src/main/com/mongodb/internal/operation/DocumentHelper.java b/driver-core/src/main/com/mongodb/internal/operation/DocumentHelper.java
index d0e73948339..46a66fcf28e 100644
--- a/driver-core/src/main/com/mongodb/internal/operation/DocumentHelper.java
+++ b/driver-core/src/main/com/mongodb/internal/operation/DocumentHelper.java
@@ -59,6 +59,12 @@ static void putIfNotNull(final BsonDocument command, final String key, @Nullable
}
}
+ static void putIfNotNull(final BsonDocument command, final String key, @Nullable final Boolean value) {
+ if (value != null) {
+ command.put(key, new BsonBoolean(value));
+ }
+ }
+
static void putIfNotZero(final BsonDocument command, final String key, final int value) {
if (value != 0) {
command.put(key, new BsonInt32(value));
diff --git a/driver-core/src/main/com/mongodb/internal/operation/DropCollectionOperation.java b/driver-core/src/main/com/mongodb/internal/operation/DropCollectionOperation.java
index 6ddc087bdee..82490ae875d 100644
--- a/driver-core/src/main/com/mongodb/internal/operation/DropCollectionOperation.java
+++ b/driver-core/src/main/com/mongodb/internal/operation/DropCollectionOperation.java
@@ -19,6 +19,7 @@
import com.mongodb.MongoCommandException;
import com.mongodb.MongoNamespace;
import com.mongodb.WriteConcern;
+import com.mongodb.internal.ClientSideOperationTimeout;
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.internal.binding.AsyncReadWriteBinding;
import com.mongodb.internal.binding.AsyncWriteBinding;
@@ -61,16 +62,15 @@
public class DropCollectionOperation implements AsyncWriteOperation, WriteOperation {
private static final String ENCRYPT_PREFIX = "enxcol_.";
private static final BsonValueCodec BSON_VALUE_CODEC = new BsonValueCodec();
+ private final ClientSideOperationTimeout clientSideOperationTimeout;
private final MongoNamespace namespace;
private final WriteConcern writeConcern;
private BsonDocument encryptedFields;
private boolean autoEncryptedFields;
- public DropCollectionOperation(final MongoNamespace namespace) {
- this(namespace, null);
- }
-
- public DropCollectionOperation(final MongoNamespace namespace, @Nullable final WriteConcern writeConcern) {
+ public DropCollectionOperation(final ClientSideOperationTimeout clientSideOperationTimeout, final MongoNamespace namespace,
+ @Nullable final WriteConcern writeConcern) {
+ this.clientSideOperationTimeout = notNull("clientSideOperationTimeout", clientSideOperationTimeout);
this.namespace = notNull("namespace", namespace);
this.writeConcern = writeConcern;
}
@@ -217,7 +217,7 @@ private BsonDocument getCollectionEncryptedFields(final BsonDocument defaultEncr
}
private ListCollectionsOperation listCollectionOperation() {
- return new ListCollectionsOperation<>(namespace.getDatabaseName(), BSON_VALUE_CODEC)
+ return new ListCollectionsOperation<>(clientSideOperationTimeout, namespace.getDatabaseName(), BSON_VALUE_CODEC)
.filter(new BsonDocument("name", new BsonString(namespace.getCollectionName())))
.batchSize(1);
}
diff --git a/driver-core/src/main/com/mongodb/internal/operation/DropDatabaseOperation.java b/driver-core/src/main/com/mongodb/internal/operation/DropDatabaseOperation.java
index 2dad7dda177..64112339c1c 100644
--- a/driver-core/src/main/com/mongodb/internal/operation/DropDatabaseOperation.java
+++ b/driver-core/src/main/com/mongodb/internal/operation/DropDatabaseOperation.java
@@ -17,6 +17,7 @@
package com.mongodb.internal.operation;
import com.mongodb.WriteConcern;
+import com.mongodb.internal.ClientSideOperationTimeout;
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.internal.binding.AsyncWriteBinding;
import com.mongodb.internal.binding.WriteBinding;
@@ -43,14 +44,13 @@
* This class is not part of the public API and may be removed or changed at any time
*/
public class DropDatabaseOperation implements AsyncWriteOperation, WriteOperation {
+ private final ClientSideOperationTimeout clientSideOperationTimeout;
private final String databaseName;
private final WriteConcern writeConcern;
- public DropDatabaseOperation(final String databaseName) {
- this(databaseName, null);
- }
-
- public DropDatabaseOperation(final String databaseName, @Nullable final WriteConcern writeConcern) {
+ public DropDatabaseOperation(final ClientSideOperationTimeout clientSideOperationTimeout,
+ final String databaseName, @Nullable final WriteConcern writeConcern) {
+ this.clientSideOperationTimeout = notNull("clientSideOperationTimeout", clientSideOperationTimeout);
this.databaseName = notNull("databaseName", databaseName);
this.writeConcern = writeConcern;
}
diff --git a/driver-core/src/main/com/mongodb/internal/operation/DropIndexOperation.java b/driver-core/src/main/com/mongodb/internal/operation/DropIndexOperation.java
index 66bb8f408fb..e3dbf619575 100644
--- a/driver-core/src/main/com/mongodb/internal/operation/DropIndexOperation.java
+++ b/driver-core/src/main/com/mongodb/internal/operation/DropIndexOperation.java
@@ -19,6 +19,7 @@
import com.mongodb.MongoCommandException;
import com.mongodb.MongoNamespace;
import com.mongodb.WriteConcern;
+import com.mongodb.internal.ClientSideOperationTimeout;
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.internal.binding.AsyncWriteBinding;
import com.mongodb.internal.binding.WriteBinding;
@@ -26,9 +27,6 @@
import org.bson.BsonDocument;
import org.bson.BsonString;
-import java.util.concurrent.TimeUnit;
-
-import static com.mongodb.assertions.Assertions.isTrueArgument;
import static com.mongodb.assertions.Assertions.notNull;
import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback;
import static com.mongodb.internal.operation.AsyncOperationHelper.executeCommandAsync;
@@ -50,28 +48,24 @@
* This class is not part of the public API and may be removed or changed at any time
*/
public class DropIndexOperation implements AsyncWriteOperation, WriteOperation {
+ private final ClientSideOperationTimeout clientSideOperationTimeout;
private final MongoNamespace namespace;
private final String indexName;
private final BsonDocument indexKeys;
private final WriteConcern writeConcern;
- private long maxTimeMS;
-
- public DropIndexOperation(final MongoNamespace namespace, final String indexName) {
- this(namespace, indexName, null);
- }
-
- public DropIndexOperation(final MongoNamespace namespace, final BsonDocument keys) {
- this(namespace, keys, null);
- }
- public DropIndexOperation(final MongoNamespace namespace, final String indexName, @Nullable final WriteConcern writeConcern) {
+ public DropIndexOperation(final ClientSideOperationTimeout clientSideOperationTimeout, final MongoNamespace namespace,
+ final String indexName, @Nullable final WriteConcern writeConcern) {
+ this.clientSideOperationTimeout = notNull("clientSideOperationTimeout", clientSideOperationTimeout);
this.namespace = notNull("namespace", namespace);
this.indexName = notNull("indexName", indexName);
this.indexKeys = null;
this.writeConcern = writeConcern;
}
- public DropIndexOperation(final MongoNamespace namespace, final BsonDocument indexKeys, @Nullable final WriteConcern writeConcern) {
+ public DropIndexOperation(final ClientSideOperationTimeout clientSideOperationTimeout, final MongoNamespace namespace,
+ final BsonDocument indexKeys, @Nullable final WriteConcern writeConcern) {
+ this.clientSideOperationTimeout = notNull("clientSideOperationTimeout", clientSideOperationTimeout);
this.namespace = notNull("namespace", namespace);
this.indexKeys = notNull("indexKeys", indexKeys);
this.indexName = null;
@@ -82,18 +76,6 @@ public WriteConcern getWriteConcern() {
return writeConcern;
}
- public long getMaxTime(final TimeUnit timeUnit) {
- notNull("timeUnit", timeUnit);
- return timeUnit.convert(maxTimeMS, TimeUnit.MILLISECONDS);
- }
-
- public DropIndexOperation maxTime(final long maxTime, final TimeUnit timeUnit) {
- notNull("timeUnit", timeUnit);
- isTrueArgument("maxTime >= 0", maxTime >= 0);
- this.maxTimeMS = TimeUnit.MILLISECONDS.convert(maxTime, timeUnit);
- return this;
- }
-
@Override
public Void execute(final WriteBinding binding) {
return withConnection(binding, connection -> {
@@ -135,7 +117,7 @@ private BsonDocument getCommand() {
command.put("index", indexKeys);
}
- putIfNotZero(command, "maxTimeMS", maxTimeMS);
+ putIfNotZero(command, "maxTimeMS", clientSideOperationTimeout.getMaxTimeMS());
appendWriteConcernToCommand(writeConcern, command);
return command;
}
diff --git a/driver-core/src/main/com/mongodb/internal/operation/EstimatedDocumentCountOperation.java b/driver-core/src/main/com/mongodb/internal/operation/EstimatedDocumentCountOperation.java
index 571de884582..824a045130a 100644
--- a/driver-core/src/main/com/mongodb/internal/operation/EstimatedDocumentCountOperation.java
+++ b/driver-core/src/main/com/mongodb/internal/operation/EstimatedDocumentCountOperation.java
@@ -19,6 +19,7 @@
import com.mongodb.MongoCommandException;
import com.mongodb.MongoNamespace;
import com.mongodb.connection.ConnectionDescription;
+import com.mongodb.internal.ClientSideOperationTimeout;
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.internal.binding.AsyncReadBinding;
import com.mongodb.internal.binding.ReadBinding;
@@ -30,8 +31,6 @@
import org.bson.codecs.BsonDocumentCodec;
import org.bson.codecs.Decoder;
-import java.util.concurrent.TimeUnit;
-
import static com.mongodb.assertions.Assertions.assertNotNull;
import static com.mongodb.assertions.Assertions.notNull;
import static com.mongodb.internal.operation.AsyncOperationHelper.CommandReadTransformerAsync;
@@ -50,12 +49,14 @@
*/
public class EstimatedDocumentCountOperation implements AsyncReadOperation, ReadOperation {
private static final Decoder DECODER = new BsonDocumentCodec();
+ private final ClientSideOperationTimeout clientSideOperationTimeout;
private final MongoNamespace namespace;
private boolean retryReads;
- private long maxTimeMS;
private BsonValue comment;
- public EstimatedDocumentCountOperation(final MongoNamespace namespace) {
+ public EstimatedDocumentCountOperation(final ClientSideOperationTimeout clientSideOperationTimeout,
+ final MongoNamespace namespace) {
+ this.clientSideOperationTimeout = notNull("clientSideOperationTimeout", clientSideOperationTimeout);
this.namespace = notNull("namespace", namespace);
}
@@ -64,12 +65,6 @@ public EstimatedDocumentCountOperation retryReads(final boolean retryReads) {
return this;
}
- public EstimatedDocumentCountOperation maxTime(final long maxTime, final TimeUnit timeUnit) {
- notNull("timeUnit", timeUnit);
- this.maxTimeMS = TimeUnit.MILLISECONDS.convert(maxTime, timeUnit);
- return this;
- }
-
@Nullable
public BsonValue getComment() {
return comment;
@@ -83,8 +78,9 @@ public EstimatedDocumentCountOperation comment(@Nullable final BsonValue comment
@Override
public Long execute(final ReadBinding binding) {
try {
- return executeRetryableRead(binding, namespace.getDatabaseName(), getCommandCreator(binding.getSessionContext()),
- CommandResultDocumentCodec.create(DECODER, singletonList("firstBatch")), transformer(), retryReads);
+ return executeRetryableRead(clientSideOperationTimeout, binding, namespace.getDatabaseName(),
+ getCommandCreator(binding.getSessionContext()), CommandResultDocumentCodec.create(DECODER, singletonList("firstBatch")),
+ transformer(), retryReads);
} catch (MongoCommandException e) {
return assertNotNull(rethrowIfNotNamespaceError(e, 0L));
}
@@ -92,8 +88,9 @@ public Long execute(final ReadBinding binding) {
@Override
public void executeAsync(final AsyncReadBinding binding, final SingleResultCallback callback) {
- executeRetryableReadAsync(binding, namespace.getDatabaseName(), getCommandCreator(binding.getSessionContext()),
- CommandResultDocumentCodec.create(DECODER, singletonList("firstBatch")), asyncTransformer(), retryReads,
+ executeRetryableReadAsync(clientSideOperationTimeout, binding, namespace.getDatabaseName(),
+ getCommandCreator(binding.getSessionContext()), CommandResultDocumentCodec.create(DECODER, singletonList("firstBatch")),
+ asyncTransformer(), retryReads,
(result, t) -> {
if (isNamespaceError(t)) {
callback.onResult(0L, null);
@@ -116,10 +113,10 @@ private long transformResult(final BsonDocument result, final ConnectionDescript
}
private CommandCreator getCommandCreator(final SessionContext sessionContext) {
- return (serverDescription, connectionDescription) -> {
+ return (clientSideOperationTimeout, serverDescription, connectionDescription) -> {
BsonDocument document = new BsonDocument("count", new BsonString(namespace.getCollectionName()));
appendReadConcernToCommand(sessionContext, connectionDescription.getMaxWireVersion(), document);
- putIfNotZero(document, "maxTimeMS", maxTimeMS);
+ putIfNotZero(document, "maxTimeMS", clientSideOperationTimeout.getMaxTimeMS());
if (comment != null) {
document.put("comment", comment);
}
diff --git a/driver-core/src/main/com/mongodb/internal/operation/FindAndDeleteOperation.java b/driver-core/src/main/com/mongodb/internal/operation/FindAndDeleteOperation.java
index ede7ee51628..f1dc75e1a1a 100644
--- a/driver-core/src/main/com/mongodb/internal/operation/FindAndDeleteOperation.java
+++ b/driver-core/src/main/com/mongodb/internal/operation/FindAndDeleteOperation.java
@@ -20,6 +20,7 @@
import com.mongodb.WriteConcern;
import com.mongodb.client.model.Collation;
import com.mongodb.connection.ConnectionDescription;
+import com.mongodb.internal.ClientSideOperationTimeout;
import com.mongodb.internal.validator.NoOpFieldNameValidator;
import com.mongodb.lang.Nullable;
import org.bson.BsonBoolean;
@@ -29,8 +30,6 @@
import org.bson.codecs.Decoder;
import org.bson.conversions.Bson;
-import java.util.concurrent.TimeUnit;
-
/**
* An operation that atomically finds and deletes a single document.
*
@@ -38,9 +37,9 @@
*/
public class FindAndDeleteOperation extends BaseFindAndModifyOperation {
- public FindAndDeleteOperation(final MongoNamespace namespace, final WriteConcern writeConcern, final boolean retryWrites,
- final Decoder decoder) {
- super(namespace, writeConcern, retryWrites, decoder);
+ public FindAndDeleteOperation(final ClientSideOperationTimeout clientSideOperationTimeout, final MongoNamespace namespace,
+ final WriteConcern writeConcern, final boolean retryWrites, final Decoder decoder) {
+ super(clientSideOperationTimeout, namespace, writeConcern, retryWrites, decoder);
}
@Override
@@ -55,12 +54,6 @@ public FindAndDeleteOperation projection(@Nullable final BsonDocument project
return this;
}
- @Override
- public FindAndDeleteOperation maxTime(final long maxTime, final TimeUnit timeUnit) {
- super.maxTime(maxTime, timeUnit);
- return this;
- }
-
@Override
public FindAndDeleteOperation sort(@Nullable final BsonDocument sort) {
super.sort(sort);
diff --git a/driver-core/src/main/com/mongodb/internal/operation/FindAndReplaceOperation.java b/driver-core/src/main/com/mongodb/internal/operation/FindAndReplaceOperation.java
index de988c963a4..9cffda1565b 100644
--- a/driver-core/src/main/com/mongodb/internal/operation/FindAndReplaceOperation.java
+++ b/driver-core/src/main/com/mongodb/internal/operation/FindAndReplaceOperation.java
@@ -20,6 +20,7 @@
import com.mongodb.WriteConcern;
import com.mongodb.client.model.Collation;
import com.mongodb.connection.ConnectionDescription;
+import com.mongodb.internal.ClientSideOperationTimeout;
import com.mongodb.internal.validator.MappedFieldNameValidator;
import com.mongodb.internal.validator.NoOpFieldNameValidator;
import com.mongodb.internal.validator.ReplacingDocumentFieldNameValidator;
@@ -33,7 +34,6 @@
import java.util.HashMap;
import java.util.Map;
-import java.util.concurrent.TimeUnit;
import static com.mongodb.assertions.Assertions.notNull;
import static com.mongodb.internal.operation.DocumentHelper.putIfTrue;
@@ -49,9 +49,9 @@ public class FindAndReplaceOperation extends BaseFindAndModifyOperation {
private boolean upsert;
private Boolean bypassDocumentValidation;
- public FindAndReplaceOperation(final MongoNamespace namespace, final WriteConcern writeConcern, final boolean retryWrites,
- final Decoder decoder, final BsonDocument replacement) {
- super(namespace, writeConcern, retryWrites, decoder);
+ public FindAndReplaceOperation(final ClientSideOperationTimeout clientSideOperationTimeout, final MongoNamespace namespace,
+ final WriteConcern writeConcern, final boolean retryWrites, final Decoder decoder, final BsonDocument replacement) {
+ super(clientSideOperationTimeout, namespace, writeConcern, retryWrites, decoder);
this.replacement = notNull("replacement", replacement);
}
@@ -98,12 +98,6 @@ public FindAndReplaceOperation projection(@Nullable final BsonDocument projec
return this;
}
- @Override
- public FindAndReplaceOperation maxTime(final long maxTime, final TimeUnit timeUnit) {
- super.maxTime(maxTime, timeUnit);
- return this;
- }
-
@Override
public FindAndReplaceOperation sort(@Nullable final BsonDocument sort) {
super.sort(sort);
diff --git a/driver-core/src/main/com/mongodb/internal/operation/FindAndUpdateOperation.java b/driver-core/src/main/com/mongodb/internal/operation/FindAndUpdateOperation.java
index 17ce879102d..2b21c61a703 100644
--- a/driver-core/src/main/com/mongodb/internal/operation/FindAndUpdateOperation.java
+++ b/driver-core/src/main/com/mongodb/internal/operation/FindAndUpdateOperation.java
@@ -20,6 +20,7 @@
import com.mongodb.WriteConcern;
import com.mongodb.client.model.Collation;
import com.mongodb.connection.ConnectionDescription;
+import com.mongodb.internal.ClientSideOperationTimeout;
import com.mongodb.internal.validator.MappedFieldNameValidator;
import com.mongodb.internal.validator.NoOpFieldNameValidator;
import com.mongodb.internal.validator.UpdateFieldNameValidator;
@@ -35,7 +36,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.concurrent.TimeUnit;
import static com.mongodb.assertions.Assertions.notNull;
import static com.mongodb.internal.operation.DocumentHelper.putIfNotNull;
@@ -54,16 +54,16 @@ public class FindAndUpdateOperation extends BaseFindAndModifyOperation {
private Boolean bypassDocumentValidation;
private List arrayFilters;
- public FindAndUpdateOperation(final MongoNamespace namespace, final WriteConcern writeConcern, final boolean retryWrites,
- final Decoder decoder, final BsonDocument update) {
- super(namespace, writeConcern, retryWrites, decoder);
+ public FindAndUpdateOperation(final ClientSideOperationTimeout clientSideOperationTimeout, final MongoNamespace namespace,
+ final WriteConcern writeConcern, final boolean retryWrites, final Decoder decoder, final BsonDocument update) {
+ super(clientSideOperationTimeout, namespace, writeConcern, retryWrites, decoder);
this.update = notNull("update", update);
this.updatePipeline = null;
}
- public FindAndUpdateOperation(final MongoNamespace namespace, final WriteConcern writeConcern, final boolean retryWrites,
- final Decoder decoder, final List update) {
- super(namespace, writeConcern, retryWrites, decoder);
+ public FindAndUpdateOperation(final ClientSideOperationTimeout clientSideOperationTimeout, final MongoNamespace namespace,
+ final WriteConcern writeConcern, final boolean retryWrites, final Decoder decoder, final List update) {
+ super(clientSideOperationTimeout, namespace, writeConcern, retryWrites, decoder);
this.updatePipeline = update;
this.update = null;
}
@@ -126,12 +126,6 @@ public FindAndUpdateOperation projection(@Nullable final BsonDocument project
return this;
}
- @Override
- public FindAndUpdateOperation maxTime(final long maxTime, final TimeUnit timeUnit) {
- super.maxTime(maxTime, timeUnit);
- return this;
- }
-
@Override
public FindAndUpdateOperation sort(@Nullable final BsonDocument sort) {
super.sort(sort);
diff --git a/driver-core/src/main/com/mongodb/internal/operation/FindOperation.java b/driver-core/src/main/com/mongodb/internal/operation/FindOperation.java
index dcb94211fcf..e8fe9645fb2 100644
--- a/driver-core/src/main/com/mongodb/internal/operation/FindOperation.java
+++ b/driver-core/src/main/com/mongodb/internal/operation/FindOperation.java
@@ -22,6 +22,7 @@
import com.mongodb.MongoNamespace;
import com.mongodb.MongoQueryException;
import com.mongodb.client.model.Collation;
+import com.mongodb.internal.ClientSideOperationTimeout;
import com.mongodb.internal.async.AsyncBatchCursor;
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.internal.async.function.AsyncCallbackSupplier;
@@ -40,10 +41,8 @@
import org.bson.BsonValue;
import org.bson.codecs.Decoder;
-import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
-import static com.mongodb.assertions.Assertions.isTrueArgument;
import static com.mongodb.assertions.Assertions.notNull;
import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback;
import static com.mongodb.internal.operation.AsyncOperationHelper.CommandReadTransformerAsync;
@@ -73,6 +72,7 @@
public class FindOperation implements AsyncExplainableReadOperation>, ExplainableReadOperation> {
private static final String FIRST_BATCH = "firstBatch";
+ private final ClientSideOperationTimeout clientSideOperationTimeout;
private final MongoNamespace namespace;
private final Decoder decoder;
private boolean retryReads;
@@ -80,8 +80,6 @@ public class FindOperation implements AsyncExplainableReadOperation implements AsyncExplainableReadOperation decoder) {
+ public FindOperation(final ClientSideOperationTimeout clientSideOperationTimeout, final MongoNamespace namespace,
+ final Decoder decoder) {
+ this.clientSideOperationTimeout = notNull("clientSideOperationTimeout", clientSideOperationTimeout);
this.namespace = notNull("namespace", namespace);
this.decoder = notNull("decoder", decoder);
}
@@ -147,30 +147,6 @@ public FindOperation projection(@Nullable final BsonDocument projection) {
return this;
}
- public long getMaxTime(final TimeUnit timeUnit) {
- notNull("timeUnit", timeUnit);
- return timeUnit.convert(maxTimeMS, TimeUnit.MILLISECONDS);
- }
-
- public FindOperation maxTime(final long maxTime, final TimeUnit timeUnit) {
- notNull("timeUnit", timeUnit);
- isTrueArgument("maxTime >= 0", maxTime >= 0);
- this.maxTimeMS = TimeUnit.MILLISECONDS.convert(maxTime, timeUnit);
- return this;
- }
-
- public long getMaxAwaitTime(final TimeUnit timeUnit) {
- notNull("timeUnit", timeUnit);
- return timeUnit.convert(maxAwaitTimeMS, TimeUnit.MILLISECONDS);
- }
-
- public FindOperation maxAwaitTime(final long maxAwaitTime, final TimeUnit timeUnit) {
- notNull("timeUnit", timeUnit);
- isTrueArgument("maxAwaitTime >= 0", maxAwaitTime >= 0);
- this.maxAwaitTimeMS = TimeUnit.MILLISECONDS.convert(maxAwaitTime, timeUnit);
- return this;
- }
-
public int getSkip() {
return skip;
}
@@ -322,7 +298,7 @@ public BatchCursor execute(final ReadBinding binding) {
withSourceAndConnection(binding::getReadConnectionSource, false, (source, connection) -> {
retryState.breakAndThrowIfRetryAnd(() -> !canRetryRead(source.getServerDescription(), binding.getSessionContext()));
try {
- return createReadCommandAndExecute(retryState, binding, source, namespace.getDatabaseName(),
+ return createReadCommandAndExecute(clientSideOperationTimeout, retryState, binding, source, namespace.getDatabaseName(),
getCommandCreator(binding.getSessionContext()), CommandResultDocumentCodec.create(decoder, FIRST_BATCH),
transformer(), connection);
} catch (MongoCommandException e) {
@@ -347,7 +323,7 @@ public void executeAsync(final AsyncReadBinding binding, final SingleResultCallb
return;
}
SingleResultCallback> wrappedCallback = exceptionTransformingCallback(releasingCallback);
- createReadCommandAndExecuteAsync(retryState, binding, source, namespace.getDatabaseName(),
+ createReadCommandAndExecuteAsync(clientSideOperationTimeout, retryState, binding, source, namespace.getDatabaseName(),
getCommandCreator(binding.getSessionContext()), CommandResultDocumentCodec.create(decoder, FIRST_BATCH),
asyncTransformer(), connection, wrappedCallback);
})
@@ -374,20 +350,21 @@ private static SingleResultCallback exceptionTransformingCallback(final S
@Override
public ReadOperation asExplainableOperation(@Nullable final ExplainVerbosity verbosity,
final Decoder resultDecoder) {
- return new CommandReadOperation<>(getNamespace().getDatabaseName(),
- asExplainCommand(getCommand(NoOpSessionContext.INSTANCE, MIN_WIRE_VERSION), verbosity),
+ return new CommandReadOperation<>(clientSideOperationTimeout, getNamespace().getDatabaseName(),
+ asExplainCommand(getCommand(clientSideOperationTimeout, NoOpSessionContext.INSTANCE, MIN_WIRE_VERSION), verbosity),
resultDecoder);
}
@Override
public AsyncReadOperation asAsyncExplainableOperation(@Nullable final ExplainVerbosity verbosity,
final Decoder resultDecoder) {
- return new CommandReadOperation<>(getNamespace().getDatabaseName(),
- asExplainCommand(getCommand(NoOpSessionContext.INSTANCE, MIN_WIRE_VERSION), verbosity),
+ return new CommandReadOperation<>(clientSideOperationTimeout, getNamespace().getDatabaseName(),
+ asExplainCommand(getCommand(clientSideOperationTimeout, NoOpSessionContext.INSTANCE, MIN_WIRE_VERSION), verbosity),
resultDecoder);
}
- private BsonDocument getCommand(final SessionContext sessionContext, final int maxWireVersion) {
+ private BsonDocument getCommand(final ClientSideOperationTimeout clientSideOperationTimeout, final SessionContext sessionContext,
+ final int maxWireVersion) {
BsonDocument commandDocument = new BsonDocument("find", new BsonString(namespace.getCollectionName()));
appendReadConcernToCommand(sessionContext, maxWireVersion, commandDocument);
@@ -411,6 +388,7 @@ private BsonDocument getCommand(final SessionContext sessionContext, final int m
if (limit < 0 || batchSize < 0) {
commandDocument.put("singleBatch", BsonBoolean.TRUE);
}
+ long maxTimeMS = clientSideOperationTimeout.getMaxTimeMS();
if (maxTimeMS > 0) {
commandDocument.put("maxTimeMS", new BsonInt64(maxTimeMS));
}
@@ -460,7 +438,8 @@ private BsonDocument getCommand(final SessionContext sessionContext, final int m
}
private CommandCreator getCommandCreator(final SessionContext sessionContext) {
- return (serverDescription, connectionDescription) -> getCommand(sessionContext, connectionDescription.getMaxWireVersion());
+ return (clientSideOperationTimeout, serverDescription, connectionDescription) ->
+ getCommand(clientSideOperationTimeout, sessionContext, connectionDescription.getMaxWireVersion());
}
private boolean isTailableCursor() {
@@ -481,7 +460,7 @@ private CommandReadTransformer> transformer()
}
private long getMaxTimeForCursor() {
- return cursorType == CursorType.TailableAwait ? maxAwaitTimeMS : 0;
+ return cursorType == CursorType.TailableAwait ? clientSideOperationTimeout.getMaxAwaitTimeMS() : 0;
}
private CommandReadTransformerAsync> asyncTransformer() {
diff --git a/driver-core/src/main/com/mongodb/internal/operation/ListCollectionsOperation.java b/driver-core/src/main/com/mongodb/internal/operation/ListCollectionsOperation.java
index fa2a5dcd995..c9bcf2a6f88 100644
--- a/driver-core/src/main/com/mongodb/internal/operation/ListCollectionsOperation.java
+++ b/driver-core/src/main/com/mongodb/internal/operation/ListCollectionsOperation.java
@@ -18,6 +18,7 @@
import com.mongodb.MongoCommandException;
import com.mongodb.MongoNamespace;
+import com.mongodb.internal.ClientSideOperationTimeout;
import com.mongodb.internal.async.AsyncBatchCursor;
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.internal.async.function.AsyncCallbackSupplier;
@@ -26,15 +27,12 @@
import com.mongodb.internal.binding.AsyncReadBinding;
import com.mongodb.internal.binding.ReadBinding;
import com.mongodb.lang.Nullable;
-import org.bson.BsonBoolean;
import org.bson.BsonDocument;
import org.bson.BsonInt32;
-import org.bson.BsonInt64;
import org.bson.BsonValue;
import org.bson.codecs.Codec;
import org.bson.codecs.Decoder;
-import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import static com.mongodb.assertions.Assertions.notNull;
@@ -50,6 +48,8 @@
import static com.mongodb.internal.operation.CommandOperationHelper.rethrowIfNotNamespaceError;
import static com.mongodb.internal.operation.CursorHelper.getCursorDocumentFromBatchSize;
import static com.mongodb.internal.operation.DocumentHelper.putIfNotNull;
+import static com.mongodb.internal.operation.DocumentHelper.putIfNotZero;
+import static com.mongodb.internal.operation.DocumentHelper.putIfTrue;
import static com.mongodb.internal.operation.OperationHelper.LOGGER;
import static com.mongodb.internal.operation.OperationHelper.canRetryRead;
import static com.mongodb.internal.operation.OperationHelper.createEmptyBatchCursor;
@@ -67,16 +67,18 @@
* This class is not part of the public API and may be removed or changed at any time
*/
public class ListCollectionsOperation implements AsyncReadOperation>, ReadOperation> {
+ private final ClientSideOperationTimeout clientSideOperationTimeout;
private final String databaseName;
private final Decoder decoder;
private boolean retryReads;
private BsonDocument filter;
private int batchSize;
- private long maxTimeMS;
private boolean nameOnly;
private BsonValue comment;
- public ListCollectionsOperation(final String databaseName, final Decoder decoder) {
+ public ListCollectionsOperation(final ClientSideOperationTimeout clientSideOperationTimeout, final String databaseName,
+ final Decoder decoder) {
+ this.clientSideOperationTimeout = notNull("clientSideOperationTimeout", clientSideOperationTimeout);
this.databaseName = notNull("databaseName", databaseName);
this.decoder = notNull("decoder", decoder);
}
@@ -108,17 +110,6 @@ public ListCollectionsOperation batchSize(final int batchSize) {
return this;
}
- public long getMaxTime(final TimeUnit timeUnit) {
- notNull("timeUnit", timeUnit);
- return timeUnit.convert(maxTimeMS, TimeUnit.MILLISECONDS);
- }
-
- public ListCollectionsOperation maxTime(final long maxTime, final TimeUnit timeUnit) {
- notNull("timeUnit", timeUnit);
- this.maxTimeMS = TimeUnit.MILLISECONDS.convert(maxTime, timeUnit);
- return this;
- }
-
public ListCollectionsOperation retryReads(final boolean retryReads) {
this.retryReads = retryReads;
return this;
@@ -145,8 +136,8 @@ public BatchCursor execute(final ReadBinding binding) {
withSourceAndConnection(binding::getReadConnectionSource, false, (source, connection) -> {
retryState.breakAndThrowIfRetryAnd(() -> !canRetryRead(source.getServerDescription(), binding.getSessionContext()));
try {
- return createReadCommandAndExecute(retryState, binding, source, databaseName, getCommandCreator(),
- createCommandDecoder(), commandTransformer(), connection);
+ return createReadCommandAndExecute(clientSideOperationTimeout, retryState, binding, source, databaseName,
+ getCommandCreator(), createCommandDecoder(), commandTransformer(), connection);
} catch (MongoCommandException e) {
return rethrowIfNotNamespaceError(e, createEmptyBatchCursor(createNamespace(), decoder,
source.getServerDescription().getAddress(), batchSize));
@@ -168,8 +159,8 @@ public void executeAsync(final AsyncReadBinding binding, final SingleResultCallb
binding.getSessionContext()), releasingCallback)) {
return;
}
- createReadCommandAndExecuteAsync(retryState, binding, source, databaseName, getCommandCreator(), createCommandDecoder(),
- asyncTransformer(), connection, (result, t) -> {
+ createReadCommandAndExecuteAsync(clientSideOperationTimeout, retryState, binding, source, databaseName,
+ getCommandCreator(), createCommandDecoder(), asyncTransformer(), connection, (result, t) -> {
if (t != null && !isNamespaceError(t)) {
releasingCallback.onResult(null, t);
} else {
@@ -198,23 +189,15 @@ private CommandReadTransformer> commandTransformer(
}
private CommandOperationHelper.CommandCreator getCommandCreator() {
- return (serverDescription, connectionDescription) -> getCommand();
- }
-
- private BsonDocument getCommand() {
- BsonDocument command = new BsonDocument("listCollections", new BsonInt32(1))
- .append("cursor", getCursorDocumentFromBatchSize(batchSize == 0 ? null : batchSize));
- if (filter != null) {
- command.append("filter", filter);
- }
- if (nameOnly) {
- command.append("nameOnly", BsonBoolean.TRUE);
- }
- if (maxTimeMS > 0) {
- command.put("maxTimeMS", new BsonInt64(maxTimeMS));
- }
- putIfNotNull(command, "comment", comment);
- return command;
+ return (clientSideOperationTimeout, serverDescription, connectionDescription) -> {
+ BsonDocument command = new BsonDocument("listCollections", new BsonInt32(1))
+ .append("cursor", getCursorDocumentFromBatchSize(batchSize == 0 ? null : batchSize));
+ putIfNotNull(command, "filter", filter);
+ putIfTrue(command, "nameOnly", nameOnly);
+ putIfNotZero(command, "maxTimeMS", clientSideOperationTimeout.getMaxTimeMS());
+ putIfNotNull(command, "comment", comment);
+ return command;
+ };
}
private Codec createCommandDecoder() {
diff --git a/driver-core/src/main/com/mongodb/internal/operation/ListDatabasesOperation.java b/driver-core/src/main/com/mongodb/internal/operation/ListDatabasesOperation.java
index bacf64601c9..a32fa5c9639 100644
--- a/driver-core/src/main/com/mongodb/internal/operation/ListDatabasesOperation.java
+++ b/driver-core/src/main/com/mongodb/internal/operation/ListDatabasesOperation.java
@@ -17,27 +17,25 @@
package com.mongodb.internal.operation;
import com.mongodb.connection.ConnectionDescription;
+import com.mongodb.internal.ClientSideOperationTimeout;
import com.mongodb.internal.async.AsyncBatchCursor;
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.internal.binding.AsyncReadBinding;
import com.mongodb.internal.binding.ReadBinding;
import com.mongodb.internal.connection.QueryResult;
import com.mongodb.lang.Nullable;
-import org.bson.BsonBoolean;
import org.bson.BsonDocument;
import org.bson.BsonInt32;
-import org.bson.BsonInt64;
import org.bson.BsonValue;
import org.bson.codecs.Decoder;
-import java.util.concurrent.TimeUnit;
-
import static com.mongodb.assertions.Assertions.notNull;
import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback;
import static com.mongodb.internal.operation.AsyncOperationHelper.CommandReadTransformerAsync;
import static com.mongodb.internal.operation.AsyncOperationHelper.executeRetryableReadAsync;
import static com.mongodb.internal.operation.CommandOperationHelper.CommandCreator;
import static com.mongodb.internal.operation.DocumentHelper.putIfNotNull;
+import static com.mongodb.internal.operation.DocumentHelper.putIfNotZero;
import static com.mongodb.internal.operation.OperationHelper.LOGGER;
import static com.mongodb.internal.operation.SyncOperationHelper.CommandReadTransformer;
import static com.mongodb.internal.operation.SyncOperationHelper.executeRetryableRead;
@@ -49,30 +47,19 @@
* This class is not part of the public API and may be removed or changed at any time
*/
public class ListDatabasesOperation implements AsyncReadOperation>, ReadOperation> {
+ private final ClientSideOperationTimeout clientSideOperationTimeout;
private final Decoder decoder;
private boolean retryReads;
-
- private long maxTimeMS;
private BsonDocument filter;
private Boolean nameOnly;
private Boolean authorizedDatabasesOnly;
private BsonValue comment;
- public ListDatabasesOperation(final Decoder decoder) {
+ public ListDatabasesOperation(final ClientSideOperationTimeout clientSideOperationTimeout, final Decoder decoder) {
+ this.clientSideOperationTimeout = notNull("clientSideOperationTimeout", clientSideOperationTimeout);
this.decoder = notNull("decoder", decoder);
}
- public long getMaxTime(final TimeUnit timeUnit) {
- notNull("timeUnit", timeUnit);
- return timeUnit.convert(maxTimeMS, TimeUnit.MILLISECONDS);
- }
-
- public ListDatabasesOperation maxTime(final long maxTime, final TimeUnit timeUnit) {
- notNull("timeUnit", timeUnit);
- this.maxTimeMS = TimeUnit.MILLISECONDS.convert(maxTime, timeUnit);
- return this;
- }
-
public ListDatabasesOperation filter(@Nullable final BsonDocument filter) {
this.filter = filter;
return this;
@@ -121,13 +108,13 @@ public ListDatabasesOperation comment(@Nullable final BsonValue comment) {
@Override
public BatchCursor execute(final ReadBinding binding) {
- return executeRetryableRead(binding, "admin", getCommandCreator(),
+ return executeRetryableRead(clientSideOperationTimeout, binding, "admin", getCommandCreator(),
CommandResultDocumentCodec.create(decoder, "databases"), transformer(), retryReads);
}
@Override
public void executeAsync(final AsyncReadBinding binding, final SingleResultCallback> callback) {
- executeRetryableReadAsync(binding, "admin", getCommandCreator(),
+ executeRetryableReadAsync(clientSideOperationTimeout, binding, "admin", getCommandCreator(),
CommandResultDocumentCodec.create(decoder, "databases"), asyncTransformer(),
retryReads, errorHandlingCallback(callback, LOGGER));
}
@@ -147,24 +134,14 @@ private QueryResult createQueryResult(final BsonDocument result, final Connec
}
private CommandCreator getCommandCreator() {
- return (serverDescription, connectionDescription) -> getCommand();
- }
-
- private BsonDocument getCommand() {
- BsonDocument command = new BsonDocument("listDatabases", new BsonInt32(1));
- if (maxTimeMS > 0) {
- command.put("maxTimeMS", new BsonInt64(maxTimeMS));
- }
- if (filter != null) {
- command.put("filter", filter);
- }
- if (nameOnly != null) {
- command.put("nameOnly", new BsonBoolean(nameOnly));
- }
- if (authorizedDatabasesOnly != null) {
- command.put("authorizedDatabases", new BsonBoolean(authorizedDatabasesOnly));
- }
- putIfNotNull(command, "comment", comment);
- return command;
+ return (clientSideOperationTimeout, serverDescription, connectionDescription) -> {
+ BsonDocument command = new BsonDocument("listDatabases", new BsonInt32(1));
+ putIfNotNull(command, "filter", filter);
+ putIfNotNull(command, "nameOnly", nameOnly);
+ putIfNotNull(command, "authorizedDatabases", authorizedDatabasesOnly);
+ putIfNotZero(command, "maxTimeMS", clientSideOperationTimeout.getMaxTimeMS());
+ putIfNotNull(command, "comment", comment);
+ return command;
+ };
}
}
diff --git a/driver-core/src/main/com/mongodb/internal/operation/ListIndexesOperation.java b/driver-core/src/main/com/mongodb/internal/operation/ListIndexesOperation.java
index 62ecdc953bd..4426c63147a 100644
--- a/driver-core/src/main/com/mongodb/internal/operation/ListIndexesOperation.java
+++ b/driver-core/src/main/com/mongodb/internal/operation/ListIndexesOperation.java
@@ -18,6 +18,7 @@
import com.mongodb.MongoCommandException;
import com.mongodb.MongoNamespace;
+import com.mongodb.internal.ClientSideOperationTimeout;
import com.mongodb.internal.async.AsyncBatchCursor;
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.internal.async.function.AsyncCallbackSupplier;
@@ -27,13 +28,11 @@
import com.mongodb.internal.binding.ReadBinding;
import com.mongodb.lang.Nullable;
import org.bson.BsonDocument;
-import org.bson.BsonInt64;
import org.bson.BsonString;
import org.bson.BsonValue;
import org.bson.codecs.Codec;
import org.bson.codecs.Decoder;
-import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import static com.mongodb.assertions.Assertions.notNull;
@@ -50,6 +49,7 @@
import static com.mongodb.internal.operation.CommandOperationHelper.rethrowIfNotNamespaceError;
import static com.mongodb.internal.operation.CursorHelper.getCursorDocumentFromBatchSize;
import static com.mongodb.internal.operation.DocumentHelper.putIfNotNull;
+import static com.mongodb.internal.operation.DocumentHelper.putIfNotZero;
import static com.mongodb.internal.operation.OperationHelper.LOGGER;
import static com.mongodb.internal.operation.OperationHelper.canRetryRead;
import static com.mongodb.internal.operation.OperationHelper.createEmptyBatchCursor;
@@ -66,14 +66,17 @@
* This class is not part of the public API and may be removed or changed at any time
*/
public class ListIndexesOperation implements AsyncReadOperation>, ReadOperation> {
+ private final ClientSideOperationTimeout clientSideOperationTimeout;
private final MongoNamespace namespace;
private final Decoder decoder;
private boolean retryReads;
private int batchSize;
- private long maxTimeMS;
+
private BsonValue comment;
- public ListIndexesOperation(final MongoNamespace namespace, final Decoder decoder) {
+ public ListIndexesOperation(final ClientSideOperationTimeout clientSideOperationTimeout, final MongoNamespace namespace,
+ final Decoder decoder) {
+ this.clientSideOperationTimeout = notNull("clientSideOperationTimeout", clientSideOperationTimeout);
this.namespace = notNull("namespace", namespace);
this.decoder = notNull("decoder", decoder);
}
@@ -87,17 +90,6 @@ public ListIndexesOperation batchSize(final int batchSize) {
return this;
}
- public long getMaxTime(final TimeUnit timeUnit) {
- notNull("timeUnit", timeUnit);
- return timeUnit.convert(maxTimeMS, TimeUnit.MILLISECONDS);
- }
-
- public ListIndexesOperation maxTime(final long maxTime, final TimeUnit timeUnit) {
- notNull("timeUnit", timeUnit);
- this.maxTimeMS = TimeUnit.MILLISECONDS.convert(maxTime, timeUnit);
- return this;
- }
-
public ListIndexesOperation retryReads(final boolean retryReads) {
this.retryReads = retryReads;
return this;
@@ -124,8 +116,8 @@ public BatchCursor execute(final ReadBinding binding) {
withSourceAndConnection(binding::getReadConnectionSource, false, (source, connection) -> {
retryState.breakAndThrowIfRetryAnd(() -> !canRetryRead(source.getServerDescription(), binding.getSessionContext()));
try {
- return createReadCommandAndExecute(retryState, binding, source, namespace.getDatabaseName(), getCommandCreator(),
- createCommandDecoder(), transformer(), connection);
+ return createReadCommandAndExecute(clientSideOperationTimeout, retryState, binding, source, namespace.getDatabaseName(),
+ getCommandCreator(), createCommandDecoder(), transformer(), connection);
} catch (MongoCommandException e) {
return rethrowIfNotNamespaceError(e, createEmptyBatchCursor(namespace, decoder,
source.getServerDescription().getAddress(), batchSize));
@@ -147,8 +139,9 @@ public void executeAsync(final AsyncReadBinding binding, final SingleResultCallb
binding.getSessionContext()), releasingCallback)) {
return;
}
- createReadCommandAndExecuteAsync(retryState, binding, source, namespace.getDatabaseName(), getCommandCreator(),
- createCommandDecoder(), asyncTransformer(), connection, (result, t) -> {
+ createReadCommandAndExecuteAsync(clientSideOperationTimeout, retryState, binding, source,
+ namespace.getDatabaseName(), getCommandCreator(), createCommandDecoder(), asyncTransformer(),
+ connection, (result, t) -> {
if (t != null && !isNamespaceError(t)) {
releasingCallback.onResult(null, t);
} else {
@@ -165,17 +158,14 @@ private AsyncBatchCursor emptyAsyncCursor(final AsyncConnectionSource source)
}
private CommandCreator getCommandCreator() {
- return (serverDescription, connectionDescription) -> getCommand();
- }
-
- private BsonDocument getCommand() {
- BsonDocument command = new BsonDocument("listIndexes", new BsonString(namespace.getCollectionName()))
- .append("cursor", getCursorDocumentFromBatchSize(batchSize == 0 ? null : batchSize));
- if (maxTimeMS > 0) {
- command.put("maxTimeMS", new BsonInt64(maxTimeMS));
- }
- putIfNotNull(command, "comment", comment);
- return command;
+ return (clientSideOperationTimeout, serverDescription, connectionDescription) -> {
+ BsonDocument command = new BsonDocument("listIndexes", new BsonString(namespace.getCollectionName()))
+ .append("cursor", getCursorDocumentFromBatchSize(batchSize == 0 ? null : batchSize));
+
+ putIfNotZero(command, "maxTimeMS", clientSideOperationTimeout.getMaxTimeMS());
+ putIfNotNull(command, "comment", comment);
+ return command;
+ };
}
private CommandReadTransformer> transformer() {
diff --git a/driver-core/src/main/com/mongodb/internal/operation/ListSearchIndexesOperation.java b/driver-core/src/main/com/mongodb/internal/operation/ListSearchIndexesOperation.java
index 4c471a16bd4..3551462d80d 100644
--- a/driver-core/src/main/com/mongodb/internal/operation/ListSearchIndexesOperation.java
+++ b/driver-core/src/main/com/mongodb/internal/operation/ListSearchIndexesOperation.java
@@ -20,6 +20,7 @@
import com.mongodb.MongoCommandException;
import com.mongodb.MongoNamespace;
import com.mongodb.client.model.Collation;
+import com.mongodb.internal.ClientSideOperationTimeout;
import com.mongodb.internal.async.AsyncBatchCursor;
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.internal.binding.AsyncReadBinding;
@@ -31,12 +32,10 @@
import org.bson.BsonValue;
import org.bson.codecs.Decoder;
-import java.util.Collections;
-import java.util.concurrent.TimeUnit;
-
import static com.mongodb.internal.operation.AsyncOperationHelper.createEmptyAsyncBatchCursor;
import static com.mongodb.internal.operation.CommandOperationHelper.isNamespaceError;
import static com.mongodb.internal.operation.OperationHelper.createEmptyBatchCursor;
+import static java.util.Collections.singletonList;
/**
* An operation that lists Alas Search indexes with the help of {@value #STAGE_LIST_SEARCH_INDEXES} pipeline stage.
@@ -46,6 +45,7 @@
final class ListSearchIndexesOperation
implements AsyncExplainableReadOperation>, ExplainableReadOperation> {
private static final String STAGE_LIST_SEARCH_INDEXES = "$listSearchIndexes";
+ private final ClientSideOperationTimeout clientSideOperationTimeout;
private final MongoNamespace namespace;
private final Decoder decoder;
@Nullable
@@ -56,26 +56,20 @@ final class ListSearchIndexesOperation
private final Collation collation;
@Nullable
private final BsonValue comment;
- private final long maxTimeMS;
@Nullable
private final String indexName;
private final boolean retryReads;
- ListSearchIndexesOperation(final MongoNamespace namespace,
- final Decoder decoder,
- final long maxTimeMS,
- @Nullable final String indexName,
- @Nullable final Integer batchSize,
- @Nullable final Collation collation,
- @Nullable final BsonValue comment,
- @Nullable final Boolean allowDiskUse,
- final boolean retryReads) {
+ ListSearchIndexesOperation(final ClientSideOperationTimeout clientSideOperationTimeout, final MongoNamespace namespace,
+ final Decoder decoder, @Nullable final String indexName, @Nullable final Integer batchSize,
+ @Nullable final Collation collation, @Nullable final BsonValue comment, @Nullable final Boolean allowDiskUse,
+ final boolean retryReads) {
+ this.clientSideOperationTimeout = clientSideOperationTimeout;
this.namespace = namespace;
this.decoder = decoder;
this.allowDiskUse = allowDiskUse;
this.batchSize = batchSize;
this.collation = collation;
- this.maxTimeMS = maxTimeMS;
this.comment = comment;
this.indexName = indexName;
this.retryReads = retryReads;
@@ -125,13 +119,12 @@ private AggregateOperation asAggregateOperation() {
BsonDocument searchDefinition = getSearchDefinition();
BsonDocument listSearchIndexesStage = new BsonDocument(STAGE_LIST_SEARCH_INDEXES, searchDefinition);
- return new AggregateOperation<>(namespace, Collections.singletonList(listSearchIndexesStage), decoder)
+ return new AggregateOperation<>(clientSideOperationTimeout, namespace, singletonList(listSearchIndexesStage), decoder)
.retryReads(retryReads)
.collation(collation)
.comment(comment)
.allowDiskUse(allowDiskUse)
- .batchSize(batchSize)
- .maxTime(maxTimeMS, TimeUnit.MILLISECONDS);
+ .batchSize(batchSize);
}
@NonNull
diff --git a/driver-core/src/main/com/mongodb/internal/operation/MapReduceToCollectionOperation.java b/driver-core/src/main/com/mongodb/internal/operation/MapReduceToCollectionOperation.java
index 482b4261d10..d7da495f96c 100644
--- a/driver-core/src/main/com/mongodb/internal/operation/MapReduceToCollectionOperation.java
+++ b/driver-core/src/main/com/mongodb/internal/operation/MapReduceToCollectionOperation.java
@@ -21,6 +21,7 @@
import com.mongodb.WriteConcern;
import com.mongodb.client.model.Collation;
import com.mongodb.connection.ConnectionDescription;
+import com.mongodb.internal.ClientSideOperationTimeout;
import com.mongodb.internal.async.SingleResultCallback;
import com.mongodb.internal.binding.AsyncWriteBinding;
import com.mongodb.internal.binding.WriteBinding;
@@ -32,7 +33,6 @@
import org.bson.codecs.BsonDocumentCodec;
import java.util.List;
-import java.util.concurrent.TimeUnit;
import static com.mongodb.assertions.Assertions.assertNotNull;
import static com.mongodb.assertions.Assertions.isTrue;
@@ -53,7 +53,6 @@
import static com.mongodb.internal.operation.WriteConcernHelper.appendWriteConcernToCommand;
import static com.mongodb.internal.operation.WriteConcernHelper.throwOnWriteConcernError;
import static java.util.Arrays.asList;
-import static java.util.concurrent.TimeUnit.MILLISECONDS;
/**
* Operation that runs a Map Reduce against a MongoDB instance. This operation does not support "inline" results, i.e. the results will
@@ -66,6 +65,7 @@
*/
public class
MapReduceToCollectionOperation implements AsyncWriteOperation, WriteOperation {
+ private final ClientSideOperationTimeout clientSideOperationTimeout;
private final MongoNamespace namespace;
private final BsonJavaScript mapFunction;
private final BsonJavaScript reduceFunction;
@@ -78,7 +78,6 @@
private int limit;
private boolean jsMode;
private boolean verbose;
- private long maxTimeMS;
private String action = "replace";
private String databaseName;
private boolean sharded;
@@ -87,14 +86,10 @@
private Collation collation;
private static final List VALID_ACTIONS = asList("replace", "merge", "reduce");
- public MapReduceToCollectionOperation(final MongoNamespace namespace, final BsonJavaScript mapFunction,
- final BsonJavaScript reduceFunction, final String collectionName) {
- this(namespace, mapFunction, reduceFunction, collectionName, null);
- }
-
- public MapReduceToCollectionOperation(final MongoNamespace namespace, final BsonJavaScript mapFunction,
- final BsonJavaScript reduceFunction, @Nullable final String collectionName,
- @Nullable final WriteConcern writeConcern) {
+ public MapReduceToCollectionOperation(final ClientSideOperationTimeout clientSideOperationTimeout, final MongoNamespace namespace,
+ final BsonJavaScript mapFunction, final BsonJavaScript reduceFunction, @Nullable final String collectionName,
+ @Nullable final WriteConcern writeConcern) {
+ this.clientSideOperationTimeout = notNull("clientSideOperationTimeout", clientSideOperationTimeout);
this.namespace = notNull("namespace", namespace);
this.mapFunction = notNull("mapFunction", mapFunction);
this.reduceFunction = notNull("reduceFunction", reduceFunction);
@@ -185,17 +180,6 @@ public MapReduceToCollectionOperation verbose(final boolean verbose) {
return this;
}
- public long getMaxTime(final TimeUnit timeUnit) {
- notNull("timeUnit", timeUnit);
- return timeUnit.convert(maxTimeMS, MILLISECONDS);
- }
-
- public MapReduceToCollectionOperation maxTime(final long maxTime, final TimeUnit timeUnit) {
- notNull("timeUnit", timeUnit);
- this.maxTimeMS = MILLISECONDS.convert(maxTime, timeUnit);
- return this;
- }
-
public String getAction() {
return action;
}
@@ -295,9 +279,9 @@ public AsyncReadOperation asExplainableOperationAsync(final Explai
}
private CommandReadOperation createExplainableOperation(final ExplainVerbosity explainVerbosity) {
- return new CommandReadOperation<>(namespace.getDatabaseName(),
- ExplainHelper.asExplainCommand(getCommand(null), explainVerbosity),
- new BsonDocumentCodec());
+ return new CommandReadOperation<>(clientSideOperationTimeout, namespace.getDatabaseName(),
+ ExplainHelper.asExplainCommand(getCommand(null), explainVerbosity),
+ new BsonDocumentCodec());
}
private CommandWriteTransformer