Skip to content

Commit f258159

Browse files
committed
Trace websocket for spring webflux reactive handlers
1 parent b4aebf8 commit f258159

File tree

10 files changed

+404
-40
lines changed

10 files changed

+404
-40
lines changed

dd-java-agent/instrumentation/netty-4.1/src/main/java/datadog/trace/instrumentation/netty41/NettyChannelPipelineInstrumentation.java

Lines changed: 29 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@
2323
import datadog.trace.instrumentation.netty41.server.HttpServerResponseTracingHandler;
2424
import datadog.trace.instrumentation.netty41.server.HttpServerTracingHandler;
2525
import datadog.trace.instrumentation.netty41.server.MaybeBlockResponseHandler;
26-
import datadog.trace.instrumentation.netty41.server.websocket.WebSocketServerRequestTracingHandler;
27-
import datadog.trace.instrumentation.netty41.server.websocket.WebSocketServerResponseTracingHandler;
26+
import datadog.trace.instrumentation.netty41.server.websocket.WebSocketServerInboundTracingHandler;
27+
import datadog.trace.instrumentation.netty41.server.websocket.WebSocketServerOutboundTracingHandler;
2828
import datadog.trace.instrumentation.netty41.server.websocket.WebSocketServerTracingHandler;
2929
import io.netty.channel.ChannelHandler;
3030
import io.netty.channel.ChannelPipeline;
@@ -34,6 +34,8 @@
3434
import io.netty.handler.codec.http.HttpResponseDecoder;
3535
import io.netty.handler.codec.http.HttpResponseEncoder;
3636
import io.netty.handler.codec.http.HttpServerCodec;
37+
import io.netty.handler.codec.http.websocketx.WebSocketFrameDecoder;
38+
import io.netty.handler.codec.http.websocketx.WebSocketFrameEncoder;
3739
import io.netty.handler.codec.http.websocketx.WebSocketServerProtocolHandler;
3840
import io.netty.util.Attribute;
3941
import net.bytebuddy.asm.Advice;
@@ -82,8 +84,8 @@ public String[] helperClassNames() {
8284
packageName + ".server.HttpServerTracingHandler",
8385
packageName + ".server.MaybeBlockResponseHandler",
8486
packageName + ".server.websocket.WebSocketServerTracingHandler",
85-
packageName + ".server.websocket.WebSocketServerResponseTracingHandler",
86-
packageName + ".server.websocket.WebSocketServerRequestTracingHandler",
87+
packageName + ".server.websocket.WebSocketServerOutboundTracingHandler",
88+
packageName + ".server.websocket.WebSocketServerInboundTracingHandler",
8789
packageName + ".NettyHttp2Helper",
8890
packageName + ".NettyPipelineHelper",
8991
};
@@ -162,23 +164,31 @@ public static void addHandler(
162164
HttpServerResponseTracingHandler.INSTANCE,
163165
MaybeBlockResponseHandler.INSTANCE);
164166
} else if (handler instanceof WebSocketServerProtocolHandler) {
165-
if (InstrumenterConfig.get().isWebsocketTracingEnabled()) {
166-
if (pipeline.get(HttpServerTracingHandler.class) != null) {
167-
NettyPipelineHelper.addHandlerAfter(
168-
pipeline, "HttpServerTracingHandler#0", new WebSocketServerTracingHandler());
167+
if (InstrumenterConfig.get().isWebsocketTracingEnabled()
168+
&& pipeline.get(HttpServerTracingHandler.class) != null) {
169+
// remove single websocket handler if added before
170+
if (pipeline.get(WebSocketServerInboundTracingHandler.class) != null) {
171+
pipeline.remove(WebSocketServerInboundTracingHandler.class);
169172
}
170-
if (pipeline.get(HttpServerRequestTracingHandler.class) != null) {
171-
NettyPipelineHelper.addHandlerAfter(
172-
pipeline,
173-
"HttpServerRequestTracingHandler#0",
174-
WebSocketServerRequestTracingHandler.INSTANCE);
175-
}
176-
if (pipeline.get(HttpServerResponseTracingHandler.class) != null) {
177-
NettyPipelineHelper.addHandlerAfter(
178-
pipeline,
179-
"HttpServerResponseTracingHandler#0",
180-
WebSocketServerResponseTracingHandler.INSTANCE);
173+
if (pipeline.get(WebSocketServerOutboundTracingHandler.class) != null) {
174+
pipeline.remove(WebSocketServerOutboundTracingHandler.class);
181175
}
176+
NettyPipelineHelper.addHandlerAfter(
177+
pipeline,
178+
pipeline.get(HttpServerTracingHandler.class),
179+
new WebSocketServerTracingHandler());
180+
}
181+
} else if (handler instanceof WebSocketFrameDecoder) {
182+
if (InstrumenterConfig.get().isWebsocketTracingEnabled()
183+
&& pipeline.get(WebSocketServerTracingHandler.class) == null) {
184+
NettyPipelineHelper.addHandlerAfter(
185+
pipeline, handler, WebSocketServerInboundTracingHandler.INSTANCE);
186+
}
187+
} else if (handler instanceof WebSocketFrameEncoder) {
188+
if (InstrumenterConfig.get().isWebsocketTracingEnabled()
189+
&& pipeline.get(WebSocketServerTracingHandler.class) == null) {
190+
NettyPipelineHelper.addHandlerAfter(
191+
pipeline, handler, WebSocketServerOutboundTracingHandler.INSTANCE);
182192
}
183193
}
184194
// Client pipeline handlers
Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,13 +17,12 @@
1717
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
1818

1919
@ChannelHandler.Sharable
20-
public class WebSocketServerRequestTracingHandler extends ChannelInboundHandlerAdapter {
21-
public static WebSocketServerRequestTracingHandler INSTANCE =
22-
new WebSocketServerRequestTracingHandler();
20+
public class WebSocketServerInboundTracingHandler extends ChannelInboundHandlerAdapter {
21+
public static WebSocketServerInboundTracingHandler INSTANCE =
22+
new WebSocketServerInboundTracingHandler();
2323

2424
@Override
2525
public void channelRead(ChannelHandlerContext ctx, Object frame) {
26-
2726
if (frame instanceof WebSocketFrame) {
2827
Channel channel = ctx.channel();
2928
HandlerContext.Receiver receiverContext =
Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,9 @@
1717
import io.netty.handler.codec.http.websocketx.WebSocketFrame;
1818

1919
@ChannelHandler.Sharable
20-
public class WebSocketServerResponseTracingHandler extends ChannelOutboundHandlerAdapter {
21-
public static WebSocketServerResponseTracingHandler INSTANCE =
22-
new WebSocketServerResponseTracingHandler();
20+
public class WebSocketServerOutboundTracingHandler extends ChannelOutboundHandlerAdapter {
21+
public static WebSocketServerOutboundTracingHandler INSTANCE =
22+
new WebSocketServerOutboundTracingHandler();
2323

2424
@Override
2525
public void write(ChannelHandlerContext ctx, Object frame, ChannelPromise promise)

dd-java-agent/instrumentation/netty-4.1/src/main/java/datadog/trace/instrumentation/netty41/server/websocket/WebSocketServerTracingHandler.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,11 @@
44

55
public class WebSocketServerTracingHandler
66
extends CombinedChannelDuplexHandler<
7-
WebSocketServerRequestTracingHandler, WebSocketServerResponseTracingHandler> {
7+
WebSocketServerInboundTracingHandler, WebSocketServerOutboundTracingHandler> {
88

99
public WebSocketServerTracingHandler() {
1010
super(
11-
WebSocketServerRequestTracingHandler.INSTANCE,
12-
WebSocketServerResponseTracingHandler.INSTANCE);
11+
WebSocketServerInboundTracingHandler.INSTANCE,
12+
WebSocketServerOutboundTracingHandler.INSTANCE);
1313
}
1414
}

dd-java-agent/instrumentation/spring-webflux-5/src/bootTest/groovy/SpringWebfluxTest.groovy

Lines changed: 106 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
import datadog.trace.agent.test.AgentTestRunner
22
import datadog.trace.agent.test.asserts.TraceAssert
3+
import datadog.trace.agent.test.base.HttpServerTest
4+
import datadog.trace.agent.test.base.OkHttpWebsocketClient
5+
import datadog.trace.agent.test.base.WebsocketServer
36
import datadog.trace.api.DDSpanTypes
47
import datadog.trace.api.DDTags
58
import datadog.trace.bootstrap.instrumentation.api.Tags
@@ -9,6 +12,9 @@ import dd.trace.instrumentation.springwebflux.server.EchoHandlerFunction
912
import dd.trace.instrumentation.springwebflux.server.FooModel
1013
import dd.trace.instrumentation.springwebflux.server.SpringWebFluxTestApplication
1114
import dd.trace.instrumentation.springwebflux.server.TestController
15+
import dd.trace.instrumentation.springwebflux.server.WsHandler
16+
import net.bytebuddy.utility.RandomString
17+
import org.springframework.beans.factory.annotation.Autowired
1218
import org.springframework.boot.test.context.SpringBootTest
1319
import org.springframework.boot.test.context.TestConfiguration
1420
import org.springframework.boot.web.embedded.netty.NettyReactiveWebServerFactory
@@ -18,9 +24,22 @@ import org.springframework.http.client.reactive.ReactorClientHttpConnector
1824
import org.springframework.web.reactive.function.BodyExtractors
1925
import org.springframework.web.reactive.function.BodyInserters
2026
import org.springframework.web.reactive.function.client.WebClient
27+
import org.springframework.web.reactive.socket.WebSocketHandler
28+
import org.springframework.web.reactive.socket.WebSocketSession
29+
import org.springframework.web.reactive.socket.client.WebSocketClient
2130
import org.springframework.web.server.ResponseStatusException
2231
import reactor.core.publisher.Mono
2332

33+
import static datadog.trace.agent.test.base.HttpServerTest.ServerEndpoint.WEBSOCKET
34+
import static datadog.trace.agent.test.base.HttpServerTest.ServerEndpoint.WEBSOCKET
35+
import static datadog.trace.agent.test.base.HttpServerTest.ServerEndpoint.WEBSOCKET
36+
import static datadog.trace.agent.test.base.HttpServerTest.websocketCloseSpan
37+
import static datadog.trace.agent.test.base.HttpServerTest.websocketReceiveSpan
38+
import static datadog.trace.agent.test.base.HttpServerTest.websocketSendSpan
39+
import static datadog.trace.agent.test.utils.TraceUtils.basicSpan
40+
import static org.junit.Assume.assumeTrue
41+
import static org.junit.Assume.assumeTrue
42+
2443
@SpringBootTest(webEnvironment = SpringBootTest.WebEnvironment.RANDOM_PORT,
2544
classes = [SpringWebFluxTestApplication, ForceNettyAutoConfiguration],
2645
properties = "server.http2.enabled=true")
@@ -40,13 +59,22 @@ class SpringWebfluxTest extends AgentTestRunner {
4059
@LocalServerPort
4160
int port
4261

43-
WebClient client = WebClient.builder().clientConnector (new ReactorClientHttpConnector()).build()
62+
@Autowired
63+
private WsHandler wsHandler
64+
65+
WebClient client = WebClient.builder().clientConnector(new ReactorClientHttpConnector()).build()
4466

4567
@Override
4668
boolean useStrictTraceWrites() {
4769
false
4870
}
4971

72+
@Override
73+
protected void configurePreAgent() {
74+
super.configurePreAgent()
75+
injectSysConfig("trace.websocket.messages.enabled", "true")
76+
}
77+
5078
def "Basic GET test #testName"() {
5179
setup:
5280
String url = "http://localhost:$port$urlPath"
@@ -61,7 +89,7 @@ class SpringWebfluxTest extends AgentTestRunner {
6189
sortSpansByStart()
6290
trace(2) {
6391
clientSpan(it, null, "http.request", "spring-webflux-client", "GET", URI.create(url))
64-
traceParent = clientSpan(it, span(0), "netty.client.request", "netty-client", "GET", URI.create(url))
92+
traceParent = clientSpan(it, span(0), "netty.client.request", "netty-client", "GET", URI.create(url))
6593
}
6694
trace(2) {
6795
span {
@@ -142,7 +170,7 @@ class SpringWebfluxTest extends AgentTestRunner {
142170
def traceParent
143171
trace(2) {
144172
clientSpan(it, null, "http.request", "spring-webflux-client", "GET", URI.create(url))
145-
traceParent = clientSpan(it, span(0), "netty.client.request", "netty-client", "GET", URI.create(url))
173+
traceParent = clientSpan(it, span(0), "netty.client.request", "netty-client", "GET", URI.create(url))
146174
}
147175
trace(3) {
148176
span {
@@ -237,7 +265,7 @@ class SpringWebfluxTest extends AgentTestRunner {
237265
def traceParent
238266
trace(2) {
239267
clientSpan(it, null, "http.request", "spring-webflux-client", "GET", URI.create(url))
240-
traceParent = clientSpan(it, span(0), "netty.client.request", "netty-client", "GET", URI.create(url))
268+
traceParent = clientSpan(it, span(0), "netty.client.request", "netty-client", "GET", URI.create(url))
241269
}
242270
trace(3) {
243271
span {
@@ -285,7 +313,7 @@ class SpringWebfluxTest extends AgentTestRunner {
285313
def traceParent
286314
trace(2) {
287315
clientSpan(it, null, "http.request", "spring-webflux-client", "GET", URI.create(url), 404, true)
288-
traceParent = clientSpan(it, span(0), "netty.client.request", "netty-client", "GET", URI.create(url), 404, true)
316+
traceParent = clientSpan(it, span(0), "netty.client.request", "netty-client", "GET", URI.create(url), 404, true)
289317
}
290318
trace(2) {
291319
span {
@@ -331,7 +359,7 @@ class SpringWebfluxTest extends AgentTestRunner {
331359
String url = "http://localhost:$port/echo"
332360

333361
when:
334-
def response = client.post().uri(url).body(BodyInserters.fromPublisher(Mono.just(echoString),String)).exchange().block()
362+
def response = client.post().uri(url).body(BodyInserters.fromPublisher(Mono.just(echoString), String)).exchange().block()
335363

336364
then:
337365
response.statusCode().value() == 202
@@ -341,7 +369,7 @@ class SpringWebfluxTest extends AgentTestRunner {
341369
def traceParent
342370
trace(2) {
343371
clientSpan(it, null, "http.request", "spring-webflux-client", "POST", URI.create(url), 202)
344-
traceParent = clientSpan(it, span(0), "netty.client.request", "netty-client", "POST", URI.create(url), 202)
372+
traceParent = clientSpan(it, span(0), "netty.client.request", "netty-client", "POST", URI.create(url), 202)
345373
}
346374
trace(3) {
347375
span {
@@ -406,7 +434,7 @@ class SpringWebfluxTest extends AgentTestRunner {
406434
def traceParent
407435
trace(2) {
408436
clientSpan(it, null, "http.request", "spring-webflux-client", "GET", URI.create(url), 500)
409-
traceParent = clientSpan(it, span(0), "netty.client.request", "netty-client", "GET", URI.create(url), 500)
437+
traceParent = clientSpan(it, span(0), "netty.client.request", "netty-client", "GET", URI.create(url), 500)
410438
}
411439
trace(2) {
412440
span {
@@ -495,7 +523,7 @@ class SpringWebfluxTest extends AgentTestRunner {
495523
trace(2) {
496524
sortSpansByStart()
497525
clientSpan(it, null, "http.request", "spring-webflux-client", "GET", URI.create(url), 307)
498-
traceParent1 = clientSpan(it, span(0), "netty.client.request", "netty-client", "GET", URI.create(url), 307)
526+
traceParent1 = clientSpan(it, span(0), "netty.client.request", "netty-client", "GET", URI.create(url), 307)
499527
}
500528

501529
trace(2) {
@@ -540,7 +568,7 @@ class SpringWebfluxTest extends AgentTestRunner {
540568
trace(2) {
541569
sortSpansByStart()
542570
clientSpan(it, null, "http.request", "spring-webflux-client", "GET", URI.create(finalUrl))
543-
traceParent2 = clientSpan(it, span(0), "netty.client.request", "netty-client", "GET", URI.create(finalUrl))
571+
traceParent2 = clientSpan(it, span(0), "netty.client.request", "netty-client", "GET", URI.create(finalUrl))
544572
}
545573
trace(2) {
546574
sortSpansByStart()
@@ -599,7 +627,7 @@ class SpringWebfluxTest extends AgentTestRunner {
599627
def traceParent
600628
trace(2) {
601629
clientSpan(it, null, "http.request", "spring-webflux-client", "GET", URI.create(url))
602-
traceParent = clientSpan(it, span(0), "netty.client.request", "netty-client", "GET", URI.create(url))
630+
traceParent = clientSpan(it, span(0), "netty.client.request", "netty-client", "GET", URI.create(url))
603631
}
604632
trace(2) {
605633
span {
@@ -660,6 +688,73 @@ class SpringWebfluxTest extends AgentTestRunner {
660688
"annotation API delayed response" | "/foo-delayed" | "/foo-delayed" | "getFooDelayed" | new FooModel(3L, "delayed").toString()
661689
}
662690

691+
def 'test websocket server receive #msgType message of size #size and #chunks chunks'() {
692+
when:
693+
String url = "http://localhost:$port/websocket"
694+
def wsClient = new OkHttpWebsocketClient()
695+
wsClient.connect(url)
696+
wsHandler.awaitConnected()
697+
if (message instanceof String) {
698+
wsClient.send(message as String)
699+
} else {
700+
wsClient.send(message as byte[])
701+
}
702+
wsHandler.awaitExchangeComplete()
703+
wsClient.close(1001, "goodbye")
704+
705+
then:
706+
assertTraces(3, {
707+
DDSpan handshake
708+
trace(2) {
709+
sortSpansByStart()
710+
handshake = span(0)
711+
span {
712+
resourceName "GET /websocket"
713+
operationName "netty.request"
714+
spanType DDSpanTypes.HTTP_SERVER
715+
tags {
716+
"$Tags.COMPONENT" "netty"
717+
"$Tags.SPAN_KIND" Tags.SPAN_KIND_SERVER
718+
"$Tags.PEER_HOST_IPV4" "127.0.0.1"
719+
"$Tags.PEER_PORT" Integer
720+
"$Tags.HTTP_URL" url
721+
"$Tags.HTTP_HOSTNAME" "localhost"
722+
"$Tags.HTTP_METHOD" "GET"
723+
"$Tags.HTTP_STATUS" 101
724+
"$Tags.HTTP_USER_AGENT" String
725+
"$Tags.HTTP_CLIENT_IP" "127.0.0.1"
726+
"$Tags.HTTP_ROUTE" "/websocket"
727+
defaultTags()
728+
}
729+
}
730+
span {
731+
resourceName "WsHandler.handle"
732+
operationName "WsHandler.handle"
733+
spanType DDSpanTypes.HTTP_SERVER
734+
childOfPrevious()
735+
tags {
736+
"$Tags.COMPONENT" "spring-webflux-controller"
737+
"$Tags.SPAN_KIND" Tags.SPAN_KIND_SERVER
738+
"handler.type" WsHandler.getName()
739+
defaultTags()
740+
}
741+
}
742+
}
743+
trace(2) {
744+
sortSpansByStart()
745+
websocketReceiveSpan(it, handshake, msgType, size, chunks)
746+
websocketSendSpan(it, handshake, msgType, size, chunks)
747+
}
748+
trace(1) {
749+
websocketCloseSpan(it, handshake, false, 1001, "goodbye")
750+
}
751+
})
752+
where:
753+
message | msgType | chunks | size
754+
RandomString.make(10) | "text" | 1 | 10
755+
RandomString.make(20).getBytes("UTF-8") | "binary" | 1 | 20
756+
}
757+
663758
def clientSpan(
664759
TraceAssert trace,
665760
Object parentSpan,

dd-java-agent/instrumentation/spring-webflux-5/src/bootTest/groovy/dd/trace/instrumentation/springwebflux/server/SpringWebFluxTestApplication.groovy

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5,11 +5,15 @@ import org.springframework.boot.autoconfigure.SpringBootApplication
55
import org.springframework.context.annotation.Bean
66
import org.springframework.http.MediaType
77
import org.springframework.stereotype.Component
8+
import org.springframework.web.reactive.HandlerMapping
89
import org.springframework.web.reactive.function.BodyInserters
910
import org.springframework.web.reactive.function.server.HandlerFunction
1011
import org.springframework.web.reactive.function.server.RouterFunction
1112
import org.springframework.web.reactive.function.server.ServerRequest
1213
import org.springframework.web.reactive.function.server.ServerResponse
14+
import org.springframework.web.reactive.handler.SimpleUrlHandlerMapping
15+
import org.springframework.web.reactive.socket.WebSocketHandler
16+
import org.springframework.web.reactive.socket.server.support.WebSocketHandlerAdapter
1317
import reactor.core.publisher.Mono
1418

1519
import java.time.Duration
@@ -26,6 +30,22 @@ class SpringWebFluxTestApplication {
2630
return route(POST("/echo"), new EchoHandlerFunction(echoHandler))
2731
}
2832

33+
@Bean
34+
WebSocketHandlerAdapter webSocketHandlerAdapter() {
35+
return new WebSocketHandlerAdapter()
36+
}
37+
38+
@Bean
39+
HandlerMapping wsHandlerMapping(WsHandler wsHandler) {
40+
Map<String, WebSocketHandler> map = new HashMap<>()
41+
map.put("/websocket", wsHandler)
42+
43+
SimpleUrlHandlerMapping handlerMapping = new SimpleUrlHandlerMapping()
44+
handlerMapping.setOrder(1)
45+
handlerMapping.setUrlMap(map)
46+
return handlerMapping
47+
}
48+
2949
@Bean
3050
RouterFunction<ServerResponse> greetRouterFunction(GreetingHandler greetingHandler) {
3151
return route(GET("/greet"), new HandlerFunction<ServerResponse>() {

0 commit comments

Comments
 (0)