diff --git a/driver-core/src/main/com/mongodb/KerberosSubjectProvider.java b/driver-core/src/main/com/mongodb/KerberosSubjectProvider.java index 0a78fc69602..f5d5beb5440 100644 --- a/driver-core/src/main/com/mongodb/KerberosSubjectProvider.java +++ b/driver-core/src/main/com/mongodb/KerberosSubjectProvider.java @@ -26,8 +26,10 @@ import javax.security.auth.kerberos.KerberosTicket; import javax.security.auth.login.LoginContext; import javax.security.auth.login.LoginException; +import java.util.concurrent.locks.ReentrantLock; import static com.mongodb.assertions.Assertions.notNull; +import static com.mongodb.internal.Locks.checkedWithLock; import static java.lang.String.format; import static java.util.concurrent.TimeUnit.MILLISECONDS; import static java.util.concurrent.TimeUnit.MINUTES; @@ -54,6 +56,7 @@ public class KerberosSubjectProvider implements SubjectProvider { private static final Logger LOGGER = Loggers.getLogger("authenticator"); private static final String TGT_PREFIX = "krbtgt/"; + private final ReentrantLock lock = new ReentrantLock(); private String loginContextName; private String fallbackLoginContextName; private Subject subject; @@ -87,11 +90,13 @@ private KerberosSubjectProvider(final String loginContextName, @Nullable final S * @throws LoginException any exception resulting from a call to {@link LoginContext#login()} */ @NonNull - public synchronized Subject getSubject() throws LoginException { - if (subject == null || needNewSubject(subject)) { - subject = createNewSubject(); - } - return subject; + public Subject getSubject() throws LoginException { + return checkedWithLock(lock, () -> { + if (subject == null || needNewSubject(subject)) { + subject = createNewSubject(); + } + return subject; + }); } private Subject createNewSubject() throws LoginException { diff --git a/driver-core/src/main/com/mongodb/internal/CheckedSupplier.java b/driver-core/src/main/com/mongodb/internal/CheckedSupplier.java new file mode 100644 index 00000000000..3c554d80004 
--- /dev/null +++ b/driver-core/src/main/com/mongodb/internal/CheckedSupplier.java @@ -0,0 +1,32 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.mongodb.internal; + +/** + * This class is not part of the public API and may be removed or changed at any time. + */ +@FunctionalInterface +public interface CheckedSupplier<T, E extends Exception> { + + /** + * Gets a result. + * + * @return a result + * @throws E the checked exception to throw + */ + T get() throws E; +} diff --git a/driver-core/src/main/com/mongodb/internal/Locks.java b/driver-core/src/main/com/mongodb/internal/Locks.java new file mode 100644 index 00000000000..6c9f10e1d17 --- /dev/null +++ b/driver-core/src/main/com/mongodb/internal/Locks.java @@ -0,0 +1,55 @@ +/* + * Copyright 2008-present MongoDB, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package com.mongodb.internal; + +import com.mongodb.MongoInterruptedException; + +import java.util.concurrent.locks.Lock; +import java.util.function.Supplier; + +/** + * This class is not part of the public API and may be removed or changed at any time. + */ +public final class Locks { + public static void withLock(final Lock lock, final Runnable action) { + withLock(lock, () -> { + action.run(); + return null; + }); + } + + public static <V> V withLock(final Lock lock, final Supplier<V> supplier) { + return checkedWithLock(lock, supplier::get); + } + + public static <V, E extends Exception> V checkedWithLock(final Lock lock, final CheckedSupplier<V, E> supplier) throws E { + try { + lock.lockInterruptibly(); + try { + return supplier.get(); + } finally { + lock.unlock(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new MongoInterruptedException("Interrupted waiting for lock", e); + } + } + + private Locks() { + } +} diff --git a/driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java b/driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java index 44255543eb1..16638df9dc5 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java +++ b/driver-core/src/main/com/mongodb/internal/connection/BaseCluster.java @@ -32,6 +32,7 @@ import com.mongodb.event.ClusterDescriptionChangedEvent; import com.mongodb.event.ClusterListener; import com.mongodb.event.ClusterOpeningEvent; +import com.mongodb.internal.Locks; import com.mongodb.internal.VisibleForTesting; import com.mongodb.internal.async.SingleResultCallback; import com.mongodb.internal.selector.LatencyMinimizingServerSelector; @@ -48,6 +49,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicReference; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Function; import static com.mongodb.assertions.Assertions.isTrue; @@ -68,6 +70,7 @@ abstract class
BaseCluster implements Cluster { private static final Logger LOGGER = Loggers.getLogger("cluster"); + private final ReentrantLock lock = new ReentrantLock(); private final AtomicReference<CountDownLatch> phase = new AtomicReference<CountDownLatch>(new CountDownLatch(1)); private final ClusterableServerFactory serverFactory; private final ClusterId clusterId; @@ -268,8 +271,8 @@ public ClusterDescription getCurrentDescription() { } @Override - public synchronized void withLock(final Runnable action) { - action.run(); + public void withLock(final Runnable action) { + Locks.withLock(lock, action); } private void updatePhase() { diff --git a/driver-core/src/main/com/mongodb/internal/connection/ClusterClock.java b/driver-core/src/main/com/mongodb/internal/connection/ClusterClock.java index 7886a646f84..1c4726b9708 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/ClusterClock.java +++ b/driver-core/src/main/com/mongodb/internal/connection/ClusterClock.java @@ -19,29 +19,36 @@ import org.bson.BsonDocument; import org.bson.BsonTimestamp; +import java.util.concurrent.locks.ReentrantLock; + +import static com.mongodb.internal.Locks.withLock; + public class ClusterClock { private static final String CLUSTER_TIME_KEY = "clusterTime"; + private final ReentrantLock lock = new ReentrantLock(); private BsonDocument clusterTime; - public synchronized BsonDocument getCurrent() { - return clusterTime; + public BsonDocument getCurrent() { + return withLock(lock, () -> clusterTime); } - public synchronized BsonTimestamp getClusterTime() { - return clusterTime != null ? clusterTime.getTimestamp(CLUSTER_TIME_KEY) : null; + public BsonTimestamp getClusterTime() { + return withLock(lock, () -> clusterTime != null ?
clusterTime.getTimestamp(CLUSTER_TIME_KEY) : null); } - public synchronized void advance(final BsonDocument other) { - this.clusterTime = greaterOf(other); + public void advance(final BsonDocument other) { + withLock(lock, () -> this.clusterTime = greaterOf(other)); } - public synchronized BsonDocument greaterOf(final BsonDocument other) { - if (other == null) { - return clusterTime; - } else if (clusterTime == null) { - return other; - } else { - return other.getTimestamp(CLUSTER_TIME_KEY).compareTo(clusterTime.getTimestamp(CLUSTER_TIME_KEY)) > 0 ? other : clusterTime; - } + public BsonDocument greaterOf(final BsonDocument other) { + return withLock(lock, () -> { + if (other == null) { + return clusterTime; + } else if (clusterTime == null) { + return other; + } else { + return other.getTimestamp(CLUSTER_TIME_KEY).compareTo(clusterTime.getTimestamp(CLUSTER_TIME_KEY)) > 0 ? other : clusterTime; + } + }); } } diff --git a/driver-core/src/main/com/mongodb/internal/connection/MongoCredentialWithCache.java b/driver-core/src/main/com/mongodb/internal/connection/MongoCredentialWithCache.java index 4a330641ef1..a612a82c3dd 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/MongoCredentialWithCache.java +++ b/driver-core/src/main/com/mongodb/internal/connection/MongoCredentialWithCache.java @@ -19,6 +19,11 @@ import com.mongodb.AuthenticationMechanism; import com.mongodb.MongoCredential; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import static com.mongodb.internal.Locks.withLock; + public class MongoCredentialWithCache { private final MongoCredential credential; private final Cache cache; @@ -53,21 +58,29 @@ public void putInCache(final Object key, final Object value) { cache.set(key, value); } + public Lock getLock() { + return cache.lock; + } static class Cache { + private final ReentrantLock lock = new ReentrantLock(); private Object cacheKey; private Object cacheValue; - synchronized Object get(final 
Object key) { - if (cacheKey != null && cacheKey.equals(key)) { - return cacheValue; - } - return null; + Object get(final Object key) { + return withLock(lock, () -> { + if (cacheKey != null && cacheKey.equals(key)) { + return cacheValue; + } + return null; + }); } - synchronized void set(final Object key, final Object value) { - cacheKey = key; - cacheValue = value; + void set(final Object key, final Object value) { + withLock(lock, () -> { + cacheKey = key; + cacheValue = value; + }); } } } diff --git a/driver-core/src/main/com/mongodb/internal/connection/SaslAuthenticator.java b/driver-core/src/main/com/mongodb/internal/connection/SaslAuthenticator.java index f2aad3470b0..d7c52e49ce9 100644 --- a/driver-core/src/main/com/mongodb/internal/connection/SaslAuthenticator.java +++ b/driver-core/src/main/com/mongodb/internal/connection/SaslAuthenticator.java @@ -42,6 +42,7 @@ import static com.mongodb.MongoCredential.JAVA_SUBJECT_KEY; import static com.mongodb.MongoCredential.JAVA_SUBJECT_PROVIDER_KEY; +import static com.mongodb.internal.Locks.withLock; import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback; import static com.mongodb.internal.connection.CommandHelper.executeCommand; import static com.mongodb.internal.connection.CommandHelper.executeCommandAsync; @@ -49,7 +50,6 @@ abstract class SaslAuthenticator extends Authenticator implements SpeculativeAuthenticator { public static final Logger LOGGER = Loggers.getLogger("authenticator"); private static final String SUBJECT_PROVIDER_CACHE_KEY = "SUBJECT_PROVIDER"; - SaslAuthenticator(final MongoCredentialWithCache credential, final ClusterConnectionMode clusterConnectionMode, final @Nullable ServerApi serverApi) { super(credential, clusterConnectionMode, serverApi); @@ -205,7 +205,7 @@ protected Subject getSubject() { @NonNull private SubjectProvider getSubjectProvider() { - synchronized (getMongoCredentialWithCache()) { + return withLock(getMongoCredentialWithCache().getLock(), 
() -> { SubjectProvider subjectProvider = getMongoCredentialWithCache().getFromCache(SUBJECT_PROVIDER_CACHE_KEY, SubjectProvider.class); if (subjectProvider == null) { @@ -216,7 +216,7 @@ private SubjectProvider getSubjectProvider() { getMongoCredentialWithCache().putInCache(SUBJECT_PROVIDER_CACHE_KEY, subjectProvider); } return subjectProvider; - } + }); } @NonNull diff --git a/driver-sync/src/main/com/mongodb/client/gridfs/GridFSDownloadStreamImpl.java b/driver-sync/src/main/com/mongodb/client/gridfs/GridFSDownloadStreamImpl.java index 79b9e18a632..b157cc3c4c3 100644 --- a/driver-sync/src/main/com/mongodb/client/gridfs/GridFSDownloadStreamImpl.java +++ b/driver-sync/src/main/com/mongodb/client/gridfs/GridFSDownloadStreamImpl.java @@ -17,18 +17,21 @@ package com.mongodb.client.gridfs; import com.mongodb.MongoGridFSException; +import com.mongodb.client.ClientSession; import com.mongodb.client.FindIterable; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoCursor; import com.mongodb.client.gridfs.model.GridFSFile; import com.mongodb.lang.Nullable; -import com.mongodb.client.ClientSession; import org.bson.BsonValue; import org.bson.Document; import org.bson.types.Binary; +import java.util.concurrent.locks.ReentrantLock; + import static com.mongodb.assertions.Assertions.isTrueArgument; import static com.mongodb.assertions.Assertions.notNull; +import static com.mongodb.internal.Locks.withLock; import static java.lang.String.format; class GridFSDownloadStreamImpl extends GridFSDownloadStream { @@ -47,8 +50,8 @@ class GridFSDownloadStreamImpl extends GridFSDownloadStream { private byte[] buffer = null; private long markPosition; - private final Object closeLock = new Object(); - private final Object cursorLock = new Object(); + private final ReentrantLock closeLock = new ReentrantLock(); + private final ReentrantLock cursorLock = new ReentrantLock(); private boolean closed = false; GridFSDownloadStreamImpl(@Nullable final ClientSession 
clientSession, final GridFSFile fileInfo, @@ -156,12 +159,12 @@ public void mark() { } @Override - public synchronized void mark(final int readlimit) { + public void mark(final int readlimit) { markPosition = currentPosition; } @Override - public synchronized void reset() { + public void reset() { checkClosed(); if (currentPosition == markPosition) { return; @@ -184,29 +187,29 @@ public boolean markSupported() { @Override public void close() { - synchronized (closeLock) { + withLock(closeLock, () -> { if (!closed) { closed = true; } discardCursor(); - } + }); } private void checkClosed() { - synchronized (closeLock) { + withLock(closeLock, () -> { if (closed) { throw new MongoGridFSException("The InputStream has been closed"); } - } + }); } private void discardCursor() { - synchronized (cursorLock) { + withLock(cursorLock, () -> { if (cursor != null) { cursor.close(); cursor = null; } - } + }); } @Nullable diff --git a/driver-sync/src/main/com/mongodb/client/gridfs/GridFSUploadStreamImpl.java b/driver-sync/src/main/com/mongodb/client/gridfs/GridFSUploadStreamImpl.java index 376e582c417..80fb6db7fb2 100644 --- a/driver-sync/src/main/com/mongodb/client/gridfs/GridFSUploadStreamImpl.java +++ b/driver-sync/src/main/com/mongodb/client/gridfs/GridFSUploadStreamImpl.java @@ -27,8 +27,10 @@ import org.bson.types.ObjectId; import java.util.Date; +import java.util.concurrent.locks.ReentrantLock; import static com.mongodb.assertions.Assertions.notNull; +import static com.mongodb.internal.Locks.withLock; final class GridFSUploadStreamImpl extends GridFSUploadStream { private final ClientSession clientSession; @@ -43,7 +45,7 @@ final class GridFSUploadStreamImpl extends GridFSUploadStream { private int bufferOffset; private int chunkIndex; - private final Object closeLock = new Object(); + private final ReentrantLock closeLock = new ReentrantLock(); private boolean closed = false; GridFSUploadStreamImpl(@Nullable final ClientSession clientSession, final MongoCollection<GridFSFile>
filesCollection, @@ -76,10 +78,11 @@ public BsonValue getId() { @Override public void abort() { - synchronized (closeLock) { + withLock(closeLock, () -> { checkClosed(); closed = true; - } + }); + if (clientSession != null) { chunksCollection.deleteMany(clientSession, new Document("files_id", fileId)); } else { @@ -135,12 +138,12 @@ public void write(final byte[] b, final int off, final int len) { @Override public void close() { - synchronized (closeLock) { + withLock(closeLock, () -> { if (closed) { return; } closed = true; - } + }); writeChunk(); GridFSFile gridFSFile = new GridFSFile(fileId, filename, lengthInBytes, chunkSizeBytes, new Date(), metadata); @@ -175,10 +178,10 @@ private Binary getData() { } private void checkClosed() { - synchronized (closeLock) { + withLock(closeLock, () -> { if (closed) { throw new MongoGridFSException("The OutputStream has been closed"); } - } + }); } }