42
42
import com .mongodb .event .CommandListener ;
43
43
import com .mongodb .internal .ResourceUtil ;
44
44
import com .mongodb .internal .VisibleForTesting ;
45
+ import com .mongodb .internal .async .AsyncSupplier ;
45
46
import com .mongodb .internal .async .SingleResultCallback ;
46
47
import com .mongodb .internal .diagnostics .logging .Logger ;
47
48
import com .mongodb .internal .diagnostics .logging .Loggers ;
68
69
import java .util .function .Supplier ;
69
70
70
71
import static com .mongodb .assertions .Assertions .assertNotNull ;
72
+ import static com .mongodb .assertions .Assertions .assertNull ;
71
73
import static com .mongodb .assertions .Assertions .isTrue ;
72
74
import static com .mongodb .assertions .Assertions .notNull ;
75
+ import static com .mongodb .internal .async .AsyncRunnable .beginAsync ;
73
76
import static com .mongodb .internal .async .ErrorHandlingResultCallback .errorHandlingCallback ;
77
+ import static com .mongodb .internal .connection .Authenticator .shouldAuthenticate ;
74
78
import static com .mongodb .internal .connection .CommandHelper .HELLO ;
75
79
import static com .mongodb .internal .connection .CommandHelper .LEGACY_HELLO ;
76
80
import static com .mongodb .internal .connection .CommandHelper .LEGACY_HELLO_LOWER ;
@@ -238,7 +242,7 @@ public void open() {
238
242
239
243
@ Override
240
244
public void openAsync (final SingleResultCallback <Void > callback ) {
241
- isTrue ( "Open already called" , stream == null , callback );
245
+ assertNull ( stream );
242
246
try {
243
247
stream = streamFactory .create (serverId .getAddress ());
244
248
stream .openAsync (new AsyncCompletionHandler <Void >() {
@@ -364,17 +368,48 @@ public <T> T sendAndReceive(final CommandMessage message, final Decoder<T> decod
364
368
try {
365
369
return sendAndReceiveInternal .get ();
366
370
} catch (MongoCommandException e ) {
367
- if (triggersReauthentication (e ) && Authenticator .shouldAuthenticate (authenticator , this .description )) {
368
- authenticated .set (false );
369
- authenticator .reauthenticate (this );
370
- authenticated .set (true );
371
- return sendAndReceiveInternal .get ();
371
+ if (reauthenticationIsTriggered (e )) {
372
+ return reauthenticateAndRetry (sendAndReceiveInternal );
372
373
}
373
374
throw e ;
374
375
}
375
376
}
376
377
377
- public static boolean triggersReauthentication (@ Nullable final Throwable t ) {
378
+ @ Override
379
+ public <T > void sendAndReceiveAsync (final CommandMessage message , final Decoder <T > decoder , final SessionContext sessionContext ,
380
+ final RequestContext requestContext , final OperationContext operationContext , final SingleResultCallback <T > callback ) {
381
+
382
+ AsyncSupplier <T > sendAndReceiveAsyncInternal = c -> sendAndReceiveAsyncInternal (
383
+ message , decoder , sessionContext , requestContext , operationContext , c );
384
+ beginAsync ().<T >thenSupply (c -> {
385
+ sendAndReceiveAsyncInternal .getAsync (c );
386
+ }).onErrorIf (e -> reauthenticationIsTriggered (e ), c -> {
387
+ reauthenticateAndRetryAsync (sendAndReceiveAsyncInternal , c );
388
+ }).finish (callback );
389
+ }
390
+
391
+ private <T > T reauthenticateAndRetry (final Supplier <T > operation ) {
392
+ authenticated .set (false );
393
+ assertNotNull (authenticator ).reauthenticate (this );
394
+ authenticated .set (true );
395
+ return operation .get ();
396
+ }
397
+
398
+ private <T > void reauthenticateAndRetryAsync (final AsyncSupplier <T > operation ,
399
+ final SingleResultCallback <T > callback ) {
400
+ beginAsync ().thenRun (c -> {
401
+ authenticated .set (false );
402
+ assertNotNull (authenticator ).reauthenticateAsync (this , c );
403
+ }).<T >thenSupply ((c ) -> {
404
+ authenticated .set (true );
405
+ operation .getAsync (c );
406
+ }).finish (callback );
407
+ }
408
+
409
+ public boolean reauthenticationIsTriggered (@ Nullable final Throwable t ) {
410
+ if (!shouldAuthenticate (authenticator , this .description )) {
411
+ return false ;
412
+ }
378
413
if (t instanceof MongoCommandException ) {
379
414
MongoCommandException e = (MongoCommandException ) t ;
380
415
return e .getErrorCode () == 391 ;
@@ -501,11 +536,8 @@ private <T> T receiveCommandMessageResponse(final Decoder<T> decoder,
501
536
}
502
537
}
503
538
504
- @ Override
505
- public <T > void sendAndReceiveAsync (final CommandMessage message , final Decoder <T > decoder , final SessionContext sessionContext ,
539
+ private <T > void sendAndReceiveAsyncInternal (final CommandMessage message , final Decoder <T > decoder , final SessionContext sessionContext ,
506
540
final RequestContext requestContext , final OperationContext operationContext , final SingleResultCallback <T > callback ) {
507
- notNull ("stream is open" , stream , callback );
508
-
509
541
if (isClosed ()) {
510
542
callback .onResult (null , new MongoSocketClosedException ("Can not read from a closed socket" , getServerAddress ()));
511
543
return ;
@@ -616,7 +648,7 @@ public void sendMessage(final List<ByteBuf> byteBuffers, final int lastRequestId
616
648
617
649
@ Override
618
650
public ResponseBuffers receiveMessage (final int responseTo ) {
619
- notNull ( "stream is open" , stream );
651
+ assertNotNull ( stream );
620
652
if (isClosed ()) {
621
653
throw new MongoSocketClosedException ("Cannot read from a closed stream" , getServerAddress ());
622
654
}
@@ -634,8 +666,9 @@ private ResponseBuffers receiveMessageWithAdditionalTimeout(final int additional
634
666
}
635
667
636
668
@ Override
637
- public void sendMessageAsync (final List <ByteBuf > byteBuffers , final int lastRequestId , final SingleResultCallback <Void > callback ) {
638
- notNull ("stream is open" , stream , callback );
669
+ public void sendMessageAsync (final List <ByteBuf > byteBuffers , final int lastRequestId ,
670
+ final SingleResultCallback <Void > callback ) {
671
+ assertNotNull (stream );
639
672
640
673
if (isClosed ()) {
641
674
callback .onResult (null , new MongoSocketClosedException ("Can not read from a closed socket" , getServerAddress ()));
@@ -667,7 +700,7 @@ public void failed(final Throwable t) {
667
700
668
701
@ Override
669
702
public void receiveMessageAsync (final int responseTo , final SingleResultCallback <ResponseBuffers > callback ) {
670
- isTrue ( " stream is open" , stream != null , callback );
703
+ assertNotNull ( stream );
671
704
672
705
if (isClosed ()) {
673
706
callback .onResult (null , new MongoSocketClosedException ("Can not read from a closed socket" , getServerAddress ()));
0 commit comments