diff --git a/src/main/java/io/reactivex/rxjava3/core/FlowableSubscriber.java b/src/main/java/io/reactivex/rxjava3/core/FlowableSubscriber.java index 38b5bf3459..e07662b16b 100644 --- a/src/main/java/io/reactivex/rxjava3/core/FlowableSubscriber.java +++ b/src/main/java/io/reactivex/rxjava3/core/FlowableSubscriber.java @@ -18,8 +18,10 @@ import io.reactivex.rxjava3.annotations.NonNull; /** - * Represents a Reactive-Streams inspired {@link Subscriber} that is RxJava 2 only - * and weakens rules §1.3 and §3.9 of the specification for gaining performance. + * Represents a Reactive-Streams inspired {@link Subscriber} that is RxJava 3 only + * and weakens the Reactive Streams rules §1.3 + * and §3.9 of the specification + * for gaining performance. * *

History: 2.0.7 - experimental; 2.1 - beta * @param the value type diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableUsing.java b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableUsing.java index 5a5b177d35..49fe0b8fb4 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableUsing.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/flowable/FlowableUsing.java @@ -117,7 +117,6 @@ public void onError(Throwable t) { } } - upstream.cancel(); if (innerError != null) { downstream.onError(new CompositeException(t, innerError)); } else { @@ -125,7 +124,6 @@ public void onError(Throwable t) { } } else { downstream.onError(t); - upstream.cancel(); disposeResource(); } } @@ -143,11 +141,9 @@ public void onComplete() { } } - upstream.cancel(); downstream.onComplete(); } else { downstream.onComplete(); - upstream.cancel(); disposeResource(); } } diff --git a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableUsing.java b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableUsing.java index dec3da5ebe..90954d41d8 100644 --- a/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableUsing.java +++ b/src/main/java/io/reactivex/rxjava3/internal/operators/observable/ObservableUsing.java @@ -115,11 +115,9 @@ public void onError(Throwable t) { } } - upstream.dispose(); downstream.onError(t); } else { downstream.onError(t); - upstream.dispose(); disposeResource(); } } @@ -137,11 +135,9 @@ public void onComplete() { } } - upstream.dispose(); downstream.onComplete(); } else { downstream.onComplete(); - upstream.dispose(); disposeResource(); } }