Skip to content

Commit b13a45e

Browse files
authored
2.x: Fix toFlowable(ERROR) not cancelling upon MBE (#7084)
1 parent 2cb20bd commit b13a45e

File tree

2 files changed

+22
-0
lines changed

2 files changed

+22
-0
lines changed

src/main/java/io/reactivex/internal/operators/flowable/FlowableOnBackpressureError.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ public void onNext(T t) {
6565
downstream.onNext(t);
6666
BackpressureHelper.produced(this, 1);
6767
} else {
68+
upstream.cancel();
6869
onError(new MissingBackpressureException("could not emit value due to lack of requests"));
6970
}
7071
}

src/test/java/io/reactivex/internal/operators/flowable/FlowableOnBackpressureErrorTest.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,11 +13,16 @@
1313

1414
package io.reactivex.internal.operators.flowable;
1515

16+
import static org.junit.Assert.*;
17+
1618
import org.junit.Test;
1719
import org.reactivestreams.Publisher;
1820

1921
import io.reactivex.*;
22+
import io.reactivex.exceptions.MissingBackpressureException;
2023
import io.reactivex.functions.Function;
24+
import io.reactivex.subjects.PublishSubject;
25+
import io.reactivex.subscribers.TestSubscriber;
2126

2227
public class FlowableOnBackpressureErrorTest {
2328

@@ -50,4 +55,20 @@ public Object apply(Flowable<Integer> f) throws Exception {
5055
}
5156
}, false, 1, 1, 1);
5257
}
58+
59+
@Test
60+
public void overflowCancels() {
61+
PublishSubject<Integer> ps = PublishSubject.create();
62+
63+
TestSubscriber<Integer> ts = ps.toFlowable(BackpressureStrategy.ERROR)
64+
.test(0L);
65+
66+
assertTrue(ps.hasObservers());
67+
68+
ps.onNext(1);
69+
70+
assertFalse(ps.hasObservers());
71+
72+
ts.assertFailure(MissingBackpressureException.class);
73+
}
5374
}

0 commit comments

Comments
 (0)