Skip to content

Commit 1d77078

Browse files
committed
Fix missing finish bug, refactor
1 parent 4814e43 commit 1d77078

File tree

9 files changed

+42
-42
lines changed

9 files changed

+42
-42
lines changed

driver-core/src/main/com/mongodb/connection/AsyncCompletionHandler.java

+3
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,9 @@ public interface AsyncCompletionHandler<T> {
4040
*/
4141
void failed(Throwable t);
4242

43+
/**
44+
* @return this handler as a callback
45+
*/
4346
default SingleResultCallback<T> asCallback() {
4447
return (r, t) -> {
4548
if (t != null) {

driver-core/src/main/com/mongodb/connection/TlsChannelStreamFactoryFactory.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -203,7 +203,7 @@ public boolean supportsAdditionalTimeout() {
203203
@SuppressWarnings("deprecation")
204204
@Override
205205
public void openAsync(final SingleResultCallback<Void> callback) {
206-
AsyncCompletionHandler<Void> handler = callback.toHandler();
206+
AsyncCompletionHandler<Void> handler = callback.asHandler();
207207
isTrue("unopened", getChannel() == null);
208208
try {
209209
SocketChannel socketChannel = SocketChannel.open();

driver-core/src/main/com/mongodb/connection/netty/NettyStream.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -166,7 +166,7 @@ public void open() throws IOException {
166166
@SuppressWarnings("deprecation")
167167
@Override
168168
public void openAsync(final SingleResultCallback<Void> callback) {
169-
AsyncCompletionHandler<Void> handler = callback.toHandler();
169+
AsyncCompletionHandler<Void> handler = callback.asHandler();
170170
Queue<SocketAddress> socketAddressQueue;
171171

172172
try {

driver-core/src/main/com/mongodb/internal/async/SingleResultCallback.java

+4-1
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,10 @@ public interface SingleResultCallback<T> {
3737
*/
3838
void onResult(@Nullable T result, @Nullable Throwable t);
3939

40-
default AsyncCompletionHandler<T> toHandler() {
40+
/**
41+
* @return this callback as a handler
42+
*/
43+
default AsyncCompletionHandler<T> asHandler() {
4144
return new AsyncCompletionHandler<T>() {
4245
@Override
4346
public void completed(@Nullable final T result) {

driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStream.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ public AsynchronousSocketChannelStream(final ServerAddress serverAddress, final
5858
@SuppressWarnings("deprecation")
5959
@Override
6060
public void openAsync(final SingleResultCallback<Void> callback) {
61-
AsyncCompletionHandler<Void> handler = callback.toHandler();
61+
AsyncCompletionHandler<Void> handler = callback.asHandler();
6262

6363
isTrue("unopened", getChannel() == null);
6464
Queue<SocketAddress> socketAddressQueue;

driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnectionInitializer.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -109,10 +109,10 @@ public void startHandshakeAsync(final InternalConnection internalConnection,
109109
executeCommandAsync("admin", helloCommandDocument, clusterConnectionMode, serverApi, internalConnection, c2);
110110
}).onErrorIf(e -> e instanceof MongoException, (t, c2) -> {
111111
throw mapHelloException((MongoException) t);
112-
}).thenApply((helloResult, c2) -> {
112+
}).<InternalConnectionInitializationDescription>thenApply((helloResult, c2) -> {
113113
setSpeculativeAuthenticateResponse(helloResult);
114114
c2.complete(createInitializationDescription(helloResult, internalConnection, startTime));
115-
});
115+
}).finish(c);
116116
}).finish(callback);
117117
}
118118

driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -122,8 +122,8 @@ below, SHOULD be used (for consistency, and ease of comparison/review).
122122
1. Is everything inside the boilerplate?
123123
2. Is "callback" supplied to "finish"?
124124
3. In each block and nested block, is that same block's "c" always passed/completed at the end of execution?
125-
4. Is any c.complete followed by a return, to end execution?
126-
5. Do any sync methods still need to be converted to async?
125+
4. Is every c.complete followed by a return, to end execution?
126+
5. Have all sync method calls been converted to async, where needed?
127127
*/
128128
}
129129

driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ClientSessionBinding.java

+15-18
Original file line numberDiff line numberDiff line change
@@ -104,27 +104,24 @@ public void getWriteConnectionSource(final SingleResultCallback<AsyncConnectionS
104104

105105
private void getConnectionSource(final AsyncSupplier<AsyncConnectionSource> connectionSourceSupplier,
106106
final SingleResultCallback<AsyncConnectionSource> callback) {
107-
// wrapper applied at end
108107
beginAsync().<AsyncConnectionSource>thenSupply(c -> {
109108
if (!session.hasActiveTransaction()) {
110109
connectionSourceSupplier.getAsync(c);
111-
return;
112-
}
113-
if (TransactionContext.get(session) != null) {
110+
} else if (TransactionContext.get(session) != null) {
114111
wrapped.getConnectionSource(assertNotNull(session.getPinnedServerAddress()), c);
115-
return;
112+
} else {
113+
beginAsync().<AsyncConnectionSource>thenSupply(c2 -> {
114+
connectionSourceSupplier.getAsync(c2);
115+
}).<AsyncConnectionSource>thenApply((source, c2) -> {
116+
ClusterType clusterType = assertNotNull(source).getServerDescription().getClusterType();
117+
if (clusterType == SHARDED || clusterType == LOAD_BALANCED) {
118+
TransactionContext<AsyncConnection> transactionContext = new TransactionContext<>(clusterType);
119+
session.setTransactionContext(source.getServerDescription().getAddress(), transactionContext);
120+
transactionContext.release(); // The session is responsible for retaining a reference to the context
121+
}
122+
c2.complete(source);
123+
}).finish(c);
116124
}
117-
beginAsync().<AsyncConnectionSource>thenSupply(c2 -> {
118-
connectionSourceSupplier.getAsync(c2);
119-
}).<AsyncConnectionSource>thenApply((source, c2) -> {
120-
ClusterType clusterType = assertNotNull(source).getServerDescription().getClusterType();
121-
if (clusterType == SHARDED || clusterType == LOAD_BALANCED) {
122-
TransactionContext<AsyncConnection> transactionContext = new TransactionContext<>(clusterType);
123-
session.setTransactionContext(source.getServerDescription().getAddress(), transactionContext);
124-
transactionContext.release(); // The session is responsible for retaining a reference to the context
125-
} //
126-
c2.complete(source);
127-
}).finish(c);
128125
}).<AsyncConnectionSource>thenApply((source, c) -> {
129126
c.complete(new SessionBindingAsyncConnectionSource(source));
130127
}).finish(callback);
@@ -194,12 +191,12 @@ public void getConnection(final SingleResultCallback<AsyncConnection> callback)
194191
if (transactionContext == null || !transactionContext.isConnectionPinningRequired()) {
195192
wrapped.getConnection(c);
196193
return;
197-
} //
194+
}
198195
AsyncConnection pinnedAsyncConnection = transactionContext.getPinnedConnection();
199196
if (pinnedAsyncConnection != null) {
200197
c.complete(pinnedAsyncConnection.retain());
201198
return;
202-
} //
199+
}
203200
beginAsync().<AsyncConnection>thenSupply(c2 -> {
204201
wrapped.getConnection(c2);
205202
}).<AsyncConnection>thenApply((connection, c2) -> {

driver-sync/src/main/com/mongodb/client/internal/ClientSessionBinding.java

+13-16
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,6 @@
3434
import com.mongodb.internal.session.SessionContext;
3535
import com.mongodb.lang.Nullable;
3636

37-
import java.util.function.Function;
3837
import java.util.function.Supplier;
3938

4039
import static com.mongodb.connection.ClusterType.LOAD_BALANCED;
@@ -126,23 +125,21 @@ public OperationContext getOperationContext() {
126125
}
127126

128127
private ConnectionSource getConnectionSource(final Supplier<ConnectionSource> connectionSourceSupplier) {
129-
Function<ConnectionSource, ConnectionSource> wrapper = c -> new SessionBindingConnectionSource(c);
130-
128+
ConnectionSource source;
131129
if (!session.hasActiveTransaction()) {
132-
return wrapper.apply(connectionSourceSupplier.get());
133-
}
134-
if (TransactionContext.get(session) != null) {
135-
return wrapper.apply(
136-
wrapped.getConnectionSource(assertNotNull(session.getPinnedServerAddress())));
137-
}
138-
ConnectionSource source = connectionSourceSupplier.get();
139-
ClusterType clusterType = source.getServerDescription().getClusterType();
140-
if (clusterType == SHARDED || clusterType == LOAD_BALANCED) {
141-
TransactionContext<Connection> transactionContext = new TransactionContext<>(clusterType);
142-
session.setTransactionContext(source.getServerDescription().getAddress(), transactionContext);
143-
transactionContext.release(); // The session is responsible for retaining a reference to the context
130+
source = connectionSourceSupplier.get();
131+
} else if (TransactionContext.get(session) != null) {
132+
source = wrapped.getConnectionSource(assertNotNull(session.getPinnedServerAddress()));
133+
} else {
134+
source = connectionSourceSupplier.get();
135+
ClusterType clusterType = source.getServerDescription().getClusterType();
136+
if (clusterType == SHARDED || clusterType == LOAD_BALANCED) {
137+
TransactionContext<Connection> transactionContext = new TransactionContext<>(clusterType);
138+
session.setTransactionContext(source.getServerDescription().getAddress(), transactionContext);
139+
transactionContext.release(); // The session is responsible for retaining a reference to the context
140+
}
144141
}
145-
return wrapper.apply(source);
142+
return new SessionBindingConnectionSource(source);
146143
}
147144

148145
private class SessionBindingConnectionSource implements ConnectionSource {

0 commit comments

Comments
 (0)