Skip to content

Commit fd74c16

Browse files
committed
Add missing finish, refactor
1 parent 4814e43 commit fd74c16

File tree

4 files changed

+32
-37
lines changed

4 files changed

+32
-37
lines changed

driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnectionInitializer.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -109,10 +109,10 @@ public void startHandshakeAsync(final InternalConnection internalConnection,
109109
executeCommandAsync("admin", helloCommandDocument, clusterConnectionMode, serverApi, internalConnection, c2);
110110
}).onErrorIf(e -> e instanceof MongoException, (t, c2) -> {
111111
throw mapHelloException((MongoException) t);
112-
}).thenApply((helloResult, c2) -> {
112+
}).<InternalConnectionInitializationDescription>thenApply((helloResult, c2) -> {
113113
setSpeculativeAuthenticateResponse(helloResult);
114114
c2.complete(createInitializationDescription(helloResult, internalConnection, startTime));
115-
});
115+
}).finish(c);
116116
}).finish(callback);
117117
}
118118

driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsTest.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -122,8 +122,8 @@ below, SHOULD be used (for consistency, and ease of comparison/review).
122122
1. Is everything inside the boilerplate?
123123
2. Is "callback" supplied to "finish"?
124124
3. In each block and nested block, is that same block's "c" always passed/completed at the end of execution?
125-
4. Is any c.complete followed by a return, to end execution?
126-
5. Do any sync methods still need to be converted to async?
125+
4. Is every c.complete followed by a return, to end execution?
126+
5. Have all sync method calls been converted to async, where needed?
127127
*/
128128
}
129129

driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/ClientSessionBinding.java

+15-18
Original file line numberDiff line numberDiff line change
@@ -104,27 +104,24 @@ public void getWriteConnectionSource(final SingleResultCallback<AsyncConnectionS
104104

105105
private void getConnectionSource(final AsyncSupplier<AsyncConnectionSource> connectionSourceSupplier,
106106
final SingleResultCallback<AsyncConnectionSource> callback) {
107-
// wrapper applied at end
108107
beginAsync().<AsyncConnectionSource>thenSupply(c -> {
109108
if (!session.hasActiveTransaction()) {
110109
connectionSourceSupplier.getAsync(c);
111-
return;
112-
}
113-
if (TransactionContext.get(session) != null) {
110+
} else if (TransactionContext.get(session) != null) {
114111
wrapped.getConnectionSource(assertNotNull(session.getPinnedServerAddress()), c);
115-
return;
112+
} else {
113+
beginAsync().<AsyncConnectionSource>thenSupply(c2 -> {
114+
connectionSourceSupplier.getAsync(c2);
115+
}).<AsyncConnectionSource>thenApply((source, c2) -> {
116+
ClusterType clusterType = assertNotNull(source).getServerDescription().getClusterType();
117+
if (clusterType == SHARDED || clusterType == LOAD_BALANCED) {
118+
TransactionContext<AsyncConnection> transactionContext = new TransactionContext<>(clusterType);
119+
session.setTransactionContext(source.getServerDescription().getAddress(), transactionContext);
120+
transactionContext.release(); // The session is responsible for retaining a reference to the context
121+
}
122+
c2.complete(source);
123+
}).finish(c);
116124
}
117-
beginAsync().<AsyncConnectionSource>thenSupply(c2 -> {
118-
connectionSourceSupplier.getAsync(c2);
119-
}).<AsyncConnectionSource>thenApply((source, c2) -> {
120-
ClusterType clusterType = assertNotNull(source).getServerDescription().getClusterType();
121-
if (clusterType == SHARDED || clusterType == LOAD_BALANCED) {
122-
TransactionContext<AsyncConnection> transactionContext = new TransactionContext<>(clusterType);
123-
session.setTransactionContext(source.getServerDescription().getAddress(), transactionContext);
124-
transactionContext.release(); // The session is responsible for retaining a reference to the context
125-
} //
126-
c2.complete(source);
127-
}).finish(c);
128125
}).<AsyncConnectionSource>thenApply((source, c) -> {
129126
c.complete(new SessionBindingAsyncConnectionSource(source));
130127
}).finish(callback);
@@ -194,12 +191,12 @@ public void getConnection(final SingleResultCallback<AsyncConnection> callback)
194191
if (transactionContext == null || !transactionContext.isConnectionPinningRequired()) {
195192
wrapped.getConnection(c);
196193
return;
197-
} //
194+
}
198195
AsyncConnection pinnedAsyncConnection = transactionContext.getPinnedConnection();
199196
if (pinnedAsyncConnection != null) {
200197
c.complete(pinnedAsyncConnection.retain());
201198
return;
202-
} //
199+
}
203200
beginAsync().<AsyncConnection>thenSupply(c2 -> {
204201
wrapped.getConnection(c2);
205202
}).<AsyncConnection>thenApply((connection, c2) -> {

driver-sync/src/main/com/mongodb/client/internal/ClientSessionBinding.java

+13-15
Original file line numberDiff line numberDiff line change
@@ -126,23 +126,21 @@ public OperationContext getOperationContext() {
126126
}
127127

128128
private ConnectionSource getConnectionSource(final Supplier<ConnectionSource> connectionSourceSupplier) {
129-
Function<ConnectionSource, ConnectionSource> wrapper = c -> new SessionBindingConnectionSource(c);
130-
129+
ConnectionSource source;
131130
if (!session.hasActiveTransaction()) {
132-
return wrapper.apply(connectionSourceSupplier.get());
133-
}
134-
if (TransactionContext.get(session) != null) {
135-
return wrapper.apply(
136-
wrapped.getConnectionSource(assertNotNull(session.getPinnedServerAddress())));
137-
}
138-
ConnectionSource source = connectionSourceSupplier.get();
139-
ClusterType clusterType = source.getServerDescription().getClusterType();
140-
if (clusterType == SHARDED || clusterType == LOAD_BALANCED) {
141-
TransactionContext<Connection> transactionContext = new TransactionContext<>(clusterType);
142-
session.setTransactionContext(source.getServerDescription().getAddress(), transactionContext);
143-
transactionContext.release(); // The session is responsible for retaining a reference to the context
131+
source = connectionSourceSupplier.get();
132+
} else if (TransactionContext.get(session) != null) {
133+
source = wrapped.getConnectionSource(assertNotNull(session.getPinnedServerAddress()));
134+
} else {
135+
source = connectionSourceSupplier.get();
136+
ClusterType clusterType = source.getServerDescription().getClusterType();
137+
if (clusterType == SHARDED || clusterType == LOAD_BALANCED) {
138+
TransactionContext<Connection> transactionContext = new TransactionContext<>(clusterType);
139+
session.setTransactionContext(source.getServerDescription().getAddress(), transactionContext);
140+
transactionContext.release(); // The session is responsible for retaining a reference to the context
141+
}
144142
}
145-
return wrapper.apply(source);
143+
return new SessionBindingConnectionSource(source);
146144
}
147145

148146
private class SessionBindingConnectionSource implements ConnectionSource {

0 commit comments

Comments
 (0)