Skip to content

WebClient and WebFlux Observation do not propagate context #29388

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ public class ServerHttpObservationFilter implements WebFilter {
private static final Set<String> DISCONNECTED_CLIENT_EXCEPTIONS = Set.of("AbortedException",
"ClientAbortException", "EOFException", "EofException");

/**
* Aligned with ObservationThreadLocalAccessor#KEY from micrometer-core.
*/
private static final String MICROMETER_OBSERVATION_KEY = "micrometer.observation";

private final ObservationRegistry observationRegistry;

private final ServerRequestObservationConvention observationConvention;
Expand Down Expand Up @@ -117,7 +122,8 @@ private Publisher<Void> filter(ServerWebExchange exchange, ServerRequestObservat
.doOnCancel(() -> {
observationContext.setConnectionAborted(true);
observation.stop();
});
})
.contextWrite(context -> context.put(MICROMETER_OBSERVATION_KEY, observation));
}

private void onTerminalSignal(Observation observation, ServerWebExchange exchange) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import java.util.Optional;

import io.micrometer.observation.contextpropagation.ObservationThreadLocalAccessor;
import io.micrometer.observation.tck.TestObservationRegistry;
import io.micrometer.observation.tck.TestObservationRegistryAssert;
import org.assertj.core.api.ThrowingConsumer;
Expand Down Expand Up @@ -59,6 +60,18 @@ void filterShouldFillObservationContext() {
assertThatHttpObservation().hasLowCardinalityKeyValue("outcome", "SUCCESS");
}

@Test
void filterShouldAddNewObservationToReactorContext() {
ServerWebExchange exchange = MockServerWebExchange.from(MockServerHttpRequest.post("/test/resource"));
exchange.getResponse().setRawStatusCode(200);
WebFilterChain filterChain = webExchange -> Mono.deferContextual(contextView -> {
assertThat(contextView.getOrEmpty(ObservationThreadLocalAccessor.KEY)).isPresent();
return Mono.empty();
});
this.filter.filter(exchange, filterChain).block();
assertThatHttpObservation().hasLowCardinalityKeyValue("outcome", "SUCCESS");
}

@Test
void filterShouldUseThrownException() {
ServerWebExchange exchange = MockServerWebExchange.from(MockServerHttpRequest.post("/test/resource"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,24 @@
* @author Brian Clozel
* @since 6.0
*/
public class ClientRequestObservationContext extends RequestReplySenderContext<ClientRequest, ClientResponse> {
public class ClientRequestObservationContext extends RequestReplySenderContext<ClientRequest.Builder, ClientResponse> {

@Nullable
private String uriTemplate;

private boolean aborted;

@Nullable
private ClientRequest builtRequest;


public ClientRequestObservationContext() {
super(ClientRequestObservationContext::setRequestHeader);
}

private static void setRequestHeader(@Nullable ClientRequest request, String name, String value) {
private static void setRequestHeader(@Nullable ClientRequest.Builder request, String name, String value) {
if (request != null) {
request.headers().set(name, value);
request.header(name, value);
}
}

Expand Down Expand Up @@ -75,4 +78,18 @@ public boolean isAborted() {
void setAborted(boolean aborted) {
this.aborted = aborted;
}

/**
* Return the built request.
*/
public ClientRequest getBuiltRequest() {
return this.builtRequest;
}

/**
* Set the built request.
*/
public void setBuiltRequest(ClientRequest builtRequest) {
this.builtRequest = builtRequest;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ public String getName() {

@Override
public String getContextualName(ClientRequestObservationContext context) {
return "http " + context.getCarrier().method().name().toLowerCase();
return "http " + context.getBuiltRequest().method().name().toLowerCase();
}

@Override
Expand All @@ -95,8 +95,8 @@ protected KeyValue uri(ClientRequestObservationContext context) {
}

protected KeyValue method(ClientRequestObservationContext context) {
if (context.getCarrier() != null) {
return KeyValue.of(ClientHttpObservationDocumentation.LowCardinalityKeyNames.METHOD, context.getCarrier().method().name());
if (context.getBuiltRequest() != null) {
return KeyValue.of(ClientHttpObservationDocumentation.LowCardinalityKeyNames.METHOD, context.getBuiltRequest().method().name());
}
else {
return METHOD_NONE;
Expand Down Expand Up @@ -143,15 +143,15 @@ public KeyValues getHighCardinalityKeyValues(ClientRequestObservationContext con
}

protected KeyValue httpUrl(ClientRequestObservationContext context) {
if (context.getCarrier() != null) {
return KeyValue.of(ClientHttpObservationDocumentation.HighCardinalityKeyNames.HTTP_URL, context.getCarrier().url().toASCIIString());
if (context.getBuiltRequest() != null) {
return KeyValue.of(ClientHttpObservationDocumentation.HighCardinalityKeyNames.HTTP_URL, context.getBuiltRequest().url().toASCIIString());
}
return HTTP_URL_NONE;
}

protected KeyValue clientName(ClientRequestObservationContext context) {
if (context.getCarrier() != null && context.getCarrier().url().getHost() != null) {
return KeyValue.of(ClientHttpObservationDocumentation.HighCardinalityKeyNames.CLIENT_NAME, context.getCarrier().url().getHost());
if (context.getBuiltRequest() != null && context.getBuiltRequest().url().getHost() != null) {
return KeyValue.of(ClientHttpObservationDocumentation.HighCardinalityKeyNames.CLIENT_NAME, context.getBuiltRequest().url().getHost());
}
return CLIENT_NAME_NONE;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,11 @@ class DefaultWebClient implements WebClient {

private static final DefaultClientRequestObservationConvention DEFAULT_OBSERVATION_CONVENTION = new DefaultClientRequestObservationConvention();

/**
* Aligned with ObservationThreadLocalAccessor#KEY from micrometer-core.
*/
private static final String MICROMETER_OBSERVATION = "micrometer.observation";

private final ExchangeFunction exchangeFunction;

private final UriBuilderFactory uriBuilderFactory;
Expand Down Expand Up @@ -450,14 +455,19 @@ public <V> Flux<V> exchangeToFlux(Function<ClientResponse, ? extends Flux<V>> re
@SuppressWarnings("deprecation")
public Mono<ClientResponse> exchange() {
ClientRequestObservationContext observationContext = new ClientRequestObservationContext();
ClientRequest request = (this.inserter != null ?
initRequestBuilder().body(this.inserter).build() :
initRequestBuilder().build());
return Mono.defer(() -> {
ClientRequest.Builder requestBuilder = this.inserter != null ?
initRequestBuilder().body(this.inserter) :
initRequestBuilder();
return Mono.deferContextual(contextView -> {
Observation observation = ClientHttpObservationDocumentation.HTTP_REQUEST.observation(observationConvention,
DEFAULT_OBSERVATION_CONVENTION, () -> observationContext, observationRegistry).start();
observationContext.setCarrier(request);
DEFAULT_OBSERVATION_CONVENTION, () -> observationContext, observationRegistry);
observationContext.setCarrier(requestBuilder);
observation
.parentObservation(contextView.getOrDefault(MICROMETER_OBSERVATION, null))
.start();
ClientRequest request = requestBuilder.build();
observationContext.setUriTemplate((String) request.attribute(URI_TEMPLATE_ATTRIBUTE).orElse(null));
observationContext.setBuiltRequest(request);
Mono<ClientResponse> responseMono = exchangeFunction.exchange(request)
.checkpoint("Request to " + this.httpMethod.name() + " " + this.uri + " [DefaultWebClient]")
.switchIfEmpty(NO_HTTP_CLIENT_RESPONSE_ERROR);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,8 @@ void shouldHaveName() {
@Test
void shouldHaveContextualName() {
ClientRequestObservationContext context = new ClientRequestObservationContext();
context.setCarrier(ClientRequest.create(HttpMethod.GET, URI.create("/test")).build());
context.setCarrier(ClientRequest.create(HttpMethod.GET, URI.create("/test")));
context.setBuiltRequest(context.getCarrier().build());
assertThat(this.observationConvention.getContextualName(context)).isEqualTo("http get");
}

Expand Down Expand Up @@ -77,10 +78,11 @@ void shouldAddKeyValuesForExchangeWithException() {

@Test
void shouldAddKeyValuesForRequestWithUriTemplate() {
ClientRequest request = ClientRequest.create(HttpMethod.GET, URI.create("/resource/42"))
.attribute(WebClient.class.getName() + ".uriTemplate", "/resource/{id}").build();
ClientRequest.Builder request = ClientRequest.create(HttpMethod.GET, URI.create("/resource/42"))
.attribute(WebClient.class.getName() + ".uriTemplate", "/resource/{id}");
ClientRequestObservationContext context = createContext(request);
context.setUriTemplate("/resource/{id}");
context.setBuiltRequest(context.getCarrier().build());
assertThat(this.observationConvention.getLowCardinalityKeyValues(context))
.contains(KeyValue.of("exception", "none"), KeyValue.of("method", "GET"), KeyValue.of("uri", "/resource/{id}"),
KeyValue.of("status", "200"), KeyValue.of("outcome", "SUCCESS"));
Expand All @@ -90,19 +92,21 @@ void shouldAddKeyValuesForRequestWithUriTemplate() {

@Test
void shouldAddKeyValuesForRequestWithoutUriTemplate() {
ClientRequestObservationContext context = createContext(ClientRequest.create(HttpMethod.GET, URI.create("/resource/42")).build());
ClientRequestObservationContext context = createContext(ClientRequest.create(HttpMethod.GET, URI.create("/resource/42")));
context.setBuiltRequest(context.getCarrier().build());
assertThat(this.observationConvention.getLowCardinalityKeyValues(context))
.contains(KeyValue.of("method", "GET"), KeyValue.of("uri", "none"));
assertThat(this.observationConvention.getHighCardinalityKeyValues(context)).hasSize(2).contains(KeyValue.of("http.url", "/resource/42"));
}

@Test
void shouldAddClientNameKeyValueForRequestWithHost() {
ClientRequestObservationContext context = createContext(ClientRequest.create(HttpMethod.GET, URI.create("https://localhost:8080/resource/42")).build());
ClientRequestObservationContext context = createContext(ClientRequest.create(HttpMethod.GET, URI.create("https://localhost:8080/resource/42")));
context.setBuiltRequest(context.getCarrier().build());
assertThat(this.observationConvention.getHighCardinalityKeyValues(context)).contains(KeyValue.of("client.name", "localhost"));
}

private ClientRequestObservationContext createContext(ClientRequest request) {
private ClientRequestObservationContext createContext(ClientRequest.Builder request) {
ClientRequestObservationContext context = new ClientRequestObservationContext();
context.setCarrier(request);
context.setResponse(ClientResponse.create(HttpStatus.OK).build());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@
package org.springframework.web.reactive.function.client;

import java.time.Duration;
import java.util.Collections;

import io.micrometer.observation.Observation;
import io.micrometer.observation.ObservationHandler;
import io.micrometer.observation.contextpropagation.ObservationThreadLocalAccessor;
import io.micrometer.observation.tck.TestObservationRegistry;
import io.micrometer.observation.tck.TestObservationRegistryAssert;
import org.junit.jupiter.api.BeforeEach;
Expand All @@ -28,6 +32,7 @@

import org.springframework.http.HttpStatus;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.BDDMockito.given;
import static org.mockito.BDDMockito.when;
Expand Down Expand Up @@ -57,17 +62,35 @@ void setup() {
when(mockResponse.bodyToMono(Void.class)).thenReturn(Mono.empty());
given(this.exchangeFunction.exchange(this.request.capture())).willReturn(Mono.just(mockResponse));
this.builder = WebClient.builder().baseUrl("/base").exchangeFunction(this.exchangeFunction).observationRegistry(this.observationRegistry);
this.observationRegistry.observationConfig().observationHandler(new HeaderInjectingHandler());
}


@Test
void recordsObservationForSuccessfulExchange() {
this.builder.build().get().uri("/resource/{id}", 42)
.retrieve().bodyToMono(Void.class).block(Duration.ofSeconds(10));
verifyAndGetRequest();

ClientRequest clientRequest = verifyAndGetRequest();

assertThatHttpObservation().hasLowCardinalityKeyValue("outcome", "SUCCESS")
.hasLowCardinalityKeyValue("uri", "/resource/{id}");
assertThat(clientRequest.headers()).containsEntry("foo", Collections.singletonList("bar"));
}

@Test
void recordsObservationForSuccessfulExchangeWithParentObservationInReactorContext() {
Observation parent = Observation.start("parent", observationRegistry);
try {
this.builder.build().get().uri("/resource/{id}", 42)
.retrieve().bodyToMono(Void.class).contextWrite(context -> context.put(ObservationThreadLocalAccessor.KEY, parent)).block(Duration.ofSeconds(10));
verifyAndGetRequest();

assertThatHttpObservation().hasLowCardinalityKeyValue("outcome", "SUCCESS")
.hasParentObservationEqualTo(parent);
}
finally {
parent.stop();
}
}

@Test
Expand Down Expand Up @@ -102,4 +125,17 @@ private ClientRequest verifyAndGetRequest() {
return request.getValue();
}

static class HeaderInjectingHandler implements ObservationHandler<ClientRequestObservationContext> {

@Override
public void onStart(ClientRequestObservationContext context) {
context.getSetter().set(context.getCarrier(), "foo", "bar");
}

@Override
public boolean supportsContext(Observation.Context context) {
return context instanceof ClientRequestObservationContext;
}
}

}