Skip to content

Commit 9b8ac4a

Browse files
committed
Add derivation test, fix bugs and excess nested code
1 parent 34bfddb commit 9b8ac4a

File tree

3 files changed

+115
-16
lines changed

3 files changed

+115
-16
lines changed

driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnection.java

+12-14
Original file line numberDiff line numberDiff line change
@@ -229,20 +229,18 @@ public void open() {
229229

230230
@Override
231231
public void openAsync(final SingleResultCallback<Void> callback) {
232-
beginAsync().thenRun(
233-
beginAsync().thenRun(c -> {
234-
isTrue("Open already called", stream == null, callback);
235-
stream = streamFactory.create(getServerAddressWithResolver());
236-
stream.openAsync(c);
237-
}).<InternalConnectionInitializationDescription>thenSupply(c -> {
238-
connectionInitializer.startHandshakeAsync(this, c);
239-
}).<InternalConnectionInitializationDescription>thenApply((initializationDescription, c) -> {
240-
initAfterHandshakeStart(assertNotNull(initializationDescription));
241-
connectionInitializer.finishHandshakeAsync(this, initializationDescription, c);
242-
}).thenConsume((initializationDescription, c) -> {
243-
initAfterHandshakeFinish(assertNotNull(initializationDescription));
244-
})
245-
).onErrorIf(t -> true, (t, c) -> {
232+
beginAsync().thenRun(c -> {
233+
isTrue("Open already called", stream == null, callback);
234+
stream = streamFactory.create(getServerAddressWithResolver());
235+
stream.openAsync(c);
236+
}).<InternalConnectionInitializationDescription>thenSupply(c -> {
237+
connectionInitializer.startHandshakeAsync(this, c);
238+
}).<InternalConnectionInitializationDescription>thenApply((initializationDescription, c) -> {
239+
initAfterHandshakeStart(initializationDescription);
240+
connectionInitializer.finishHandshakeAsync(this, initializationDescription, c);
241+
}).thenConsume((initializationDescription, c) -> {
242+
initAfterHandshakeFinish(initializationDescription);
243+
}).onErrorIf(t -> true, (t, c) -> {
246244
close();
247245
if (t instanceof MongoException) {
248246
throw (MongoException) t;

driver-core/src/main/com/mongodb/internal/connection/InternalStreamConnectionInitializer.java

+1-1
Original file line numberDiff line numberDiff line change
@@ -112,7 +112,7 @@ public void startHandshakeAsync(final InternalConnection internalConnection,
112112
throw mapHelloException((MongoException) assertNotNull(t));
113113
}).thenApply((helloResult, c2) -> {
114114
setSpeculativeAuthenticateResponse(assertNotNull(helloResult));
115-
callback.complete(createInitializationDescription(helloResult, internalConnection, startTime));
115+
c2.complete(createInitializationDescription(helloResult, internalConnection, startTime));
116116
});
117117
}).finish(callback);
118118
}

driver-core/src/test/unit/com/mongodb/internal/async/AsyncFunctionsTest.java

+102-1
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.List;
2323
import java.util.concurrent.atomic.AtomicBoolean;
2424
import java.util.concurrent.atomic.AtomicReference;
25+
import java.util.function.BiConsumer;
2526
import java.util.function.Consumer;
2627
import java.util.function.Supplier;
2728

@@ -115,6 +116,15 @@ below, SHOULD be used (for consistency, and ease of comparison/review).
115116
async(1, c);
116117
}).finish(callback);
117118
});
119+
/*
120+
Code review checklist for async code:
121+
122+
1. Is everything inside the boilerplate?
123+
2. Is "callback" supplied to "finish"?
124+
3. In each block and nested block, is that same block's "c" always passed/completed at the end of execution?
125+
4. Is any c.complete followed by a return, to end execution?
126+
5. Do any sync methods still need to be converted to async?
127+
*/
118128
}
119129

120130
@Test
@@ -610,10 +620,11 @@ void testTryCatchWithVariables() {
610620

611621
@Test
612622
void testTryCatchWithConditionInCatch() {
613-
assertBehavesSameVariations(8,
623+
assertBehavesSameVariations(12,
614624
() -> {
615625
try {
616626
sync(plainTest(0) ? 1 : 2);
627+
sync(3);
617628
} catch (Throwable t) {
618629
sync(5);
619630
if (t.getMessage().equals("exception-1")) {
@@ -626,6 +637,8 @@ void testTryCatchWithConditionInCatch() {
626637
(callback) -> {
627638
beginAsync().thenRun(c -> {
628639
async(plainTest(0) ? 1 : 2, c);
640+
}).thenRun(c -> {
641+
async(3, c);
629642
}).onErrorIf(t -> true, (t, c) -> {
630643
beginAsync().thenRun(c2 -> {
631644
async(5, c2);
@@ -793,6 +806,94 @@ void testInvalid() {
793806
});
794807
}
795808

809+
@Test
810+
void testDerivation() {
811+
// Demonstrates the progression from nested async to the API.
812+
813+
// Stand-ins for sync-async methods; these "happily" do not throw
814+
// exceptions, to avoid complicating this demo async code.
815+
Consumer<Integer> happySync = (i) -> {
816+
invocationTracker.getNextOption(1);
817+
listener.add("affected-success-" + i);
818+
};
819+
BiConsumer<Integer, SingleResultCallback<Void>> happyAsync = (i, c) -> {
820+
happySync.accept(i);
821+
c.complete();
822+
};
823+
824+
// Standard nested async, no error handling:
825+
assertBehavesSameVariations(1,
826+
() -> {
827+
happySync.accept(1);
828+
happySync.accept(2);
829+
},
830+
(callback) -> {
831+
happyAsync.accept(1, (v, e) -> {
832+
happyAsync.accept(2, callback);
833+
});
834+
});
835+
836+
// When both methods are naively extracted, they are out of order:
837+
assertBehavesSameVariations(1,
838+
() -> {
839+
happySync.accept(1);
840+
happySync.accept(2);
841+
},
842+
(callback) -> {
843+
SingleResultCallback<Void> second = (v, e) -> {
844+
happyAsync.accept(2, callback);
845+
};
846+
SingleResultCallback<Void> first = (v, e) -> {
847+
happyAsync.accept(1, second);
848+
};
849+
first.onResult(null, null);
850+
});
851+
852+
// We create an "AsyncRunnable" that takes a callback, which
853+
// decouples any async methods from each other, allowing them
854+
// to be declared in a sync-like order, and without nesting:
855+
assertBehavesSameVariations(1,
856+
() -> {
857+
happySync.accept(1);
858+
happySync.accept(2);
859+
},
860+
(callback) -> {
861+
AsyncRunnable first = (SingleResultCallback<Void> c) -> {
862+
happyAsync.accept(1, c);
863+
};
864+
AsyncRunnable second = (SingleResultCallback<Void> c) -> {
865+
happyAsync.accept(2, c);
866+
};
867+
// This is a simplified variant of the "then" methods;
868+
// it has no error handling. It takes methods A and B,
869+
// and returns C, which is B(A()).
870+
AsyncRunnable combined = (c) -> {
871+
first.unsafeFinish((r, e) -> {
872+
second.unsafeFinish(c);
873+
});
874+
};
875+
combined.unsafeFinish(callback);
876+
});
877+
878+
// This combining method is added as a default method on AsyncRunnable,
879+
// and a "finish" method wraps the resulting methods. This also adds
880+
// exception handling and monadic short-circuiting of ensuing methods
881+
// when an exception arises (comparable to how thrown exceptions "skip"
882+
// ensuing code).
883+
assertBehavesSameVariations(3,
884+
() -> {
885+
sync(1);
886+
sync(2);
887+
},
888+
(callback) -> {
889+
beginAsync().thenRun(c -> {
890+
async(1, c);
891+
}).thenRun(c -> {
892+
async(2, c);
893+
}).finish(callback);
894+
});
895+
}
896+
796897
// invoked methods:
797898

798899
private void plain(final int i) {

0 commit comments

Comments
 (0)