Skip to content

Commit 35061d0

Browse files
committed
Use async API in InternalStreamConnection
1 parent 29586ff commit 35061d0

File tree

11 files changed

+70
-65
lines changed

11 files changed

+70
-65
lines changed

driver-core/src/main/com/mongodb/connection/AsyncCompletionHandler.java

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.mongodb.connection;
1818

19+
import com.mongodb.internal.async.SingleResultCallback;
1920
import com.mongodb.lang.Nullable;
2021

2122
/**
@@ -38,4 +39,14 @@ public interface AsyncCompletionHandler<T> {
3839
* @param t the exception that describes the failure
3940
*/
4041
void failed(Throwable t);
42+
43+
default SingleResultCallback<T> asCallback() {
44+
return (r, t) -> {
45+
if (t != null) {
46+
failed(t);
47+
} else {
48+
completed(r);
49+
}
50+
};
51+
}
4152
}

driver-core/src/main/com/mongodb/connection/Stream.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package com.mongodb.connection;
1818

1919
import com.mongodb.ServerAddress;
20+
import com.mongodb.internal.async.SingleResultCallback;
2021
import org.bson.ByteBuf;
2122

2223
import java.io.IOException;
@@ -43,7 +44,7 @@ public interface Stream extends BufferProvider{
4344
*
4445
* @param handler the completion handler for opening the stream
4546
*/
46-
void openAsync(AsyncCompletionHandler<Void> handler);
47+
void openAsync(SingleResultCallback<Void> handler);
4748

4849
/**
4950
* Write each buffer in the list to the stream in order, blocking until all are completely written.

driver-core/src/main/com/mongodb/connection/TlsChannelStreamFactoryFactory.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import com.mongodb.MongoClientException;
2020
import com.mongodb.MongoSocketOpenException;
2121
import com.mongodb.ServerAddress;
22+
import com.mongodb.internal.async.SingleResultCallback;
2223
import com.mongodb.internal.connection.AsynchronousChannelStream;
2324
import com.mongodb.internal.connection.ExtendedAsynchronousByteChannel;
2425
import com.mongodb.internal.connection.PowerOfTwoBufferPool;
@@ -201,7 +202,8 @@ public boolean supportsAdditionalTimeout() {
201202

202203
@SuppressWarnings("deprecation")
203204
@Override
204-
public void openAsync(final AsyncCompletionHandler<Void> handler) {
205+
public void openAsync(final SingleResultCallback<Void> callback) {
206+
AsyncCompletionHandler<Void> handler = callback.toHandler();
205207
isTrue("unopened", getChannel() == null);
206208
try {
207209
SocketChannel socketChannel = SocketChannel.open();

driver-core/src/main/com/mongodb/connection/netty/NettyStream.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
import com.mongodb.connection.SocketSettings;
2929
import com.mongodb.connection.SslSettings;
3030
import com.mongodb.connection.Stream;
31+
import com.mongodb.internal.async.SingleResultCallback;
3132
import com.mongodb.internal.connection.netty.NettyByteBuf;
3233
import com.mongodb.lang.Nullable;
3334
import io.netty.bootstrap.Bootstrap;
@@ -158,13 +159,14 @@ public ByteBuf getBuffer(final int size) {
158159
@Override
159160
public void open() throws IOException {
160161
FutureAsyncCompletionHandler<Void> handler = new FutureAsyncCompletionHandler<>();
161-
openAsync(handler);
162+
openAsync(handler.asCallback());
162163
handler.get();
163164
}
164165

165166
@SuppressWarnings("deprecation")
166167
@Override
167-
public void openAsync(final AsyncCompletionHandler<Void> handler) {
168+
public void openAsync(final SingleResultCallback<Void> callback) {
169+
AsyncCompletionHandler<Void> handler = callback.toHandler();
168170
Queue<SocketAddress> socketAddressQueue;
169171

170172
try {

driver-core/src/main/com/mongodb/internal/connection/AsynchronousChannelStream.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ private void readAsync(final int numBytes, final int additionalTimeout, final As
127127
@Override
128128
public void open() throws IOException {
129129
FutureAsyncCompletionHandler<Void> handler = new FutureAsyncCompletionHandler<>();
130-
openAsync(handler);
130+
openAsync(handler.asCallback());
131131
handler.getOpen();
132132
}
133133

driver-core/src/main/com/mongodb/internal/connection/AsynchronousSocketChannelStream.java

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.mongodb.ServerAddress;
2222
import com.mongodb.connection.AsyncCompletionHandler;
2323
import com.mongodb.connection.SocketSettings;
24+
import com.mongodb.internal.async.SingleResultCallback;
2425
import com.mongodb.lang.Nullable;
2526

2627
import java.io.IOException;
@@ -56,7 +57,9 @@ public AsynchronousSocketChannelStream(final ServerAddress serverAddress, final
5657

5758
@SuppressWarnings("deprecation")
5859
@Override
59-
public void openAsync(final AsyncCompletionHandler<Void> handler) {
60+
public void openAsync(final SingleResultCallback<Void> callback) {
61+
AsyncCompletionHandler<Void> handler = callback.toHandler();
62+
6063
isTrue("unopened", getChannel() == null);
6164
Queue<SocketAddress> socketAddressQueue;
6265

driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java

Lines changed: 40 additions & 55 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,7 @@
7272
import static com.mongodb.assertions.Assertions.assertNotNull;
7373
import static com.mongodb.assertions.Assertions.isTrue;
7474
import static com.mongodb.assertions.Assertions.notNull;
75+
import static com.mongodb.internal.async.AsyncRunnable.beginAsync;
7576
import static com.mongodb.internal.async.ErrorHandlingResultCallback.errorHandlingCallback;
7677
import static com.mongodb.internal.connection.CommandHelper.HELLO;
7778
import static com.mongodb.internal.connection.CommandHelper.LEGACY_HELLO;
@@ -205,15 +206,16 @@ public int getGeneration() {
205206

206207
@Override
207208
public void open() {
208-
isTrue("Open already called", stream == null);
209-
stream = streamFactory.create(getServerAddressWithResolver());
210209
try {
210+
isTrue("Open already called", stream == null);
211+
stream = streamFactory.create(getServerAddressWithResolver());
211212
stream.open();
212213

213214
InternalConnectionInitializationDescription initializationDescription = connectionInitializer.startHandshake(this);
214-
initAfterHandshakeStart(initializationDescription);
215215

216+
initAfterHandshakeStart(initializationDescription);
216217
initializationDescription = connectionInitializer.finishHandshake(this, initializationDescription);
218+
217219
initAfterHandshakeFinish(initializationDescription);
218220
} catch (Throwable t) {
219221
close();
@@ -227,45 +229,27 @@ public void open() {
227229

228230
@Override
229231
public void openAsync(final SingleResultCallback<Void> callback) {
230-
isTrue("Open already called", stream == null, callback);
231-
try {
232-
stream = streamFactory.create(getServerAddressWithResolver());
233-
stream.openAsync(new AsyncCompletionHandler<Void>() {
234-
@Override
235-
public void completed(@Nullable final Void aVoid) {
236-
connectionInitializer.startHandshakeAsync(InternalStreamConnection.this,
237-
(initialResult, initialException) -> {
238-
if (initialException != null) {
239-
close();
240-
callback.onResult(null, initialException);
241-
} else {
242-
assertNotNull(initialResult);
243-
initAfterHandshakeStart(initialResult);
244-
connectionInitializer.finishHandshakeAsync(InternalStreamConnection.this,
245-
initialResult, (completedResult, completedException) -> {
246-
if (completedException != null) {
247-
close();
248-
callback.onResult(null, completedException);
249-
} else {
250-
assertNotNull(completedResult);
251-
initAfterHandshakeFinish(completedResult);
252-
callback.onResult(null, null);
253-
}
254-
});
255-
}
256-
});
257-
}
258-
259-
@Override
260-
public void failed(final Throwable t) {
261-
close();
262-
callback.onResult(null, t);
263-
}
264-
});
265-
} catch (Throwable t) {
232+
beginAsync().thenRun(
233+
beginAsync().thenRun(c -> {
234+
isTrue("Open already called", stream == null, callback);
235+
stream = streamFactory.create(getServerAddressWithResolver());
236+
stream.openAsync(c);
237+
}).<InternalConnectionInitializationDescription>thenSupply(c -> {
238+
connectionInitializer.startHandshakeAsync(this, c);
239+
}).<InternalConnectionInitializationDescription>thenApply((initializationDescription, c) -> {
240+
initAfterHandshakeStart(assertNotNull(initializationDescription));
241+
connectionInitializer.finishHandshakeAsync(this, initializationDescription, c);
242+
}).thenConsume((initializationDescription, c) -> {
243+
initAfterHandshakeFinish(assertNotNull(initializationDescription));
244+
})
245+
).onErrorIf(t -> true, (t, c) -> {
266246
close();
267-
callback.onResult(null, t);
268-
}
247+
if (t instanceof MongoException) {
248+
throw (MongoException) t;
249+
} else {
250+
throw new MongoException(assertNotNull(t).toString(), t);
251+
}
252+
}).finish(callback);
269253
}
270254

271255
private ServerAddress getServerAddressWithResolver() {
@@ -336,7 +320,7 @@ private Compressor createCompressor(final MongoCompressor mongoCompressor) {
336320
public void close() {
337321
// All but the first call is a no-op
338322
if (!isClosed.getAndSet(true) && (stream != null)) {
339-
stream.close();
323+
stream.close();
340324
}
341325
}
342326

@@ -352,8 +336,9 @@ public boolean isClosed() {
352336

353337
@Nullable
354338
@Override
355-
public <T> T sendAndReceive(final CommandMessage message, final Decoder<T> decoder, final SessionContext sessionContext,
356-
final RequestContext requestContext, final OperationContext operationContext) {
339+
public <T> T sendAndReceive(final CommandMessage message,
340+
final Decoder<T> decoder, final SessionContext sessionContext,
341+
final RequestContext requestContext, final OperationContext operationContext) {
357342
CommandEventSender commandEventSender;
358343

359344
try (ByteBufferBsonOutput bsonOutput = new ByteBufferBsonOutput(this)) {
@@ -476,8 +461,10 @@ private <T> T receiveCommandMessageResponse(final Decoder<T> decoder,
476461
}
477462

478463
@Override
479-
public <T> void sendAndReceiveAsync(final CommandMessage message, final Decoder<T> decoder, final SessionContext sessionContext,
480-
final RequestContext requestContext, final OperationContext operationContext, final SingleResultCallback<T> callback) {
464+
public <T> void sendAndReceiveAsync(final CommandMessage message,
465+
final Decoder<T> decoder, final SessionContext sessionContext,
466+
final RequestContext requestContext, final OperationContext operationContext,
467+
final SingleResultCallback<T> callback) {
481468
notNull("stream is open", stream, callback);
482469

483470
if (isClosed()) {
@@ -575,11 +562,9 @@ private <T> T getCommandResult(final Decoder<T> decoder, final ResponseBuffers r
575562
@Override
576563
public void sendMessage(final List<ByteBuf> byteBuffers, final int lastRequestId) {
577564
notNull("stream is open", stream);
578-
579565
if (isClosed()) {
580566
throw new MongoSocketClosedException("Cannot write to a closed stream", getServerAddress());
581567
}
582-
583568
try {
584569
stream.write(byteBuffers);
585570
} catch (Exception e) {
@@ -609,14 +594,14 @@ private ResponseBuffers receiveMessageWithAdditionalTimeout(final int additional
609594

610595
@Override
611596
public void sendMessageAsync(final List<ByteBuf> byteBuffers, final int lastRequestId, final SingleResultCallback<Void> callback) {
612-
notNull("stream is open", stream, callback);
613-
614-
if (isClosed()) {
615-
callback.onResult(null, new MongoSocketClosedException("Can not read from a closed socket", getServerAddress()));
616-
return;
617-
}
597+
beginAsync().thenRun(c -> {
598+
notNull("stream is open", stream, callback);
599+
if (isClosed()) {
600+
throw new MongoSocketClosedException("Can not read from a closed socket", getServerAddress());
601+
}
618602

619-
writeAsync(byteBuffers, errorHandlingCallback(callback, LOGGER));
603+
writeAsync(byteBuffers, errorHandlingCallback(callback, LOGGER));
604+
}).finish(callback);
620605
}
621606

622607
private void writeAsync(final List<ByteBuf> byteBuffers, final SingleResultCallback<Void> callback) {

driver-core/src/main/com/mongodb/internal/connection/SocketStream.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import com.mongodb.connection.SocketSettings;
2727
import com.mongodb.connection.SslSettings;
2828
import com.mongodb.connection.Stream;
29+
import com.mongodb.internal.async.SingleResultCallback;
2930
import org.bson.ByteBuf;
3031

3132
import javax.net.SocketFactory;
@@ -207,7 +208,7 @@ public ByteBuf read(final int numBytes, final int additionalTimeout) throws IOEx
207208
}
208209

209210
@Override
210-
public void openAsync(final AsyncCompletionHandler<Void> handler) {
211+
public void openAsync(final SingleResultCallback<Void> callback) {
211212
throw new UnsupportedOperationException(getClass() + " does not support asynchronous operations.");
212213
}
213214

driver-core/src/test/functional/com/mongodb/connection/netty/NettyStreamSpecification.groovy

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ class NettyStreamSpecification extends Specification {
7676

7777
def stream = new NettyStreamFactory(SocketSettings.builder().connectTimeout(1000, TimeUnit.MILLISECONDS).build(),
7878
SslSettings.builder().build()).create(serverAddress)
79-
def callback = new CallbackErrorHolder()
79+
def callback = new CallbackErrorHolder().asCallback()
8080

8181
when:
8282
stream.openAsync(callback)

driver-core/src/test/functional/com/mongodb/internal/connection/AsyncSocketChannelStreamSpecification.groovy

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ class AsyncSocketChannelStreamSpecification extends Specification {
8383
def stream = new AsynchronousSocketChannelStream(serverAddress,
8484
SocketSettings.builder().connectTimeout(100, MILLISECONDS).build(),
8585
new PowerOfTwoBufferPool(), null)
86-
def callback = new CallbackErrorHolder()
86+
def callback = new CallbackErrorHolder().asCallback()
8787

8888
when:
8989
stream.openAsync(callback)

driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/crypt/KeyManagementService.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ public void failed(final Throwable t) {
8585
stream.close();
8686
sink.error(t);
8787
}
88-
});
88+
}.asCallback());
8989
}).onErrorMap(this::unWrapException);
9090
}
9191

0 commit comments

Comments
 (0)