Skip to content

Commit 2edba23

Browse files
authored
3.x: Standardize MissingBackpressureException message, introduce QueueOverflowException (#7459)
* 3.x: Standardize MissingBackpressureException messages * Use the correct message. * Fix tests * Add QueueOverflowException, fix tests * Update CompletableConcatTest.java
1 parent ef51a90 commit 2edba23

File tree

61 files changed

+164
-94
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

61 files changed

+164
-94
lines changed

src/main/java/io/reactivex/rxjava3/exceptions/MissingBackpressureException.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,15 @@ public final class MissingBackpressureException extends RuntimeException {
2020

2121
private static final long serialVersionUID = 8517344746016032542L;
2222

23+
/**
24+
* The default error message.
25+
* <p>
26+
* This can happen if the downstream doesn't call {@link org.reactivestreams.Subscription#request(long)}
27+
* in time or at all.
28+
* @since 3.1.6
29+
*/
30+
public static final String DEFAULT_MESSAGE = "Could not emit value due to lack of requests";
31+
2332
/**
2433
* Constructs a MissingBackpressureException without message or cause.
2534
*/
@@ -35,4 +44,13 @@ public MissingBackpressureException(String message) {
3544
super(message);
3645
}
3746

47+
/**
48+
* Constructs a new {@code MissingBackpressureException} with the
49+
* default message {@value #DEFAULT_MESSAGE}.
50+
* @return the new {@code MissingBackpressureException} instance.
51+
* @since 3.1.6
52+
*/
53+
public static MissingBackpressureException createDefault() {
54+
return new MissingBackpressureException(DEFAULT_MESSAGE);
55+
}
3856
}
Lines changed: 49 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,49 @@
1+
/*
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.rxjava3.exceptions;
15+
16+
/**
17+
* Indicates an overflow happened because the upstream disregarded backpressure completely or
18+
* {@link org.reactivestreams.Subscriber#onNext(Object)} was called concurrently from multiple threads
19+
* without synchronization. Rarely, it is an indication of bugs inside an operator.
20+
* @since 3.1.6
21+
*/
22+
public final class QueueOverflowException extends RuntimeException {
23+
24+
private static final long serialVersionUID = 8517344746016032542L;
25+
26+
/**
27+
* The message for queue overflows.
28+
* <p>
29+
* This can happen if the upstream disregards backpressure completely or calls
30+
* {@link org.reactivestreams.Subscriber#onNext(Object)} concurrently from multiple threads
31+
* without synchronization. Rarely, it is an indication of bugs inside an operator.
32+
*/
33+
private static final String DEFAULT_MESSAGE = "Queue overflow due to illegal concurrent onNext calls or a bug in an operator";
34+
35+
/**
36+
* Constructs a QueueOverflowException with the default message.
37+
*/
38+
public QueueOverflowException() {
39+
this(DEFAULT_MESSAGE);
40+
}
41+
42+
/**
43+
* Constructs a QueueOverflowException with the given message but no cause.
44+
* @param message the error message
45+
*/
46+
public QueueOverflowException(String message) {
47+
super(message);
48+
}
49+
}

src/main/java/io/reactivex/rxjava3/internal/jdk8/FlowableFlatMapStream.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -174,7 +174,7 @@ public void onNext(T t) {
174174
if (sourceMode != QueueFuseable.ASYNC) {
175175
if (!queue.offer(t)) {
176176
upstream.cancel();
177-
onError(new MissingBackpressureException("Queue full?!"));
177+
onError(new QueueOverflowException());
178178
return;
179179
}
180180
}

src/main/java/io/reactivex/rxjava3/internal/operators/completable/CompletableConcat.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ public void onSubscribe(Subscription s) {
120120
public void onNext(CompletableSource t) {
121121
if (sourceFused == QueueSubscription.NONE) {
122122
if (!queue.offer(t)) {
123-
onError(new MissingBackpressureException());
123+
onError(new QueueOverflowException());
124124
return;
125125
}
126126
}

src/main/java/io/reactivex/rxjava3/internal/operators/flowable/BlockingFlowableIterable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ public void onNext(T t) {
140140
if (!queue.offer(t)) {
141141
SubscriptionHelper.cancel(this);
142142

143-
onError(new MissingBackpressureException("Queue full?!"));
143+
onError(new QueueOverflowException());
144144
} else {
145145
signalConsumer();
146146
}

src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableConcatMap.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import org.reactivestreams.*;
2020

2121
import io.reactivex.rxjava3.core.*;
22-
import io.reactivex.rxjava3.exceptions.Exceptions;
22+
import io.reactivex.rxjava3.exceptions.*;
2323
import io.reactivex.rxjava3.functions.*;
2424
import io.reactivex.rxjava3.internal.subscriptions.*;
2525
import io.reactivex.rxjava3.internal.util.*;
@@ -152,7 +152,7 @@ public final void onNext(T t) {
152152
if (sourceMode != QueueSubscription.ASYNC) {
153153
if (!queue.offer(t)) {
154154
upstream.cancel();
155-
onError(new IllegalStateException("Queue full?!"));
155+
onError(new QueueOverflowException());
156156
return;
157157
}
158158
}

src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableConcatMapEager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -200,7 +200,7 @@ public void innerNext(InnerQueuedSubscriber<R> inner, R value) {
200200
drain();
201201
} else {
202202
inner.cancel();
203-
innerError(inner, new MissingBackpressureException());
203+
innerError(inner, MissingBackpressureException.createDefault());
204204
}
205205
}
206206

src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableConcatMapScheduler.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import org.reactivestreams.*;
2020

2121
import io.reactivex.rxjava3.core.*;
22-
import io.reactivex.rxjava3.exceptions.Exceptions;
22+
import io.reactivex.rxjava3.exceptions.*;
2323
import io.reactivex.rxjava3.functions.*;
2424
import io.reactivex.rxjava3.internal.operators.flowable.FlowableConcatMap.*;
2525
import io.reactivex.rxjava3.internal.subscriptions.SubscriptionHelper;
@@ -151,7 +151,7 @@ public final void onNext(T t) {
151151
if (sourceMode != QueueSubscription.ASYNC) {
152152
if (!queue.offer(t)) {
153153
upstream.cancel();
154-
onError(new IllegalStateException("Queue full?!"));
154+
onError(new QueueOverflowException());
155155
return;
156156
}
157157
}

src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableCreate.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -442,7 +442,7 @@ static final class ErrorAsyncEmitter<T> extends NoOverflowBaseAsyncEmitter<T> {
442442

443443
@Override
444444
void onOverflow() {
445-
onError(new MissingBackpressureException("create: could not emit value due to lack of requests"));
445+
onError(new MissingBackpressureException("create: " + MissingBackpressureException.DEFAULT_MESSAGE));
446446
}
447447

448448
}

src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDebounce.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ void emit(long idx, T value) {
148148
BackpressureHelper.produced(this, 1);
149149
} else {
150150
cancel();
151-
downstream.onError(new MissingBackpressureException("Could not deliver value due to lack of requests"));
151+
downstream.onError(MissingBackpressureException.createDefault());
152152
}
153153
}
154154
}

src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableDebounceTimed.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -159,7 +159,7 @@ void emit(long idx, T t, DebounceEmitter<T> emitter) {
159159
emitter.dispose();
160160
} else {
161161
cancel();
162-
downstream.onError(new MissingBackpressureException("Could not deliver value due to lack of requests"));
162+
downstream.onError(MissingBackpressureException.createDefault());
163163
}
164164
}
165165
}

src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFlatMap.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -243,7 +243,7 @@ void tryEmitScalar(U value) {
243243
q = getMainQueue();
244244
}
245245
if (!q.offer(value)) {
246-
onError(new MissingBackpressureException("Scalar queue full?!"));
246+
onError(new QueueOverflowException());
247247
}
248248
}
249249
if (decrementAndGet() == 0) {
@@ -252,7 +252,7 @@ void tryEmitScalar(U value) {
252252
} else {
253253
SimpleQueue<U> q = getMainQueue();
254254
if (!q.offer(value)) {
255-
onError(new MissingBackpressureException("Scalar queue full?!"));
255+
onError(new QueueOverflowException());
256256
return;
257257
}
258258
if (getAndIncrement() != 0) {
@@ -278,7 +278,7 @@ void tryEmit(U value, InnerSubscriber<T, U> inner) {
278278
inner.queue = q;
279279
}
280280
if (!q.offer(value)) {
281-
onError(new MissingBackpressureException("Inner queue full?!"));
281+
onError(new QueueOverflowException());
282282
}
283283
}
284284
if (decrementAndGet() == 0) {
@@ -291,7 +291,7 @@ void tryEmit(U value, InnerSubscriber<T, U> inner) {
291291
inner.queue = q;
292292
}
293293
if (!q.offer(value)) {
294-
onError(new MissingBackpressureException("Inner queue full?!"));
294+
onError(new QueueOverflowException());
295295
return;
296296
}
297297
if (getAndIncrement() != 0) {

src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableFlattenIterable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ public void onNext(T t) {
180180
return;
181181
}
182182
if (fusionMode == NONE && !queue.offer(t)) {
183-
onError(new MissingBackpressureException("Queue is full?!"));
183+
onError(new QueueOverflowException());
184184
return;
185185
}
186186
drain();

src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableGroupBy.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ public void onNext(T t) {
168168
if (emittedGroups != get()) {
169169
downstream.onNext(group);
170170
} else {
171-
MissingBackpressureException mbe = new MissingBackpressureException(groupHangWarning(emittedGroups));
171+
MissingBackpressureException mbe = groupHangWarning(emittedGroups);
172172
mbe.initCause(ex);
173173
onError(mbe);
174174
return;
@@ -194,13 +194,13 @@ public void onNext(T t) {
194194
}
195195
} else {
196196
upstream.cancel();
197-
onError(new MissingBackpressureException(groupHangWarning(emittedGroups)));
197+
onError(groupHangWarning(emittedGroups));
198198
}
199199
}
200200
}
201201

202-
static String groupHangWarning(long n) {
203-
return "Unable to emit a new group (#" + n + ") due to lack of requests. Please make sure the downstream can always accept a new group as well as each group is consumed in order for the whole operator to be able to proceed.";
202+
static MissingBackpressureException groupHangWarning(long n) {
203+
return new MissingBackpressureException("Unable to emit a new group (#" + n + ") due to lack of requests. Please make sure the downstream can always accept a new group as well as each group is consumed in order for the whole operator to be able to proceed.");
204204
}
205205

206206
@Override

src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableGroupJoin.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -276,7 +276,7 @@ void drain() {
276276
a.onNext(w);
277277
BackpressureHelper.produced(requested, 1);
278278
} else {
279-
fail(new MissingBackpressureException("Could not emit value due to lack of requests"), a, q);
279+
fail(MissingBackpressureException.createDefault(), a, q);
280280
return;
281281
}
282282

src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableInterval.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ public void run() {
9393
downstream.onNext(count++);
9494
BackpressureHelper.produced(this, 1);
9595
} else {
96-
downstream.onError(new MissingBackpressureException("Can't deliver value " + count + " due to lack of requests"));
96+
downstream.onError(new MissingBackpressureException("Could not emit value " + count + " due to lack of requests"));
9797
DisposableHelper.dispose(resource);
9898
}
9999
}

src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableIntervalRange.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ public void run() {
114114
decrementAndGet();
115115
}
116116
} else {
117-
downstream.onError(new MissingBackpressureException("Can't deliver value " + count + " due to lack of requests"));
117+
downstream.onError(new MissingBackpressureException("Could not emit value " + count + " due to lack of requests"));
118118
DisposableHelper.dispose(resource);
119119
}
120120
}

src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableJoin.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -260,7 +260,7 @@ void drain() {
260260

261261
e++;
262262
} else {
263-
ExceptionHelper.addThrowable(error, new MissingBackpressureException("Could not emit value due to lack of requests"));
263+
ExceptionHelper.addThrowable(error, MissingBackpressureException.createDefault());
264264
q.clear();
265265
cancelAll();
266266
errorAll(a);
@@ -321,7 +321,7 @@ else if (mode == RIGHT_VALUE) {
321321

322322
e++;
323323
} else {
324-
ExceptionHelper.addThrowable(error, new MissingBackpressureException("Could not emit value due to lack of requests"));
324+
ExceptionHelper.addThrowable(error, MissingBackpressureException.createDefault());
325325
q.clear();
326326
cancelAll();
327327
errorAll(a);

src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableObserveOn.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -113,7 +113,7 @@ public final void onNext(T t) {
113113
if (!queue.offer(t)) {
114114
upstream.cancel();
115115

116-
error = new MissingBackpressureException("Queue is full?!");
116+
error = new QueueOverflowException();
117117
done = true;
118118
}
119119
trySchedule();

src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableOnBackpressureBufferStrategy.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -140,7 +140,7 @@ public void onNext(T t) {
140140
}
141141
} else if (callError) {
142142
upstream.cancel();
143-
onError(new MissingBackpressureException());
143+
onError(MissingBackpressureException.createDefault());
144144
} else {
145145
drain();
146146
}

src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableOnBackpressureError.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ public void onNext(T t) {
6666
BackpressureHelper.produced(this, 1);
6767
} else {
6868
upstream.cancel();
69-
onError(new MissingBackpressureException("could not emit value due to lack of requests"));
69+
onError(MissingBackpressureException.createDefault());
7070
}
7171
}
7272

src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowablePublish.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -226,7 +226,7 @@ public void onSubscribe(Subscription s) {
226226
public void onNext(T t) {
227227
// we expect upstream to honor backpressure requests
228228
if (sourceMode == QueueSubscription.NONE && !queue.offer(t)) {
229-
onError(new MissingBackpressureException("Prefetch queue is full?!"));
229+
onError(new QueueOverflowException());
230230
return;
231231
}
232232
// since many things can happen concurrently, we have a common dispatch

src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowablePublishMulticast.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -212,7 +212,7 @@ public void onNext(T t) {
212212
}
213213
if (sourceMode == QueueSubscription.NONE && !queue.offer(t)) {
214214
upstream.get().cancel();
215-
onError(new MissingBackpressureException());
215+
onError(MissingBackpressureException.createDefault());
216216
return;
217217
}
218218
drain();

src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableSamplePublisher.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ void emit() {
129129
BackpressureHelper.produced(requested, 1);
130130
} else {
131131
cancel();
132-
downstream.onError(new MissingBackpressureException("Couldn't emit value due to lack of requests!"));
132+
downstream.onError(MissingBackpressureException.createDefault());
133133
}
134134
}
135135
}

src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableSampleTimed.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ void emit() {
125125
BackpressureHelper.produced(requested, 1);
126126
} else {
127127
cancel();
128-
downstream.onError(new MissingBackpressureException("Couldn't emit value due to lack of requests!"));
128+
downstream.onError(MissingBackpressureException.createDefault());
129129
}
130130
}
131131
}

src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableSequenceEqual.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -300,7 +300,7 @@ public void onSubscribe(Subscription s) {
300300
public void onNext(T t) {
301301
if (sourceMode == QueueSubscription.NONE) {
302302
if (!queue.offer(t)) {
303-
onError(new MissingBackpressureException());
303+
onError(MissingBackpressureException.createDefault());
304304
return;
305305
}
306306
}

src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableSwitchMap.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -381,7 +381,7 @@ public void onNext(R t) {
381381
SwitchMapSubscriber<T, R> p = parent;
382382
if (index == p.unique) {
383383
if (fusionMode == QueueSubscription.NONE && !queue.offer(t)) {
384-
onError(new MissingBackpressureException("Queue full?!"));
384+
onError(new QueueOverflowException());
385385
return;
386386
}
387387
p.drain();

src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableThrottleFirstTimed.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -96,7 +96,7 @@ public void onNext(T t) {
9696
} else {
9797
done = true;
9898
cancel();
99-
downstream.onError(new MissingBackpressureException("Could not deliver value due to lack of requests"));
99+
downstream.onError(MissingBackpressureException.createDefault());
100100
return;
101101
}
102102

src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableTimer.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ public void run() {
7878
downstream.onComplete();
7979
} else {
8080
lazySet(EmptyDisposable.INSTANCE);
81-
downstream.onError(new MissingBackpressureException("Can't deliver value due to lack of requests"));
81+
downstream.onError(MissingBackpressureException.createDefault());
8282
}
8383
}
8484
}

0 commit comments

Comments
 (0)