Skip to content

Commit 35fcbae

Browse files
committed
Fix reactive HTTP server Observation instrumentation
Prior to this commit, regressions were introduced with gh-31417: 1. the observation keyvalues would be inconsistent with the HTTP response 2. the observation scope would not cover all controller handlers, causing traceIds to be missing The first issue is caused by the fact that in case of error signals, the observation was stopped before the response was fully committed, which means further processing could happen and update the response status. This commit delays the stop event until the response is committed in case of errors. The second problem is caused by the change from a `contextWrite` operator to using the `tap` operator with a `SignalListener`. The observation was started in the `doOnSubscription` callback, which is too late in some cases. If the WebFlux controller handler is synchronous non-blocking, the execution of the handler is performed before the subscription happens. This means that for those handlers, the observation was not started, even if the current observation was present in the reactor context. This commit changes the `doOnSubscription` to `doFirst` to ensure that the observation is started at the right time. Fixes gh-31703 Fixes gh-31706
1 parent c8e6315 commit 35fcbae

File tree

4 files changed

+71
-33
lines changed

4 files changed

+71
-33
lines changed

spring-web/src/main/java/org/springframework/web/filter/reactive/ServerHttpObservationFilter.java

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -121,16 +121,17 @@ private final class ObservationSignalListener extends DefaultSignalListener<Void
121121
DEFAULT_OBSERVATION_CONVENTION, () -> observationContext, observationRegistry);
122122
}
123123

124-
@Override
125-
public void doOnSubscription() throws Throwable {
126-
this.observation.start();
127-
}
128124

129125
@Override
130126
public Context addToContext(Context originalContext) {
131127
return originalContext.put(ObservationThreadLocalAccessor.KEY, this.observation);
132128
}
133129

130+
@Override
131+
public void doFirst() throws Throwable {
132+
this.observation.start();
133+
}
134+
134135
@Override
135136
public void doOnCancel() throws Throwable {
136137
if (this.observationRecorded.compareAndSet(false, true)) {
@@ -142,16 +143,7 @@ public void doOnCancel() throws Throwable {
142143
@Override
143144
public void doOnComplete() throws Throwable {
144145
if (this.observationRecorded.compareAndSet(false, true)) {
145-
ServerHttpResponse response = this.observationContext.getResponse();
146-
if (response.isCommitted()) {
147-
this.observation.stop();
148-
}
149-
else {
150-
response.beforeCommit(() -> {
151-
this.observation.stop();
152-
return Mono.empty();
153-
});
154-
}
146+
doOnTerminate(this.observationContext);
155147
}
156148
}
157149

@@ -162,8 +154,21 @@ public void doOnError(Throwable error) throws Throwable {
162154
this.observationContext.setConnectionAborted(true);
163155
}
164156
this.observationContext.setError(error);
157+
doOnTerminate(this.observationContext);
158+
}
159+
}
160+
161+
private void doOnTerminate(ServerRequestObservationContext context) {
162+
ServerHttpResponse response = context.getResponse();
163+
if (response.isCommitted()) {
165164
this.observation.stop();
166165
}
166+
else {
167+
response.beforeCommit(() -> {
168+
this.observation.stop();
169+
return Mono.empty();
170+
});
171+
}
167172
}
168173
}
169174

spring-web/src/main/java/org/springframework/web/server/adapter/HttpWebHandlerAdapter.java

Lines changed: 19 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -374,13 +374,13 @@ private final class ObservationSignalListener extends DefaultSignalListener<Void
374374
}
375375

376376
@Override
377-
public void doOnSubscription() throws Throwable {
378-
this.observation.start();
377+
public Context addToContext(Context originalContext) {
378+
return originalContext.put(ObservationThreadLocalAccessor.KEY, this.observation);
379379
}
380380

381381
@Override
382-
public Context addToContext(Context originalContext) {
383-
return originalContext.put(ObservationThreadLocalAccessor.KEY, this.observation);
382+
public void doFirst() throws Throwable {
383+
this.observation.start();
384384
}
385385

386386
@Override
@@ -394,30 +394,35 @@ public void doOnCancel() throws Throwable {
394394
@Override
395395
public void doOnComplete() throws Throwable {
396396
if (this.observationRecorded.compareAndSet(false, true)) {
397-
ServerHttpResponse response = this.observationContext.getResponse();
398397
Throwable throwable = (Throwable) this.observationContext.getAttributes()
399398
.get(ExceptionHandlingWebHandler.HANDLED_WEB_EXCEPTION);
400399
if (throwable != null) {
401400
this.observation.error(throwable);
402401
}
403-
if (response.isCommitted()) {
404-
this.observation.stop();
405-
}
406-
else {
407-
response.beforeCommit(() -> {
408-
this.observation.stop();
409-
return Mono.empty();
410-
});
411-
}
402+
doOnTerminate(this.observationContext);
412403
}
413404
}
414405

415406
@Override
416407
public void doOnError(Throwable error) throws Throwable {
417408
if (this.observationRecorded.compareAndSet(false, true)) {
418409
this.observationContext.setError(error);
410+
doOnTerminate(this.observationContext);
411+
}
412+
}
413+
414+
415+
private void doOnTerminate(ServerRequestObservationContext context) {
416+
ServerHttpResponse response = context.getResponse();
417+
if (response.isCommitted()) {
419418
this.observation.stop();
420419
}
420+
else {
421+
response.beforeCommit(() -> {
422+
this.observation.stop();
423+
return Mono.empty();
424+
});
425+
}
421426
}
422427
}
423428

spring-web/src/test/java/org/springframework/web/filter/reactive/ServerHttpObservationFilterTests.java

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919

2020
import java.util.Optional;
2121

22+
import io.micrometer.observation.Observation;
2223
import io.micrometer.observation.contextpropagation.ObservationThreadLocalAccessor;
2324
import io.micrometer.observation.tck.TestObservationRegistry;
2425
import io.micrometer.observation.tck.TestObservationRegistryAssert;
@@ -27,6 +28,7 @@
2728
import reactor.core.publisher.Mono;
2829
import reactor.test.StepVerifier;
2930

31+
import org.springframework.http.server.reactive.ServerHttpResponse;
3032
import org.springframework.http.server.reactive.observation.ServerRequestObservationContext;
3133
import org.springframework.web.server.ServerWebExchange;
3234
import org.springframework.web.server.WebFilterChain;
@@ -66,7 +68,10 @@ void filterShouldAddNewObservationToReactorContext() {
6668
ServerWebExchange exchange = MockServerWebExchange.from(MockServerHttpRequest.post("/test/resource"));
6769
exchange.getResponse().setRawStatusCode(200);
6870
WebFilterChain filterChain = webExchange -> Mono.deferContextual(contextView -> {
69-
assertThat(contextView.getOrEmpty(ObservationThreadLocalAccessor.KEY)).isPresent();
71+
Observation observation = contextView.get(ObservationThreadLocalAccessor.KEY);
72+
assertThat(observation).isNotNull();
73+
// check that the observation was started
74+
assertThat(observation.getContext().getLowCardinalityKeyValue("outcome")).isNotNull();
7075
return Mono.empty();
7176
});
7277
this.filter.filter(exchange, filterChain).block();
@@ -100,6 +105,25 @@ void filterShouldRecordObservationWhenCancelled() {
100105
assertThatHttpObservation().hasLowCardinalityKeyValue("outcome", "UNKNOWN");
101106
}
102107

108+
@Test
109+
void filterShouldStopObservationOnResponseCommit() {
110+
ServerWebExchange exchange = MockServerWebExchange.from(MockServerHttpRequest.post("/test/resource"));
111+
WebFilterChain filterChain = createFilterChain(filterExchange -> {
112+
throw new IllegalArgumentException("server error");
113+
});
114+
StepVerifier.create(this.filter.filter(exchange, filterChain).doOnError(throwable -> {
115+
ServerHttpResponse response = exchange.getResponse();
116+
response.setRawStatusCode(500);
117+
response.setComplete().block();
118+
}))
119+
.expectError(IllegalArgumentException.class)
120+
.verify();
121+
Optional<ServerRequestObservationContext> observationContext = ServerHttpObservationFilter.findObservationContext(exchange);
122+
assertThat(observationContext.get().getError()).isInstanceOf(IllegalArgumentException.class);
123+
assertThatHttpObservation().hasLowCardinalityKeyValue("outcome", "SERVER_ERROR");
124+
}
125+
126+
103127
private WebFilterChain createFilterChain(ThrowingConsumer<ServerWebExchange> exchangeConsumer) {
104128
return filterExchange -> {
105129
try {

spring-web/src/test/java/org/springframework/web/server/adapter/HttpWebHandlerAdapterObservabilityTests.java

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,13 +20,13 @@
2020
import java.util.List;
2121
import java.util.Optional;
2222

23+
import io.micrometer.observation.Observation;
2324
import io.micrometer.observation.contextpropagation.ObservationThreadLocalAccessor;
2425
import io.micrometer.observation.tck.TestObservationRegistry;
2526
import io.micrometer.observation.tck.TestObservationRegistryAssert;
2627
import org.junit.jupiter.api.Test;
2728
import reactor.core.publisher.Mono;
2829
import reactor.test.StepVerifier;
29-
import reactor.util.context.ContextView;
3030

3131
import org.springframework.http.HttpStatus;
3232
import org.springframework.http.server.reactive.observation.ServerRequestObservationContext;
@@ -66,7 +66,8 @@ void handlerShouldSetObservationContextOnExchange() {
6666
void handlerShouldSetCurrentObservationInReactorContext() {
6767
ReactorContextWebHandler targetHandler = new ReactorContextWebHandler();
6868
createWebHandler(targetHandler).handle(this.request, this.response).block();
69-
assertThat(targetHandler.contextView.getOrEmpty(ObservationThreadLocalAccessor.KEY)).isPresent();
69+
assertThat(targetHandler.currentObservation).isNotNull();
70+
assertThat(targetHandler.observationStarted).isTrue();
7071
assertThatHttpObservation().hasLowCardinalityKeyValue("outcome", "SUCCESS");
7172
}
7273

@@ -120,13 +121,16 @@ public Mono<Void> handle(ServerWebExchange exchange) {
120121

121122
private static class ReactorContextWebHandler implements WebHandler {
122123

123-
ContextView contextView;
124+
Observation currentObservation;
125+
126+
boolean observationStarted;
124127

125128
@Override
126129
public Mono<Void> handle(ServerWebExchange exchange) {
127130
exchange.getResponse().setStatusCode(HttpStatus.OK);
128131
return Mono.deferContextual(contextView -> {
129-
this.contextView = contextView;
132+
this.currentObservation = contextView.get(ObservationThreadLocalAccessor.KEY);
133+
this.observationStarted = this.currentObservation.getContext().getLowCardinalityKeyValue("outcome") != null;
130134
return Mono.empty();
131135
});
132136
}

0 commit comments

Comments
 (0)