diff --git a/driver-core/src/main/com/mongodb/assertions/Assertions.java b/driver-core/src/main/com/mongodb/assertions/Assertions.java index 19f788b3ad2..6242f057417 100644 --- a/driver-core/src/main/com/mongodb/assertions/Assertions.java +++ b/driver-core/src/main/com/mongodb/assertions/Assertions.java @@ -25,7 +25,7 @@ import java.util.function.Supplier; /** - *

Design by contract assertions.

This class is not part of the public API and may be removed or changed at any time.

+ *

Design by contract assertions.

* All {@code assert...} methods throw {@link AssertionError} and should be used to check conditions which may be violated if and only if * the driver code is incorrect. The intended usage of this methods is the same as of the * Java {@code assert} statement. The reason diff --git a/driver-core/src/main/com/mongodb/internal/async/function/RetryState.java b/driver-core/src/main/com/mongodb/internal/async/function/RetryState.java index cd1c1c94b11..3afb97c512a 100644 --- a/driver-core/src/main/com/mongodb/internal/async/function/RetryState.java +++ b/driver-core/src/main/com/mongodb/internal/async/function/RetryState.java @@ -203,7 +203,7 @@ private void doAdvanceOrThrow(final Throwable attemptException, */ if (hasTimeoutMs() && !loopState.isLastIteration()) { previouslyChosenException = createMongoTimeoutException( - "MongoDB operation timed out during a retry attempt", + "Retry attempt timed out.", previouslyChosenException); } throw previouslyChosenException; diff --git a/driver-core/src/main/com/mongodb/internal/operation/AbortTransactionOperation.java b/driver-core/src/main/com/mongodb/internal/operation/AbortTransactionOperation.java index 26565b6ea8c..bbd7ce7300e 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/AbortTransactionOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/AbortTransactionOperation.java @@ -18,6 +18,7 @@ import com.mongodb.Function; import com.mongodb.WriteConcern; +import com.mongodb.internal.TimeoutContext; import com.mongodb.lang.Nullable; import org.bson.BsonDocument; @@ -58,7 +59,7 @@ CommandCreator getCommandCreator() { } @Override - protected Function getRetryCommandModifier() { + protected Function getRetryCommandModifier(final TimeoutContext timeoutContext) { return cmd -> cmd; } } diff --git a/driver-core/src/main/com/mongodb/internal/operation/AsyncOperations.java b/driver-core/src/main/com/mongodb/internal/operation/AsyncOperations.java index 5f6e3c9fbb9..77434bd9781 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/AsyncOperations.java +++ b/driver-core/src/main/com/mongodb/internal/operation/AsyncOperations.java @@ -74,7 +74,11 @@ public final class AsyncOperations { public AsyncOperations(final MongoNamespace namespace, final Class documentClass, final ReadPreference readPreference, final CodecRegistry codecRegistry, final ReadConcern readConcern, final WriteConcern writeConcern, final boolean retryWrites, final boolean retryReads, final TimeoutSettings timeoutSettings) { - this.operations = new Operations<>(namespace, documentClass, readPreference, codecRegistry, readConcern, writeConcern, + WriteConcern writeConcernToUse = writeConcern; + if (timeoutSettings.getTimeoutMS() != null) { + writeConcernToUse = assertNotNull(WriteConcernHelper.cloneWithoutTimeout(writeConcern)); + } + this.operations = new Operations<>(namespace, documentClass, readPreference, codecRegistry, readConcern, writeConcernToUse, retryWrites, retryReads); this.timeoutSettings = timeoutSettings; } diff --git a/driver-core/src/main/com/mongodb/internal/operation/CommitTransactionOperation.java b/driver-core/src/main/com/mongodb/internal/operation/CommitTransactionOperation.java index 7384241474f..c9c19029119 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/CommitTransactionOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/CommitTransactionOperation.java @@ -25,6 +25,7 @@ import com.mongodb.MongoTimeoutException; import com.mongodb.MongoWriteConcernException; import com.mongodb.WriteConcern; +import com.mongodb.internal.TimeoutContext; import com.mongodb.internal.async.SingleResultCallback; import com.mongodb.internal.binding.AsyncWriteBinding; import com.mongodb.internal.binding.WriteBinding; @@ -125,7 +126,8 @@ CommandCreator getCommandCreator() { }; if (alreadyCommitted) { return (operationContext, serverDescription, connectionDescription) -> - getRetryCommandModifier().apply(creator.create(operationContext, serverDescription, connectionDescription)); + getRetryCommandModifier(operationContext.getTimeoutContext()) + .apply(creator.create(operationContext, serverDescription, connectionDescription)); } else if (recoveryToken != null) { return (operationContext, serverDescription, connectionDescription) -> creator.create(operationContext, serverDescription, connectionDescription) @@ -136,10 +138,10 @@ CommandCreator getCommandCreator() { @Override @SuppressWarnings("deprecation") //wTimeout - protected Function getRetryCommandModifier() { + protected Function getRetryCommandModifier(final TimeoutContext timeoutContext) { return command -> { WriteConcern retryWriteConcern = getWriteConcern().withW("majority"); - if (retryWriteConcern.getWTimeout(MILLISECONDS) == null) { + if (retryWriteConcern.getWTimeout(MILLISECONDS) == null && !timeoutContext.hasTimeoutMS()) { retryWriteConcern = retryWriteConcern.withWTimeout(10000, MILLISECONDS); } command.put("writeConcern", retryWriteConcern.asDocument()); diff --git a/driver-core/src/main/com/mongodb/internal/operation/SyncOperations.java b/driver-core/src/main/com/mongodb/internal/operation/SyncOperations.java index aa0bf509484..73a83310d65 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/SyncOperations.java +++ b/driver-core/src/main/com/mongodb/internal/operation/SyncOperations.java @@ -60,6 +60,7 @@ import java.util.List; +import static com.mongodb.assertions.Assertions.assertNotNull; import static java.util.concurrent.TimeUnit.MILLISECONDS; /** @@ -82,7 +83,11 @@ public SyncOperations(final MongoNamespace namespace, final Class doc public SyncOperations(@Nullable final MongoNamespace namespace, final Class documentClass, final ReadPreference readPreference, final CodecRegistry codecRegistry, final ReadConcern readConcern, final WriteConcern writeConcern, final boolean retryWrites, final boolean retryReads, final TimeoutSettings timeoutSettings) { - this.operations = new Operations<>(namespace, documentClass, readPreference, codecRegistry, readConcern, writeConcern, + WriteConcern writeConcernToUse = writeConcern; + if (timeoutSettings.getTimeoutMS() != null) { + writeConcernToUse = assertNotNull(WriteConcernHelper.cloneWithoutTimeout(writeConcern)); + } + this.operations = new Operations<>(namespace, documentClass, readPreference, codecRegistry, readConcern, writeConcernToUse, retryWrites, retryReads); this.timeoutSettings = timeoutSettings; } diff --git a/driver-core/src/main/com/mongodb/internal/operation/TransactionOperation.java b/driver-core/src/main/com/mongodb/internal/operation/TransactionOperation.java index d6c564436c3..3bb04efa8ed 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/TransactionOperation.java +++ b/driver-core/src/main/com/mongodb/internal/operation/TransactionOperation.java @@ -18,6 +18,7 @@ import com.mongodb.Function; import com.mongodb.WriteConcern; +import com.mongodb.internal.TimeoutContext; import com.mongodb.internal.async.SingleResultCallback; import com.mongodb.internal.binding.AsyncWriteBinding; import com.mongodb.internal.binding.WriteBinding; @@ -55,17 +56,19 @@ public WriteConcern getWriteConcern() { @Override public Void execute(final WriteBinding binding) { isTrue("in transaction", binding.getOperationContext().getSessionContext().hasActiveTransaction()); + TimeoutContext timeoutContext = binding.getOperationContext().getTimeoutContext(); return executeRetryableWrite(binding, "admin", null, new NoOpFieldNameValidator(), new BsonDocumentCodec(), getCommandCreator(), - writeConcernErrorTransformer(binding.getOperationContext().getTimeoutContext()), getRetryCommandModifier()); + writeConcernErrorTransformer(timeoutContext), getRetryCommandModifier(timeoutContext)); } @Override public void executeAsync(final AsyncWriteBinding binding, final SingleResultCallback callback) { isTrue("in transaction", binding.getOperationContext().getSessionContext().hasActiveTransaction()); + TimeoutContext timeoutContext = binding.getOperationContext().getTimeoutContext(); executeRetryableWriteAsync(binding, "admin", null, new NoOpFieldNameValidator(), new BsonDocumentCodec(), getCommandCreator(), - writeConcernErrorTransformerAsync(binding.getOperationContext().getTimeoutContext()), getRetryCommandModifier(), + writeConcernErrorTransformerAsync(timeoutContext), getRetryCommandModifier(timeoutContext), errorHandlingCallback(callback, LOGGER)); } @@ -86,5 +89,5 @@ CommandCreator getCommandCreator() { */ protected abstract String getCommandName(); - protected abstract Function getRetryCommandModifier(); + protected abstract Function getRetryCommandModifier(TimeoutContext timeoutContext); } diff --git a/driver-core/src/main/com/mongodb/internal/operation/WriteConcernHelper.java b/driver-core/src/main/com/mongodb/internal/operation/WriteConcernHelper.java index 837c206cdf7..10b02eda4fe 100644 --- a/driver-core/src/main/com/mongodb/internal/operation/WriteConcernHelper.java +++ b/driver-core/src/main/com/mongodb/internal/operation/WriteConcernHelper.java @@ -24,10 +24,12 @@ import com.mongodb.bulk.WriteConcernError; import com.mongodb.internal.TimeoutContext; import com.mongodb.internal.connection.ProtocolHelper; +import com.mongodb.lang.Nullable; import org.bson.BsonArray; import org.bson.BsonDocument; import org.bson.BsonString; +import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; import static com.mongodb.internal.operation.CommandOperationHelper.addRetryableWriteErrorLabel; @@ -42,6 +44,21 @@ public static void appendWriteConcernToCommand(final WriteConcern writeConcern, commandDocument.put("writeConcern", writeConcern.asDocument()); } } + @Nullable + public static WriteConcern cloneWithoutTimeout(@Nullable final WriteConcern writeConcern) { + if (writeConcern == null || writeConcern.getWTimeout(TimeUnit.MILLISECONDS) == null) { + return writeConcern; + } + + WriteConcern mapped; + Object w = writeConcern.getWObject(); + if (w == null) { + mapped = WriteConcern.ACKNOWLEDGED; + } else { + mapped = w instanceof Integer ? new WriteConcern((Integer) w) : new WriteConcern((String) w); + } + return mapped.withJournal(writeConcern.getJournal()); + } public static void throwOnWriteConcernError(final BsonDocument result, final ServerAddress serverAddress, final int maxWireVersion, final TimeoutContext timeoutContext) { diff --git a/driver-core/src/main/com/mongodb/internal/session/BaseClientSessionImpl.java b/driver-core/src/main/com/mongodb/internal/session/BaseClientSessionImpl.java index f35a41cc838..e89e5589901 100644 --- a/driver-core/src/main/com/mongodb/internal/session/BaseClientSessionImpl.java +++ b/driver-core/src/main/com/mongodb/internal/session/BaseClientSessionImpl.java @@ -20,6 +20,7 @@ import com.mongodb.MongoClientException; import com.mongodb.ServerAddress; import com.mongodb.TransactionOptions; +import com.mongodb.WriteConcern; import com.mongodb.internal.TimeoutContext; import com.mongodb.internal.TimeoutSettings; import com.mongodb.internal.binding.ReferenceCounted; @@ -29,6 +30,7 @@ import org.bson.BsonDocument; import org.bson.BsonTimestamp; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import static com.mongodb.assertions.Assertions.assertTrue; @@ -55,6 +57,14 @@ public class BaseClientSessionImpl implements ClientSession { @Nullable private TimeoutContext timeoutContext; + protected static boolean hasTimeoutMS(@Nullable final TimeoutContext timeoutContext) { + return timeoutContext != null && timeoutContext.hasTimeoutMS(); + } + + protected static boolean hasWTimeoutMS(@Nullable final WriteConcern writeConcern) { + return writeConcern != null && writeConcern.getWTimeout(TimeUnit.MILLISECONDS) != null; + } + public BaseClientSessionImpl(final ServerSessionPool serverSessionPool, final Object originator, final ClientSessionOptions options) { this.serverSessionPool = serverSessionPool; this.originator = originator; @@ -228,4 +238,8 @@ protected TimeoutSettings getTimeoutSettings(final TransactionOptions transactio .withMaxCommitMS(transactionOptions.getMaxCommitTime(MILLISECONDS)) .withTimeout(timeoutMS, MILLISECONDS); } + + protected enum TransactionState { + NONE, IN, COMMITTED, ABORTED + } } diff --git a/driver-core/src/test/functional/com/mongodb/internal/connection/TestCommandListener.java b/driver-core/src/test/functional/com/mongodb/internal/connection/TestCommandListener.java index c246a75cfcf..497cd3e7247 100644 --- a/driver-core/src/test/functional/com/mongodb/internal/connection/TestCommandListener.java +++ b/driver-core/src/test/functional/com/mongodb/internal/connection/TestCommandListener.java @@ -41,6 +41,7 @@ import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; +import java.util.function.Predicate; import java.util.stream.Collectors; import static com.mongodb.ClusterFixture.TIMEOUT; @@ -152,15 +153,28 @@ public List getCommandStartedEvents() { return getEvents(CommandStartedEvent.class, Integer.MAX_VALUE); } + public List getCommandStartedEvents(final String commandName) { + return getEvents(CommandStartedEvent.class, + commandEvent -> commandEvent.getCommandName().equals(commandName), + Integer.MAX_VALUE); + } + public List getCommandSucceededEvents() { return getEvents(CommandSucceededEvent.class, Integer.MAX_VALUE); } private List getEvents(final Class type, final int maxEvents) { + return getEvents(type, e -> true, maxEvents); + } + + private List getEvents(final Class type, + final Predicate filter, + final int maxEvents) { lock.lock(); try { return getEvents().stream() .filter(e -> e.getClass() == type) + .filter(filter) .map(type::cast) .limit(maxEvents).collect(Collectors.toList()); } finally { diff --git a/driver-core/src/test/resources/unified-test-format/client-side-operation-timeout/deprecated-options.json b/driver-core/src/test/resources/unified-test-format/client-side-operation-timeout/deprecated-options.json index 41b084c8cc5..2ecba25f0d3 100644 --- a/driver-core/src/test/resources/unified-test-format/client-side-operation-timeout/deprecated-options.json +++ b/driver-core/src/test/resources/unified-test-format/client-side-operation-timeout/deprecated-options.json @@ -141,6 +141,7 @@ }, { "description": "commitTransaction ignores wTimeoutMS if timeoutMS is set", + "comment": "Moved timeoutMS from commitTransaction to startTransaction manually, as commitTransaction does not support a timeoutMS option.", "operations": [ { "name": "createEntities", @@ -186,7 +187,10 @@ }, { "name": "startTransaction", - "object": "session" + "object": "session", + "arguments": { + "timeoutMS": 10000 + } }, { "name": "countDocuments", @@ -198,10 +202,7 @@ }, { "name": "commitTransaction", - "object": "session", - "arguments": { - "timeoutMS": 10000 - } + "object": "session" } ], "expectEvents": [ @@ -430,6 +431,7 @@ }, { "description": "abortTransaction ignores wTimeoutMS if timeoutMS is set", + "comment": "Moved timeoutMS from abortTransaction to startTransaction manually, as abortTransaction does not support a timeoutMS option.", "operations": [ { "name": "createEntities", @@ -475,7 +477,10 @@ }, { "name": "startTransaction", - "object": "session" + "object": "session", + "arguments": { + "timeoutMS": 10000 + } }, { "name": "countDocuments", @@ -487,10 +492,7 @@ }, { "name": "abortTransaction", - "object": "session", - "arguments": { - "timeoutMS": 10000 - } + "object": "session" } ], "expectEvents": [ diff --git a/driver-core/src/test/unit/com/mongodb/internal/async/function/RetryStateTest.java b/driver-core/src/test/unit/com/mongodb/internal/async/function/RetryStateTest.java index 3f15d7c92bb..1d1a76797de 100644 --- a/driver-core/src/test/unit/com/mongodb/internal/async/function/RetryStateTest.java +++ b/driver-core/src/test/unit/com/mongodb/internal/async/function/RetryStateTest.java @@ -50,7 +50,7 @@ final class RetryStateTest { private static final TimeoutContext TIMEOUT_CONTEXT_INFINITE_GLOBAL_TIMEOUT = new TimeoutContext(new TimeoutSettings(0L, 0L, 0L, 0L, 0L)); - private static final String EXPECTED_TIMEOUT_MESSAGE = "MongoDB operation timed out during a retry attempt"; + private static final String EXPECTED_TIMEOUT_MESSAGE = "Retry attempt timed out."; static Stream infiniteTimeout() { return Stream.of( diff --git a/driver-core/src/test/unit/com/mongodb/internal/operation/WriteConcernHelperTest.java b/driver-core/src/test/unit/com/mongodb/internal/operation/WriteConcernHelperTest.java new file mode 100644 index 00000000000..2c7b71949c8 --- /dev/null +++ b/driver-core/src/test/unit/com/mongodb/internal/operation/WriteConcernHelperTest.java @@ -0,0 +1,61 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.internal.operation; + +import com.mongodb.WriteConcern; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.MethodSource; + +import java.util.concurrent.TimeUnit; + +import static com.mongodb.assertions.Assertions.assertNull; +import static org.junit.jupiter.api.Assertions.assertEquals; + +class WriteConcernHelperTest { + + static WriteConcern[] shouldRemoveWtimeout(){ + return new WriteConcern[]{ + WriteConcern.ACKNOWLEDGED, + WriteConcern.MAJORITY, + WriteConcern.W1, + WriteConcern.W2, + WriteConcern.W3, + WriteConcern.UNACKNOWLEDGED, + WriteConcern.JOURNALED, + + WriteConcern.ACKNOWLEDGED.withWTimeout(100, TimeUnit.MILLISECONDS), + WriteConcern.MAJORITY.withWTimeout(100, TimeUnit.MILLISECONDS), + WriteConcern.W1.withWTimeout(100, TimeUnit.MILLISECONDS), + WriteConcern.W2.withWTimeout(100, TimeUnit.MILLISECONDS), + WriteConcern.W3.withWTimeout(100, TimeUnit.MILLISECONDS), + WriteConcern.UNACKNOWLEDGED.withWTimeout(100, TimeUnit.MILLISECONDS), + WriteConcern.JOURNALED.withWTimeout(100, TimeUnit.MILLISECONDS), + }; + } + + @MethodSource + @ParameterizedTest + void shouldRemoveWtimeout(final WriteConcern writeConcern){ + //when + WriteConcern clonedWithoutTimeout = WriteConcernHelper.cloneWithoutTimeout(writeConcern); + + //then + assertEquals(writeConcern.getWObject(), clonedWithoutTimeout.getWObject()); + assertEquals(writeConcern.getJournal(), clonedWithoutTimeout.getJournal()); + assertNull(clonedWithoutTimeout.getWTimeout(TimeUnit.MILLISECONDS)); + } +} diff --git a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ClientSessionPublisherImpl.java b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ClientSessionPublisherImpl.java index a39a916c4fa..fceee433ef6 100644 --- a/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ClientSessionPublisherImpl.java +++ b/driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ClientSessionPublisherImpl.java @@ -28,8 +28,10 @@ import com.mongodb.internal.operation.AsyncReadOperation; import com.mongodb.internal.operation.AsyncWriteOperation; import com.mongodb.internal.operation.CommitTransactionOperation; +import com.mongodb.internal.operation.WriteConcernHelper; import com.mongodb.internal.session.BaseClientSessionImpl; import com.mongodb.internal.session.ServerSessionPool; +import com.mongodb.lang.Nullable; import com.mongodb.reactivestreams.client.ClientSession; import org.reactivestreams.Publisher; import reactor.core.publisher.Mono; @@ -102,6 +104,7 @@ public void startTransaction() { @Override public void startTransaction(final TransactionOptions transactionOptions) { notNull("transactionOptions", transactionOptions); + Boolean snapshot = getOptions().isSnapshot(); if (snapshot != null && snapshot) { throw new IllegalArgumentException("Transactions are not supported in snapshot sessions"); @@ -116,7 +119,9 @@ public void startTransaction(final TransactionOptions transactionOptions) { } getServerSession().advanceTransactionNumber(); this.transactionOptions = TransactionOptions.merge(transactionOptions, getOptions().getDefaultTransactionOptions()); - WriteConcern writeConcern = this.transactionOptions.getWriteConcern(); + + TimeoutContext timeoutContext = createTimeoutContext(); + WriteConcern writeConcern = getWriteConcern(timeoutContext); if (writeConcern == null) { throw new MongoInternalException("Invariant violated. Transaction options write concern can not be null"); } @@ -124,7 +129,16 @@ public void startTransaction(final TransactionOptions transactionOptions) { throw new MongoClientException("Transactions do not support unacknowledged write concern"); } clearTransactionContext(); - setTimeoutContext(createTimeoutContext()); + setTimeoutContext(timeoutContext); + } + + @Nullable + private WriteConcern getWriteConcern(@Nullable final TimeoutContext timeoutContext) { + WriteConcern writeConcern = transactionOptions.getWriteConcern(); + if (hasTimeoutMS(timeoutContext) && hasWTimeoutMS(writeConcern)) { + return WriteConcernHelper.cloneWithoutTimeout(writeConcern); + } + return writeConcern; } @Override @@ -146,9 +160,11 @@ public Publisher commitTransaction() { boolean alreadyCommitted = commitInProgress || transactionState == TransactionState.COMMITTED; commitInProgress = true; resetTimeout(); + TimeoutContext timeoutContext = getTimeoutContext(); + WriteConcern writeConcern = assertNotNull(getWriteConcern(timeoutContext)); return executor .execute( - new CommitTransactionOperation(assertNotNull(transactionOptions.getWriteConcern()), alreadyCommitted) + new CommitTransactionOperation(writeConcern, alreadyCommitted) .recoveryToken(getRecoveryToken()), readConcern, this) .doOnSuccess(ignored -> setTimeoutContext(null)) .doOnTerminate(() -> { @@ -180,8 +196,10 @@ public Publisher abortTransaction() { } resetTimeout(); + TimeoutContext timeoutContext = getTimeoutContext(); + WriteConcern writeConcern = assertNotNull(getWriteConcern(timeoutContext)); return executor - .execute(new AbortTransactionOperation(assertNotNull(transactionOptions.getWriteConcern())) + .execute(new AbortTransactionOperation(writeConcern) .recoveryToken(getRecoveryToken()), readConcern, this) .onErrorResume(Throwable.class, (e) -> Mono.empty()) .doOnTerminate(() -> { @@ -213,10 +231,6 @@ private void cleanupTransaction(final TransactionState nextState) { setTimeoutContext(null); } - private enum TransactionState { - NONE, IN, COMMITTED, ABORTED - } - private TimeoutContext createTimeoutContext() { return new TimeoutContext(getTimeoutSettings(transactionOptions, executor.getTimeoutSettings())); } diff --git a/driver-sync/src/main/com/mongodb/client/internal/ClientSessionImpl.java b/driver-sync/src/main/com/mongodb/client/internal/ClientSessionImpl.java index f62a5238dbf..1efa6d20002 100644 --- a/driver-sync/src/main/com/mongodb/client/internal/ClientSessionImpl.java +++ b/driver-sync/src/main/com/mongodb/client/internal/ClientSessionImpl.java @@ -31,9 +31,11 @@ import com.mongodb.internal.operation.AbortTransactionOperation; import com.mongodb.internal.operation.CommitTransactionOperation; import com.mongodb.internal.operation.ReadOperation; +import com.mongodb.internal.operation.WriteConcernHelper; import com.mongodb.internal.operation.WriteOperation; import com.mongodb.internal.session.BaseClientSessionImpl; import com.mongodb.internal.session.ServerSessionPool; +import com.mongodb.lang.Nullable; import static com.mongodb.MongoException.TRANSIENT_TRANSACTION_ERROR_LABEL; import static com.mongodb.MongoException.UNKNOWN_TRANSACTION_COMMIT_RESULT_LABEL; @@ -44,10 +46,6 @@ final class ClientSessionImpl extends BaseClientSessionImpl implements ClientSession { - private enum TransactionState { - NONE, IN, COMMITTED, ABORTED - } - private static final int MAX_RETRY_TIME_LIMIT_MS = 120000; private final OperationExecutor operationExecutor; @@ -131,8 +129,10 @@ public void abortTransaction() { throw new MongoInternalException("Invariant violated. Transaction options read concern can not be null"); } resetTimeout(); + TimeoutContext timeoutContext = getTimeoutContext(); + WriteConcern writeConcern = assertNotNull(getWriteConcern(timeoutContext)); operationExecutor - .execute(new AbortTransactionOperation(assertNotNull(transactionOptions.getWriteConcern())) + .execute(new AbortTransactionOperation(writeConcern) .recoveryToken(getRecoveryToken()), readConcern, this); } } catch (RuntimeException e) { @@ -159,7 +159,7 @@ private void startTransaction(final TransactionOptions transactionOptions, final } getServerSession().advanceTransactionNumber(); this.transactionOptions = TransactionOptions.merge(transactionOptions, getOptions().getDefaultTransactionOptions()); - WriteConcern writeConcern = this.transactionOptions.getWriteConcern(); + WriteConcern writeConcern = getWriteConcern(timeoutContext); if (writeConcern == null) { throw new MongoInternalException("Invariant violated. Transaction options write concern can not be null"); } @@ -170,6 +170,15 @@ private void startTransaction(final TransactionOptions transactionOptions, final setTimeoutContext(timeoutContext); } + @Nullable + private WriteConcern getWriteConcern(@Nullable final TimeoutContext timeoutContext) { + WriteConcern writeConcern = transactionOptions.getWriteConcern(); + if (hasTimeoutMS(timeoutContext) && hasWTimeoutMS(writeConcern)) { + return WriteConcernHelper.cloneWithoutTimeout(writeConcern); + } + return writeConcern; + } + private void commitTransaction(final boolean resetTimeout) { if (transactionState == TransactionState.ABORTED) { throw new IllegalStateException("Cannot call commitTransaction after calling abortTransaction"); @@ -188,8 +197,10 @@ private void commitTransaction(final boolean resetTimeout) { if (resetTimeout) { resetTimeout(); } + TimeoutContext timeoutContext = getTimeoutContext(); + WriteConcern writeConcern = assertNotNull(getWriteConcern(timeoutContext)); operationExecutor - .execute(new CommitTransactionOperation(assertNotNull(transactionOptions.getWriteConcern()), + .execute(new CommitTransactionOperation(writeConcern, transactionState == TransactionState.COMMITTED) .recoveryToken(getRecoveryToken()), readConcern, this); setTimeoutContext(null); @@ -279,7 +290,8 @@ public void close() { // Apply majority write concern if the commit is to be retried. private void applyMajorityWriteConcernToTransactionOptions() { if (transactionOptions != null) { - WriteConcern writeConcern = transactionOptions.getWriteConcern(); + TimeoutContext timeoutContext = getTimeoutContext(); + WriteConcern writeConcern = getWriteConcern(timeoutContext); if (writeConcern != null) { transactionOptions = TransactionOptions.merge(TransactionOptions.builder() .writeConcern(writeConcern.withW("majority")).build(), transactionOptions); diff --git a/driver-sync/src/test/functional/com/mongodb/client/AbstractClientSideOperationsTimeoutProseTest.java b/driver-sync/src/test/functional/com/mongodb/client/AbstractClientSideOperationsTimeoutProseTest.java index 17c4b9624cf..428c779f6cd 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/AbstractClientSideOperationsTimeoutProseTest.java +++ b/driver-sync/src/test/functional/com/mongodb/client/AbstractClientSideOperationsTimeoutProseTest.java @@ -27,6 +27,7 @@ import com.mongodb.MongoTimeoutException; import com.mongodb.ReadConcern; import com.mongodb.ReadPreference; +import com.mongodb.TransactionOptions; import com.mongodb.WriteConcern; import com.mongodb.client.gridfs.GridFSBucket; import com.mongodb.client.gridfs.GridFSDownloadStream; @@ -55,6 +56,7 @@ import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.DisplayName; import org.junit.jupiter.api.Named; import org.junit.jupiter.api.Tag; @@ -84,6 +86,7 @@ import static java.util.Collections.singletonList; import static org.junit.jupiter.api.Assertions.assertDoesNotThrow; import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertNotNull; import static org.junit.jupiter.api.Assertions.assertNull; import static org.junit.jupiter.api.Assertions.assertThrows; @@ -704,6 +707,40 @@ public void test10CustomTestWithTransactionUsesASingleTimeoutWithLock() { } } + /** + * Not a prose spec test. However, it is additional test case for better coverage. + */ + @Tag("setsFailPoint") + @Test + @DisplayName("Should ignore wTimeoutMS of WriteConcern to initial and subsequent commitTransaction operations") + @Disabled //TODO JAVA-5425 + public void shouldIgnoreWtimeoutMsOfWriteConcernToInitialAndSubsequentCommitTransactionOperations() { + assumeTrue(serverVersionAtLeast(4, 4)); + assumeFalse(isStandalone()); + + try (MongoClient mongoClient = createMongoClient(getMongoClientSettingsBuilder())) { + MongoCollection collection = mongoClient.getDatabase(namespace.getDatabaseName()) + .getCollection(namespace.getCollectionName()); + + try (ClientSession session = mongoClient.startSession(ClientSessionOptions.builder() + .defaultTimeout(200, TimeUnit.MILLISECONDS) + .build())) { + session.startTransaction(TransactionOptions.builder() + .writeConcern(WriteConcern.ACKNOWLEDGED.withWTimeout(100, TimeUnit.MILLISECONDS)) + .build()); + collection.insertOne(session, new Document("x", 1)); + sleep(200); + + assertDoesNotThrow(session::commitTransaction); + //repeat commit. + assertDoesNotThrow(session::commitTransaction); + } + } + List commandStartedEvents = commandListener.getCommandStartedEvents("commitTransaction"); + assertEquals(2, commandStartedEvents.size()); + commandStartedEvents.forEach(e -> assertFalse(e.getCommand().containsKey("writeConcern"))); + } + private static Stream test8ServerSelectionArguments() { return Stream.of( Arguments.of(Named.of("serverSelectionTimeoutMS honored if timeoutMS is not set", diff --git a/driver-sync/src/test/functional/com/mongodb/client/ClientSideOperationTimeoutTest.java b/driver-sync/src/test/functional/com/mongodb/client/ClientSideOperationTimeoutTest.java index ac9c3d3e0d9..e039eed41d4 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/ClientSideOperationTimeoutTest.java +++ b/driver-sync/src/test/functional/com/mongodb/client/ClientSideOperationTimeoutTest.java @@ -98,8 +98,6 @@ public static void checkSkipCSOTTest(final String fileDescription, final String assumeFalse("TODO (CSOT) - JAVA-4052", fileDescription.startsWith("timeoutMS behaves correctly for retryable operations")); assumeFalse("TODO (CSOT) - JAVA-4052", fileDescription.startsWith("legacy timeouts behave correctly for retryable operations")); - - assumeFalse("TODO (CSOT) - JAVA-4062", testDescription.contains("wTimeoutMS is ignored") || testDescription.contains("ignores wTimeoutMS")); } diff --git a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedCrudHelper.java b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedCrudHelper.java index 4c2e1042268..04d7b36d941 100644 --- a/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedCrudHelper.java +++ b/driver-sync/src/test/functional/com/mongodb/client/unified/UnifiedCrudHelper.java @@ -1211,6 +1211,9 @@ OperationResult executeStartTransaction(final BsonDocument operation) { case "readConcern": optionsBuilder.readConcern(asReadConcern(cur.getValue().asDocument())); break; + case "timeoutMS": + optionsBuilder.timeout(cur.getValue().asInt32().longValue(), TimeUnit.MILLISECONDS); + break; case "maxCommitTimeMS": optionsBuilder.maxCommitTime(cur.getValue().asNumber().longValue(), TimeUnit.MILLISECONDS); break; diff --git a/driver-sync/src/test/unit/com/mongodb/client/gridfs/GridFSBucketsSpecification.groovy b/driver-sync/src/test/unit/com/mongodb/client/gridfs/GridFSBucketsSpecification.groovy index 7dc948a6ab2..0064cc9aad8 100644 --- a/driver-sync/src/test/unit/com/mongodb/client/gridfs/GridFSBucketsSpecification.groovy +++ b/driver-sync/src/test/unit/com/mongodb/client/gridfs/GridFSBucketsSpecification.groovy @@ -16,6 +16,7 @@ package com.mongodb.client.gridfs +import com.mongodb.ClusterFixture import com.mongodb.ReadConcern import com.mongodb.ReadPreference import com.mongodb.WriteConcern @@ -35,7 +36,7 @@ class GridFSBucketsSpecification extends Specification { def 'should create a GridFSBucket with default bucket name'() { given: def database = new MongoDatabaseImpl('db', Stub(CodecRegistry), Stub(ReadPreference), Stub(WriteConcern), false, true, readConcern, - JAVA_LEGACY, null, null, Stub(OperationExecutor)) + JAVA_LEGACY, null, ClusterFixture.TIMEOUT_SETTINGS, Stub(OperationExecutor)) when: def gridFSBucket = GridFSBuckets.create(database) @@ -48,7 +49,7 @@ class GridFSBucketsSpecification extends Specification { def 'should create a GridFSBucket with custom bucket name'() { given: def database = new MongoDatabaseImpl('db', Stub(CodecRegistry), Stub(ReadPreference), Stub(WriteConcern), false, true, readConcern, - JAVA_LEGACY, null, null, Stub(OperationExecutor)) + JAVA_LEGACY, null, ClusterFixture.TIMEOUT_SETTINGS, Stub(OperationExecutor)) def customName = 'custom' when: diff --git a/driver-sync/src/test/unit/com/mongodb/client/internal/MongoCollectionSpecification.groovy b/driver-sync/src/test/unit/com/mongodb/client/internal/MongoCollectionSpecification.groovy index 4210e6dd4bb..2fba3b90a0a 100644 --- a/driver-sync/src/test/unit/com/mongodb/client/internal/MongoCollectionSpecification.groovy +++ b/driver-sync/src/test/unit/com/mongodb/client/internal/MongoCollectionSpecification.groovy @@ -1476,7 +1476,8 @@ class MongoCollectionSpecification extends Specification { def 'should validate the client session correctly'() { given: def collection = new MongoCollectionImpl(namespace, Document, codecRegistry, readPreference, ACKNOWLEDGED, - true, true, readConcern, JAVA_LEGACY, null, null, Stub(OperationExecutor)) + true, true, readConcern, JAVA_LEGACY, null, TIMEOUT_SETTINGS, + Stub(OperationExecutor)) when: collection.aggregate(null, [Document.parse('{$match:{}}')]) diff --git a/driver-sync/src/test/unit/com/mongodb/client/internal/MongoDatabaseSpecification.groovy b/driver-sync/src/test/unit/com/mongodb/client/internal/MongoDatabaseSpecification.groovy index a3707677b1d..e702dd5e276 100644 --- a/driver-sync/src/test/unit/com/mongodb/client/internal/MongoDatabaseSpecification.groovy +++ b/driver-sync/src/test/unit/com/mongodb/client/internal/MongoDatabaseSpecification.groovy @@ -498,7 +498,7 @@ class MongoDatabaseSpecification extends Specification { given: def codecRegistry = fromProviders([new ValueCodecProvider(), new DocumentCodecProvider(), new BsonValueCodecProvider()]) def database = new MongoDatabaseImpl('databaseName', codecRegistry, secondary(), WriteConcern.MAJORITY, true, true, - ReadConcern.MAJORITY, JAVA_LEGACY, null, null, new TestOperationExecutor([])) + ReadConcern.MAJORITY, JAVA_LEGACY, null, TIMEOUT_SETTINGS, new TestOperationExecutor([])) when: def collection = database.getCollection('collectionName') @@ -509,7 +509,7 @@ class MongoDatabaseSpecification extends Specification { where: expectedCollection = new MongoCollectionImpl(new MongoNamespace('databaseName', 'collectionName'), Document, fromProviders([new ValueCodecProvider(), new DocumentCodecProvider(), new BsonValueCodecProvider()]), secondary(), - WriteConcern.MAJORITY, true, true, ReadConcern.MAJORITY, JAVA_LEGACY, null, null, + WriteConcern.MAJORITY, true, true, ReadConcern.MAJORITY, JAVA_LEGACY, null, TIMEOUT_SETTINGS, new TestOperationExecutor([])) }