Skip to content

Commit 8ffbef4

Browse files
committed
Simpler fix at the flux level only
1 parent 6637be8 commit 8ffbef4

File tree

2 files changed

+12
-14
lines changed

2 files changed

+12
-14
lines changed

driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/BatchCursor.java

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -21,29 +21,21 @@
2121

2222
import java.util.List;
2323

24+
import static com.mongodb.reactivestreams.client.internal.MongoOperationPublisher.sinkToCallback;
25+
2426
/**
2527
* <p>This class is not part of the public API and may be removed or changed at any time</p>
2628
*/
2729
public class BatchCursor<T> implements AutoCloseable {
2830

2931
private final AsyncBatchCursor<T> wrapped;
30-
private volatile boolean cursorClosed = false;
3132

3233
public BatchCursor(final AsyncBatchCursor<T> wrapped) {
3334
this.wrapped = wrapped;
3435
}
3536

3637
public Publisher<List<T>> next() {
37-
return Mono.create(sink -> wrapped.next(
38-
(result, t) -> {
39-
if (t != null && !cursorClosed) {
40-
sink.error(t);
41-
} else if (result == null) {
42-
sink.success();
43-
} else {
44-
sink.success(result);
45-
}
46-
}));
38+
return Mono.create(sink -> wrapped.next(sinkToCallback(sink)));
4739
}
4840

4941
public void setBatchSize(final int batchSize) {
@@ -59,7 +51,6 @@ public boolean isClosed() {
5951
}
6052

6153
public void close() {
62-
cursorClosed = true;
6354
wrapped.close();
6455
}
6556

driver-reactive-streams/src/main/com/mongodb/reactivestreams/client/internal/BatchCursorFlux.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@ class BatchCursorFlux<T> implements Publisher<T> {
3232
private final BatchCursorPublisher<T> batchCursorPublisher;
3333
private final AtomicBoolean inProgress = new AtomicBoolean(false);
3434
private final AtomicLong demandDelta = new AtomicLong(0);
35+
private final AtomicBoolean fluxCancelled = new AtomicBoolean(false);
3536
private volatile BatchCursor<T> batchCursor;
3637
private FluxSink<T> sink;
3738

@@ -72,6 +73,11 @@ public void subscribe(final Subscriber<? super T> subscriber) {
7273
.subscribe(subscriber);
7374
}
7475

76+
private void cancelled() {
77+
fluxCancelled.set(true);
78+
closeCursor();
79+
}
80+
7581
private void closeCursor() {
7682
if (batchCursor != null) {
7783
batchCursor.close();
@@ -85,12 +91,13 @@ private void recurseCursor(){
8591
} else {
8692
batchCursor.setBatchSize(calculateBatchSize(sink.requestedFromDownstream()));
8793
Mono.from(batchCursor.next())
88-
.doOnCancel(this::closeCursor)
8994
.doOnError((e) -> {
9095
try {
9196
closeCursor();
9297
} finally {
93-
sink.error(e);
98+
if (!fluxCancelled.get()) {
99+
sink.error(e);
100+
}
94101
}
95102
})
96103
.doOnSuccess(results -> {

0 commit comments

Comments
 (0)