Skip to content

Commit acb638f

Browse files
committed
WriteResultPublisher propagates cancel upstream
Closes gh-26642
1 parent c7e8989 commit acb638f

File tree

4 files changed

+89
-32
lines changed

4 files changed

+89
-32
lines changed

spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteFlushProcessor.java

Lines changed: 29 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -60,6 +60,9 @@ public abstract class AbstractListenerWriteFlushProcessor<T> implements Processo
6060

6161
private volatile boolean sourceCompleted;
6262

63+
@Nullable
64+
private volatile AbstractListenerWriteProcessor<?> currentWriteProcessor;
65+
6366
private final WriteResultPublisher resultPublisher;
6467

6568
private final String logPrefix;
@@ -75,7 +78,21 @@ public AbstractListenerWriteFlushProcessor() {
7578
*/
7679
public AbstractListenerWriteFlushProcessor(String logPrefix) {
7780
this.logPrefix = logPrefix;
78-
this.resultPublisher = new WriteResultPublisher(logPrefix + "[WFP] ");
81+
this.resultPublisher = new WriteResultPublisher(logPrefix + "[WFP] ",
82+
() -> {
83+
cancel();
84+
// Complete immediately
85+
State oldState = this.state.getAndSet(State.COMPLETED);
86+
if (rsWriteFlushLogger.isTraceEnabled()) {
87+
rsWriteFlushLogger.trace(getLogPrefix() + oldState + " -> " + this.state);
88+
}
89+
// Propagate to current "write" Processor
90+
AbstractListenerWriteProcessor<?> writeProcessor = this.currentWriteProcessor;
91+
if (writeProcessor != null) {
92+
writeProcessor.cancelAndSetCompleted();
93+
}
94+
this.currentWriteProcessor = null;
95+
});
7996
}
8097

8198

@@ -139,8 +156,11 @@ protected final void onFlushPossible() {
139156
}
140157

141158
/**
142-
* Invoked during an error or completion callback from the underlying
143-
* container to cancel the upstream subscription.
159+
* Cancel the upstream chain of "write" Publishers only, for example due to
160+
* Servlet container error/completion notifications. This should usually
161+
* be followed up with a call to either {@link #onError(Throwable)} or
162+
* {@link #onComplete()} to notify the downstream chain, that is unless
163+
* cancellation came from downstream.
144164
*/
145165
protected void cancel() {
146166
if (rsWriteFlushLogger.isTraceEnabled()) {
@@ -268,9 +288,10 @@ public <T> void onNext(AbstractListenerWriteFlushProcessor<T> processor,
268288
Publisher<? extends T> currentPublisher) {
269289

270290
if (processor.changeState(this, RECEIVED)) {
271-
Processor<? super T, Void> currentProcessor = processor.createWriteProcessor();
272-
currentPublisher.subscribe(currentProcessor);
273-
currentProcessor.subscribe(new WriteResultSubscriber(processor));
291+
Processor<? super T, Void> writeProcessor = processor.createWriteProcessor();
292+
processor.currentWriteProcessor = (AbstractListenerWriteProcessor<?>) writeProcessor;
293+
currentPublisher.subscribe(writeProcessor);
294+
writeProcessor.subscribe(new WriteResultSubscriber(processor));
274295
}
275296
}
276297
@Override
@@ -429,6 +450,7 @@ public void onError(Throwable ex) {
429450
rsWriteFlushLogger.trace(
430451
this.processor.getLogPrefix() + "current \"write\" Publisher failed: " + ex);
431452
}
453+
this.processor.currentWriteProcessor = null;
432454
this.processor.cancel();
433455
this.processor.onError(ex);
434456
}
@@ -439,6 +461,7 @@ public void onComplete() {
439461
rsWriteFlushLogger.trace(
440462
this.processor.getLogPrefix() + "current \"write\" Publisher completed");
441463
}
464+
this.processor.currentWriteProcessor = null;
442465
this.processor.state.get().writeComplete(this.processor);
443466
}
444467

spring-web/src/main/java/org/springframework/http/server/reactive/AbstractListenerWriteProcessor.java

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -88,8 +88,10 @@ public AbstractListenerWriteProcessor() {
8888
* @since 5.1
8989
*/
9090
public AbstractListenerWriteProcessor(String logPrefix) {
91+
// AbstractListenerFlushProcessor calls cancelAndSetCompleted directly, so this cancel task
92+
// won't be used for HTTP responses, but it can be for a WebSocket session.
93+
this.resultPublisher = new WriteResultPublisher(logPrefix + "[WP] ", this::cancelAndSetCompleted);
9194
this.logPrefix = (StringUtils.hasText(logPrefix) ? logPrefix : "");
92-
this.resultPublisher = new WriteResultPublisher(logPrefix + "[WP] ");
9395
}
9496

9597

@@ -156,8 +158,11 @@ public final void onWritePossible() {
156158
}
157159

158160
/**
159-
* Invoked during an error or completion callback from the underlying
160-
* container to cancel the upstream subscription.
161+
* Cancel the upstream "write" Publisher only, for example due to
162+
* Servlet container error/completion notifications. This should usually
163+
* be followed up with a call to either {@link #onError(Throwable)} or
164+
* {@link #onComplete()} to notify the downstream chain, that is unless
165+
* cancellation came from downstream.
161166
*/
162167
public void cancel() {
163168
if (rsWriteLogger.isTraceEnabled()) {
@@ -168,13 +173,34 @@ public void cancel() {
168173
}
169174
}
170175

176+
/**
177+
* Cancel the "write" Publisher and transition to COMPLETED immediately also
178+
* without notifying the downstream. For use when cancellation came from
179+
* downstream.
180+
*/
181+
void cancelAndSetCompleted() {
182+
cancel();
183+
for (;;) {
184+
State prev = this.state.get();
185+
if (prev.equals(State.COMPLETED)) {
186+
break;
187+
}
188+
if (this.state.compareAndSet(prev, State.COMPLETED)) {
189+
if (rsWriteLogger.isTraceEnabled()) {
190+
rsWriteLogger.trace(getLogPrefix() + prev + " -> " + this.state);
191+
}
192+
if (!prev.equals(State.WRITING)) {
193+
discardCurrentData();
194+
}
195+
break;
196+
}
197+
}
198+
}
199+
171200
// Publisher implementation for result notifications...
172201

173202
@Override
174203
public final void subscribe(Subscriber<? super Void> subscriber) {
175-
// Technically, cancellation from the result subscriber should be propagated
176-
// to the upstream subscription. In practice, HttpHandler server adapters
177-
// don't have a reason to cancel the result subscription.
178204
this.resultPublisher.subscribe(subscriber);
179205
}
180206

spring-web/src/main/java/org/springframework/http/server/reactive/ServletServerHttpResponse.java

Lines changed: 20 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -244,33 +244,35 @@ public void onError(AsyncEvent event) {
244244
}
245245

246246
public void handleError(Throwable ex) {
247+
ResponseBodyFlushProcessor flushProcessor = bodyFlushProcessor;
247248
ResponseBodyProcessor processor = bodyProcessor;
248-
if (processor != null) {
249-
processor.cancel();
250-
processor.onError(ex);
251-
}
252-
else {
253-
ResponseBodyFlushProcessor flushProcessor = bodyFlushProcessor;
254-
if (flushProcessor != null) {
255-
flushProcessor.cancel();
256-
flushProcessor.onError(ex);
249+
if (flushProcessor != null) {
250+
// Cancel the upstream source of "write" Publishers
251+
flushProcessor.cancel();
252+
// Cancel the current "write" Publisher and propagate onComplete downstream
253+
if (processor != null) {
254+
processor.cancel();
255+
processor.onError(ex);
257256
}
257+
// This is a no-op if processor was connected and onError propagated all the way
258+
flushProcessor.onError(ex);
258259
}
259260
}
260261

261262
@Override
262263
public void onComplete(AsyncEvent event) {
264+
ResponseBodyFlushProcessor flushProcessor = bodyFlushProcessor;
263265
ResponseBodyProcessor processor = bodyProcessor;
264-
if (processor != null) {
265-
processor.cancel();
266-
processor.onComplete();
267-
}
268-
else {
269-
ResponseBodyFlushProcessor flushProcessor = bodyFlushProcessor;
270-
if (flushProcessor != null) {
271-
flushProcessor.cancel();
272-
flushProcessor.onComplete();
266+
if (flushProcessor != null) {
267+
// Cancel the upstream source of "write" Publishers
268+
flushProcessor.cancel();
269+
// Cancel the current "write" Publisher and propagate onComplete downstream
270+
if (processor != null) {
271+
processor.cancel();
272+
processor.onComplete();
273273
}
274+
// This is a no-op if processor was connected and onComplete propagated all the way
275+
flushProcessor.onComplete();
274276
}
275277
}
276278
}

spring-web/src/main/java/org/springframework/http/server/reactive/WriteResultPublisher.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ class WriteResultPublisher implements Publisher<Void> {
5050

5151
private final AtomicReference<State> state = new AtomicReference<>(State.UNSUBSCRIBED);
5252

53+
private final Runnable cancelTask;
54+
5355
@Nullable
5456
private volatile Subscriber<? super Void> subscriber;
5557

@@ -61,7 +63,8 @@ class WriteResultPublisher implements Publisher<Void> {
6163
private final String logPrefix;
6264

6365

64-
public WriteResultPublisher(String logPrefix) {
66+
public WriteResultPublisher(String logPrefix, Runnable cancelTask) {
67+
this.cancelTask = cancelTask;
6568
this.logPrefix = logPrefix;
6669
}
6770

@@ -248,7 +251,10 @@ void request(WriteResultPublisher publisher, long n) {
248251
}
249252

250253
void cancel(WriteResultPublisher publisher) {
251-
if (!publisher.changeState(this, COMPLETED)) {
254+
if (publisher.changeState(this, COMPLETED)) {
255+
publisher.cancelTask.run();
256+
}
257+
else {
252258
publisher.state.get().cancel(publisher);
253259
}
254260
}

0 commit comments

Comments
 (0)