Skip to content

Commit b9f70ad

Browse files
authored
Replace uninterruptible Lock.lock with Lock.lockInterruptibly (#1206)
JAVA-5140
1 parent ea4647b commit b9f70ad

File tree

10 files changed

+139
-224
lines changed

10 files changed

+139
-224
lines changed

driver-core/src/main/com/mongodb/connection/netty/NettyStream.java

Lines changed: 10 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,8 @@
6868

6969
import static com.mongodb.assertions.Assertions.assertNotNull;
7070
import static com.mongodb.assertions.Assertions.isTrueArgument;
71+
import static com.mongodb.internal.Locks.lockInterruptibly;
72+
import static com.mongodb.internal.Locks.withLock;
7173
import static com.mongodb.internal.connection.SslHelper.enableHostNameVerification;
7274
import static com.mongodb.internal.connection.SslHelper.enableSni;
7375
import static com.mongodb.internal.thread.InterruptionUtil.interruptAndCreateMongoInterruptedException;
@@ -286,7 +288,7 @@ public void readAsync(final int numBytes, final AsyncCompletionHandler<ByteBuf>
286288
private void readAsync(final int numBytes, final AsyncCompletionHandler<ByteBuf> handler, final long readTimeoutMillis) {
287289
ByteBuf buffer = null;
288290
Throwable exceptionResult = null;
289-
lock.lock();
291+
lockInterruptibly(lock);
290292
try {
291293
exceptionResult = pendingException;
292294
if (exceptionResult == null) {
@@ -344,18 +346,14 @@ private boolean hasBytesAvailable(final int numBytes) {
344346
}
345347

346348
private void handleReadResponse(@Nullable final io.netty.buffer.ByteBuf buffer, @Nullable final Throwable t) {
347-
PendingReader localPendingReader = null;
348-
lock.lock();
349-
try {
349+
PendingReader localPendingReader = withLock(lock, () -> {
350350
if (buffer != null) {
351351
pendingInboundBuffers.add(buffer.retain());
352352
} else {
353353
pendingException = t;
354354
}
355-
localPendingReader = pendingReader;
356-
} finally {
357-
lock.unlock();
358-
}
355+
return pendingReader;
356+
});
359357

360358
if (localPendingReader != null) {
361359
//timeouts may be scheduled only by the public read methods
@@ -370,8 +368,7 @@ public ServerAddress getAddress() {
370368

371369
@Override
372370
public void close() {
373-
lock.lock();
374-
try {
371+
withLock(lock, () -> {
375372
isClosed = true;
376373
if (channel != null) {
377374
channel.close();
@@ -382,9 +379,7 @@ public void close() {
382379
iterator.remove();
383380
nextByteBuf.release();
384381
}
385-
} finally {
386-
lock.unlock();
387-
}
382+
});
388383
}
389384

390385
@Override
@@ -519,8 +514,7 @@ private class OpenChannelFutureListener implements ChannelFutureListener {
519514

520515
@Override
521516
public void operationComplete(final ChannelFuture future) {
522-
lock.lock();
523-
try {
517+
withLock(lock, () -> {
524518
if (future.isSuccess()) {
525519
if (isClosed) {
526520
channelFuture.channel().close();
@@ -538,9 +532,7 @@ public void operationComplete(final ChannelFuture future) {
538532
initializeChannel(handler, socketAddressQueue);
539533
}
540534
}
541-
} finally {
542-
lock.unlock();
543-
}
535+
});
544536
}
545537
}
546538

driver-core/src/main/com/mongodb/internal/Locks.java

Lines changed: 29 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,10 @@
1616

1717
package com.mongodb.internal;
1818

19+
import com.mongodb.MongoInterruptedException;
20+
1921
import java.util.concurrent.locks.Lock;
22+
import java.util.concurrent.locks.ReentrantLock;
2023
import java.util.function.Supplier;
2124

2225
import static com.mongodb.internal.thread.InterruptionUtil.interruptAndCreateMongoInterruptedException;
@@ -37,18 +40,39 @@ public static <V> V withLock(final Lock lock, final Supplier<V> supplier) {
3740
}
3841

3942
public static <V, E extends Exception> V checkedWithLock(final Lock lock, final CheckedSupplier<V, E> supplier) throws E {
43+
lockInterruptibly(lock);
44+
try {
45+
return supplier.get();
46+
} finally {
47+
lock.unlock();
48+
}
49+
}
50+
51+
public static void lockInterruptibly(final Lock lock) throws MongoInterruptedException {
4052
try {
4153
lock.lockInterruptibly();
42-
try {
43-
return supplier.get();
44-
} finally {
45-
lock.unlock();
46-
}
4754
} catch (InterruptedException e) {
4855
throw interruptAndCreateMongoInterruptedException("Interrupted waiting for lock", e);
4956
}
5057
}
5158

59+
public static void lockInterruptiblyUnfair(
60+
// The type must be `ReentrantLock`, not `Lock`,
61+
// because only `ReentrantLock.tryLock` is documented to have the barging behavior.
62+
final ReentrantLock lock) throws MongoInterruptedException {
63+
if (Thread.currentThread().isInterrupted()) {
64+
throw interruptAndCreateMongoInterruptedException(null, null);
65+
}
66+
// `ReentrantLock.tryLock` is unfair
67+
if (!lock.tryLock()) {
68+
try {
69+
lock.lockInterruptibly();
70+
} catch (InterruptedException e) {
71+
throw interruptAndCreateMongoInterruptedException(null, e);
72+
}
73+
}
74+
}
75+
5276
private Locks() {
5377
}
5478
}

driver-core/src/main/com/mongodb/internal/authentication/AzureCredentialHelper.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import java.util.concurrent.locks.Lock;
3030
import java.util.concurrent.locks.ReentrantLock;
3131

32+
import static com.mongodb.internal.Locks.lockInterruptibly;
3233
import static com.mongodb.internal.authentication.HttpHelper.getHttpContents;
3334

3435
/**
@@ -48,7 +49,7 @@ public static BsonDocument obtainFromEnvironment() {
4849
if (cachedValue.isPresent()) {
4950
accessToken = cachedValue.get();
5051
} else {
51-
CACHED_ACCESS_TOKEN_LOCK.lock();
52+
lockInterruptibly(CACHED_ACCESS_TOKEN_LOCK);
5253
try {
5354
cachedValue = cachedAccessToken.getValue();
5455
if (cachedValue.isPresent()) {

driver-core/src/main/com/mongodb/internal/connection/ConcurrentPool.java

Lines changed: 7 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@
3333
import java.util.concurrent.TimeUnit;
3434
import java.util.concurrent.atomic.AtomicInteger;
3535
import java.util.concurrent.locks.Condition;
36-
import java.util.concurrent.locks.Lock;
3736
import java.util.concurrent.locks.ReentrantLock;
3837
import java.util.concurrent.locks.ReentrantReadWriteLock;
3938
import java.util.function.Consumer;
@@ -42,6 +41,8 @@
4241
import static com.mongodb.assertions.Assertions.assertNotNull;
4342
import static com.mongodb.assertions.Assertions.assertTrue;
4443
import static com.mongodb.assertions.Assertions.notNull;
44+
import static com.mongodb.internal.Locks.lockInterruptibly;
45+
import static com.mongodb.internal.Locks.lockInterruptiblyUnfair;
4546
import static com.mongodb.internal.VisibleForTesting.AccessModifier.PRIVATE;
4647
import static com.mongodb.internal.thread.InterruptionUtil.interruptAndCreateMongoInterruptedException;
4748

@@ -370,7 +371,7 @@ int permits() {
370371
}
371372

372373
boolean acquirePermitImmediateUnfair() {
373-
lockUnfair(lock);
374+
lockInterruptiblyUnfair(lock);
374375
try {
375376
throwIfClosedOrPaused();
376377
if (permits > 0) {
@@ -427,7 +428,7 @@ boolean acquirePermit(final long timeout, final TimeUnit unit) throws MongoInter
427428
}
428429

429430
void releasePermit() {
430-
lockUnfair(lock);
431+
lockInterruptiblyUnfair(lock);
431432
try {
432433
assertTrue(permits < maxPermits);
433434
//noinspection NonAtomicOperationOnVolatileField
@@ -439,7 +440,7 @@ void releasePermit() {
439440
}
440441

441442
void pause(final Supplier<MongoException> causeSupplier) {
442-
lockUnfair(lock);
443+
lockInterruptiblyUnfair(lock);
443444
try {
444445
if (!paused) {
445446
this.paused = true;
@@ -453,7 +454,7 @@ void pause(final Supplier<MongoException> causeSupplier) {
453454

454455
void ready() {
455456
if (paused) {
456-
lockUnfair(lock);
457+
lockInterruptiblyUnfair(lock);
457458
try {
458459
this.paused = false;
459460
this.causeSupplier = null;
@@ -468,7 +469,7 @@ void ready() {
468469
*/
469470
boolean close() {
470471
if (!closed) {
471-
lockUnfair(lock);
472+
lockInterruptiblyUnfair(lock);
472473
try {
473474
if (!closed) {
474475
closed = true;
@@ -515,32 +516,4 @@ static String sizeToString(final int size) {
515516
return size == INFINITE_SIZE ? "infinite" : Integer.toString(size);
516517
}
517518

518-
static void lockInterruptibly(final Lock lock) throws MongoInterruptedException {
519-
try {
520-
lock.lockInterruptibly();
521-
} catch (InterruptedException e) {
522-
throw interruptAndCreateMongoInterruptedException(null, e);
523-
}
524-
}
525-
526-
private static void lockInterruptiblyUnfair(final ReentrantLock lock) throws MongoInterruptedException {
527-
if (Thread.currentThread().isInterrupted()) {
528-
throw interruptAndCreateMongoInterruptedException(null, null);
529-
}
530-
// `ReentrantLock.tryLock` is unfair
531-
if (!lock.tryLock()) {
532-
try {
533-
lock.lockInterruptibly();
534-
} catch (InterruptedException e) {
535-
throw interruptAndCreateMongoInterruptedException(null, e);
536-
}
537-
}
538-
}
539-
540-
static void lockUnfair(final ReentrantLock lock) {
541-
// `ReentrantLock.tryLock` is unfair
542-
if (!lock.tryLock()) {
543-
lock.lock();
544-
}
545-
}
546519
}

0 commit comments

Comments
 (0)