Skip to content

Commit 28a28f7

Browse files
jyeminvbabanin
andauthored
Add BatchCursor interceptor in reactive tests (#1390)
JAVA-5356 --------- Co-authored-by: slav.babanin <[email protected]>
1 parent 5c37b88 commit 28a28f7

File tree

3 files changed

+113
-4
lines changed

3 files changed

+113
-4
lines changed

driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncMongoClient.java

+23
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,7 @@
2424
import com.mongodb.client.MongoDatabase;
2525
import com.mongodb.client.MongoIterable;
2626
import com.mongodb.connection.ClusterDescription;
27+
import com.mongodb.reactivestreams.client.internal.BatchCursor;
2728
import org.bson.BsonDocument;
2829
import org.bson.Document;
2930
import org.bson.conversions.Bson;
@@ -41,6 +42,7 @@ public class SyncMongoClient implements MongoClient {
4142

4243
private static long sleepAfterCursorCloseMS;
4344
private static long sleepAfterSessionCloseMS;
45+
private static boolean waitForBatchCursorCreation;
4446

4547
/**
4648
* Unfortunately this is the only way to wait for a query to be initiated, since Reactive Streams is asynchronous
@@ -88,6 +90,27 @@ public static void enableSleepAfterSessionClose(final long sleepMS) {
8890
sleepAfterSessionCloseMS = sleepMS;
8991
}
9092

93+
/**
94+
* Enables behavior for waiting until a reactive {@link BatchCursor} is created.
95+
* <p>
96+
* When enabled, {@link SyncMongoCursor} allows intercepting the result of the cursor creation process.
97+
* If the creation fails, the resulting exception will be propagated; if successful, the
98+
* process will proceed to issue getMore commands.
99+
* <p>
100+
* NOTE: Do not enable when multiple cursors are being iterated concurrently.
101+
*/
102+
public static void enableWaitForBatchCursorCreation() {
103+
waitForBatchCursorCreation = true;
104+
}
105+
106+
public static boolean isWaitForBatchCursorCreationEnabled() {
107+
return waitForBatchCursorCreation;
108+
}
109+
110+
public static void disableWaitForBatchCursorCreation() {
111+
waitForBatchCursorCreation = false;
112+
}
113+
91114
public static void disableSleep() {
92115
sleepAfterCursorOpenMS = 0;
93116
sleepAfterCursorCloseMS = 0;

driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/syncadapter/SyncMongoCursor.java

+74
Original file line numberDiff line numberDiff line change
@@ -21,26 +21,36 @@
2121
import com.mongodb.ServerCursor;
2222
import com.mongodb.client.MongoCursor;
2323
import com.mongodb.lang.Nullable;
24+
import com.mongodb.reactivestreams.client.internal.BatchCursor;
2425
import org.reactivestreams.Publisher;
2526
import org.reactivestreams.Subscriber;
2627
import org.reactivestreams.Subscription;
28+
import reactor.core.CoreSubscriber;
2729
import reactor.core.publisher.Flux;
30+
import reactor.core.publisher.Hooks;
31+
import reactor.core.publisher.Operators;
32+
import reactor.util.context.Context;
2833

2934
import java.util.NoSuchElementException;
3035
import java.util.concurrent.BlockingDeque;
36+
import java.util.concurrent.CompletableFuture;
3137
import java.util.concurrent.CountDownLatch;
38+
import java.util.concurrent.ExecutionException;
3239
import java.util.concurrent.LinkedBlockingDeque;
3340
import java.util.concurrent.TimeUnit;
41+
import java.util.concurrent.TimeoutException;
3442

3543
import static com.mongodb.ClusterFixture.TIMEOUT;
3644
import static com.mongodb.internal.thread.InterruptionUtil.interruptAndCreateMongoInterruptedException;
3745
import static com.mongodb.reactivestreams.client.syncadapter.ContextHelper.CONTEXT;
3846
import static com.mongodb.reactivestreams.client.syncadapter.SyncMongoClient.getSleepAfterCursorClose;
3947
import static com.mongodb.reactivestreams.client.syncadapter.SyncMongoClient.getSleepAfterCursorOpen;
48+
import static com.mongodb.reactivestreams.client.syncadapter.SyncMongoClient.isWaitForBatchCursorCreationEnabled;
4049

4150
class SyncMongoCursor<T> implements MongoCursor<T> {
4251
private static final Object COMPLETED = new Object();
4352
private final BlockingDeque<Object> results = new LinkedBlockingDeque<>();
53+
private final CompletableFuture<Object> batchCursorCompletableFuture = new CompletableFuture<>();
4454
private final Integer batchSize;
4555
private int countToBatchSize;
4656
private Subscription subscription;
@@ -51,6 +61,15 @@ class SyncMongoCursor<T> implements MongoCursor<T> {
5161
SyncMongoCursor(final Publisher<T> publisher, @Nullable final Integer batchSize) {
5262
this.batchSize = batchSize;
5363
CountDownLatch latch = new CountDownLatch(1);
64+
65+
if (isWaitForBatchCursorCreationEnabled()) {
66+
// This hook allows us to intercept the `onNext` and `onError` signals for any operation to determine
67+
// whether the {@link BatchCursor} was created successfully or if an error occurred during its creation process.
68+
// The result is propagated to a {@link CompletableFuture}, which we use to block until it is completed.
69+
Hooks.onEachOperator(Operators.lift((sc, sub) ->
70+
new BatchCursorInterceptSubscriber(sub, batchCursorCompletableFuture)));
71+
}
72+
5473
//noinspection ReactiveStreamsSubscriberImplementation
5574
Flux.from(publisher).contextWrite(CONTEXT).subscribe(new Subscriber<T>() {
5675
@Override
@@ -83,9 +102,19 @@ public void onComplete() {
83102
if (!latch.await(TIMEOUT, TimeUnit.SECONDS)) {
84103
throw new MongoTimeoutException("Timeout waiting for subscription");
85104
}
105+
if (isWaitForBatchCursorCreationEnabled()) {
106+
batchCursorCompletableFuture.get(TIMEOUT, TimeUnit.SECONDS);
107+
Hooks.resetOnEachOperator();
108+
}
86109
sleep(getSleepAfterCursorOpen());
87110
} catch (InterruptedException e) {
88111
throw interruptAndCreateMongoInterruptedException("Interrupted waiting for asynchronous cursor establishment", e);
112+
} catch (ExecutionException | TimeoutException e) {
113+
Throwable cause = e.getCause();
114+
if (cause instanceof RuntimeException) {
115+
throw (RuntimeException) cause;
116+
}
117+
throw new RuntimeException(e);
89118
}
90119
}
91120

@@ -181,4 +210,49 @@ private RuntimeException translateError(final Throwable throwable) {
181210
}
182211
return new RuntimeException(throwable);
183212
}
213+
214+
215+
private static final class BatchCursorInterceptSubscriber implements CoreSubscriber<Object> {
216+
217+
private final CoreSubscriber<Object> sub;
218+
private final CompletableFuture<Object> batchCursorCompletableFuture;
219+
220+
BatchCursorInterceptSubscriber(final CoreSubscriber<Object> sub,
221+
final CompletableFuture<Object> batchCursorCompletableFuture) {
222+
this.sub = sub;
223+
this.batchCursorCompletableFuture = batchCursorCompletableFuture;
224+
}
225+
226+
@Override
227+
public Context currentContext() {
228+
return sub.currentContext();
229+
}
230+
231+
@Override
232+
public void onSubscribe(final Subscription s) {
233+
sub.onSubscribe(s);
234+
}
235+
236+
@Override
237+
public void onNext(final Object o) {
238+
if (o instanceof BatchCursor) {
239+
// Interception of a cursor means that it has been created at this point.
240+
batchCursorCompletableFuture.complete(o);
241+
}
242+
sub.onNext(o);
243+
}
244+
245+
@Override
246+
public void onError(final Throwable t) {
247+
if (!batchCursorCompletableFuture.isDone()) { // Cursor has not been created yet but an error occurred.
248+
batchCursorCompletableFuture.completeExceptionally(t);
249+
}
250+
sub.onError(t);
251+
}
252+
253+
@Override
254+
public void onComplete() {
255+
sub.onComplete();
256+
}
257+
}
184258
}

driver-reactive-streams/src/test/functional/com/mongodb/reactivestreams/client/unified/ChangeStreamsTest.java

+16-4
Original file line numberDiff line numberDiff line change
@@ -29,17 +29,16 @@
2929
import java.util.List;
3030

3131
import static com.mongodb.reactivestreams.client.syncadapter.SyncMongoClient.disableSleep;
32+
import static com.mongodb.reactivestreams.client.syncadapter.SyncMongoClient.disableWaitForBatchCursorCreation;
3233
import static com.mongodb.reactivestreams.client.syncadapter.SyncMongoClient.enableSleepAfterCursorOpen;
34+
import static com.mongodb.reactivestreams.client.syncadapter.SyncMongoClient.enableWaitForBatchCursorCreation;
3335
import static org.junit.Assume.assumeFalse;
3436

3537
public final class ChangeStreamsTest extends UnifiedReactiveStreamsTest {
3638

3739
private static final List<String> ERROR_REQUIRED_FROM_CHANGE_STREAM_INITIALIZATION_TESTS =
3840
Arrays.asList(
39-
"Test with document comment - pre 4.4",
40-
"Change Stream should error when an invalid aggregation stage is passed in",
41-
"The watch helper must not throw a custom exception when executed against a single server topology, "
42-
+ "but instead depend on a server error"
41+
"Test with document comment - pre 4.4"
4342
);
4443

4544
private static final List<String> EVENT_SENSITIVE_TESTS =
@@ -48,6 +47,14 @@ public final class ChangeStreamsTest extends UnifiedReactiveStreamsTest {
4847
"Test that comment is not set on getMore - pre 4.4"
4948
);
5049

50+
private static final List<String> REQUIRES_BATCH_CURSOR_CREATION_WAITING =
51+
Arrays.asList(
52+
"Change Stream should error when an invalid aggregation stage is passed in",
53+
"The watch helper must not throw a custom exception when executed against a single server topology, "
54+
+ "but instead depend on a server error"
55+
);
56+
57+
5158
public ChangeStreamsTest(@SuppressWarnings("unused") final String fileDescription,
5259
@SuppressWarnings("unused") final String testDescription,
5360
final String schemaVersion, @Nullable final BsonArray runOnRequirements, final BsonArray entities,
@@ -58,12 +65,17 @@ public ChangeStreamsTest(@SuppressWarnings("unused") final String fileDescriptio
5865
assumeFalse(EVENT_SENSITIVE_TESTS.contains(testDescription));
5966

6067
enableSleepAfterCursorOpen(256);
68+
69+
if (REQUIRES_BATCH_CURSOR_CREATION_WAITING.contains(testDescription)) {
70+
enableWaitForBatchCursorCreation();
71+
}
6172
}
6273

6374
@After
6475
public void cleanUp() {
6576
super.cleanUp();
6677
disableSleep();
78+
disableWaitForBatchCursorCreation();
6779
}
6880

6981
@Parameterized.Parameters(name = "{0}: {1}")

0 commit comments

Comments
 (0)