From 33699842a0334531dc63c9332941fd03b377fbcf Mon Sep 17 00:00:00 2001 From: libourel06 <38031544+libourel06@users.noreply.github.com> Date: Mon, 24 Feb 2025 13:45:43 +0100 Subject: [PATCH] Update TracingDecorator.java FDPT-69007 fix tracingDecorator for span context, Baggages and header context Update TracingDecoratorTest.java remove automatically added io.quarkiverse.kafkastreamsprocessor.impl.decorator.processor.TracingDecoratorTest prefix to used classes Update TracingDecoratorTest.java adapt unit test to new constructor of TracingDecorator --- .../decorator/processor/TracingDecorator.java | 28 ++++++-- .../processor/TracingDecoratorTest.java | 71 ++++++++++--------- 2 files changed, 58 insertions(+), 41 deletions(-) diff --git a/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/TracingDecorator.java b/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/TracingDecorator.java index b30dfd5..a8bf0f8 100644 --- a/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/TracingDecorator.java +++ b/impl/src/main/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/TracingDecorator.java @@ -29,6 +29,8 @@ import java.util.Objects; import java.util.stream.Collectors; +import io.opentelemetry.api.baggage.Baggage; +import io.quarkiverse.kafkastreamsprocessor.propagation.KafkaTextMapSetter; import jakarta.annotation.Priority; import jakarta.decorator.Decorator; import jakarta.enterprise.context.Dependent; @@ -58,6 +60,7 @@ import io.quarkiverse.kafkastreamsprocessor.impl.configuration.TopologyConfigurationImpl; import io.quarkiverse.kafkastreamsprocessor.impl.protocol.KafkaStreamsProcessorHeaders; import io.quarkiverse.kafkastreamsprocessor.propagation.KafkaTextMapGetter; +import io.quarkiverse.kafkastreamsprocessor.propagation.KafkaTextMapSetter; import lombok.extern.slf4j.Slf4j; /** @@ -77,7 +80,10 @@ public class TracingDecorator extends AbstractProcessorDecorator { * The {@link OpenTelemetry} configured by Quarkus */ private final OpenTelemetry openTelemetry; - + /** + * Injects Context into the Kafka headers of a message + */ + private final KafkaTextMapSetter textMapSetter; /** * Extracts Context from the Kafka headers of a message */ @@ -116,16 +122,20 @@ public class TracingDecorator extends AbstractProcessorDecorator { * The TopologyConfiguration after customization. */ @Inject - public TracingDecorator(OpenTelemetry openTelemetry, KafkaTextMapGetter textMapGetter, Tracer tracer, + public TracingDecorator(OpenTelemetry openTelemetry, KafkaTextMapGetter textMapGetter, + KafkaTextMapSetter textMapSetter, + Tracer tracer, TopologyConfigurationImpl configuration) { - this(openTelemetry, textMapGetter, tracer, configuration.getProcessorPayloadType().getName(), + this(openTelemetry, textMapGetter, textMapSetter, tracer, configuration.getProcessorPayloadType().getName(), JsonFormat.printer()); } public TracingDecorator(OpenTelemetry openTelemetry, KafkaTextMapGetter textMapGetter, - Tracer tracer, String applicationName, JsonFormat.Printer jsonPrinter) { + KafkaTextMapSetter textMapSetter, Tracer tracer, String applicationName, + JsonFormat.Printer jsonPrinter) { this.openTelemetry = openTelemetry; this.textMapGetter = textMapGetter; + this.textMapSetter = textMapSetter; this.tracer = tracer; this.applicationName = applicationName; this.jsonPrinter = jsonPrinter; @@ -157,7 +167,7 @@ public void process(Record record) { SpanBuilder spanBuilder = tracer.spanBuilder(applicationName); final TextMapPropagator propagator = openTelemetry.getPropagators().getTextMapPropagator(); Scope parentScope = null; - + Context extractedContext = null; try { // going through all propagation field names defined in the OTel configuration // we look if any of them has been set with a non-null value in the headers of the incoming message @@ -167,7 +177,7 @@ public void process(Record record) { .anyMatch(Objects::nonNull)) { // if that is the case, let's extract a Context initialized with the parent trace id, span id // and baggage present as headers in the incoming message - Context extractedContext = propagator.extract(Context.current(), record.headers(), textMapGetter); + extractedContext = propagator.extract(Context.current(), record.headers(), textMapGetter); // use the context as parent span for the built span spanBuilder.setParent(extractedContext); // we clean the headers to avoid their propagation in any outgoing message (knowing that by @@ -179,8 +189,12 @@ public void process(Record record) { Span span = spanBuilder.startSpan(); // baggage need to be explicitly set as current otherwise it is not propagated (baggage is independent of span // in opentelemetry) and actually lost as kafka headers are cleaned - try (Scope ignored = span.makeCurrent()) { + try (Scope ignored = (extractedContext != null) + ? Baggage.fromContext(extractedContext).makeCurrent() + : Scope.noop(); + Scope scope = span.makeCurrent()) { try { + propagator.inject(Context.current(), record.headers(), this.textMapSetter); getDelegate().process(record); span.setStatus(StatusCode.OK); } catch (KafkaException e) { diff --git a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/TracingDecoratorTest.java b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/TracingDecoratorTest.java index da964df..caf68ba 100644 --- a/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/TracingDecoratorTest.java +++ b/impl/src/test/java/io/quarkiverse/kafkastreamsprocessor/impl/decorator/processor/TracingDecoratorTest.java @@ -136,7 +136,8 @@ public void setUp() { rootLogger.setLevel(Level.DEBUG); when(topologyConfiguration.getProcessorPayloadType()).thenReturn((Class) MockType.class); decorator = new TracingDecorator(otel.getOpenTelemetry(), kafkaTextMapGetter, - tracer, topologyConfiguration.getProcessorPayloadType().getName(), jsonPrinter); + kafkaTextMapSetter, tracer, topologyConfiguration.getProcessorPayloadType().getName(), + jsonPrinter); decorator.setDelegate(kafkaProcessor); decorator.init(processorContext); } @@ -147,9 +148,9 @@ public void shouldSetMDCFromUberTraceId() { try (Scope parentScope = parentSpan.makeCurrent()) { Headers headers = new RecordHeaders(); otel.getOpenTelemetry() - .getPropagators() - .getTextMapPropagator() - .inject(Context.current(), headers, kafkaTextMapSetter); + .getPropagators() + .getTextMapPropagator() + .inject(Context.current(), headers, kafkaTextMapSetter); Record record = new Record<>(null, null, 0L, headers); decorator.process(record); @@ -166,13 +167,13 @@ public void shouldSetMDCFromUberTraceId() { public void shouldStartAndFinishSpan() { // manually build parent span to inject some TraceState and test the state is well recorded in the created span Span parentSpan = Span.wrap(SpanContext.create(IdGenerator.random().generateTraceId(), IdGenerator.random() - .generateSpanId(), TraceFlags.getSampled(), TraceState.builder().put("state1", "value2").build())); + .generateSpanId(), TraceFlags.getSampled(), TraceState.builder().put("state1", "value2").build())); try (Scope parentScope = parentSpan.makeCurrent()) { RecordHeaders headers = new RecordHeaders(); otel.getOpenTelemetry() - .getPropagators() - .getTextMapPropagator() - .inject(Context.current(), headers, kafkaTextMapSetter); + .getPropagators() + .getTextMapPropagator() + .inject(Context.current(), headers, kafkaTextMapSetter); Record record = new Record<>(null, null, 0L, headers); decorator.process(record); @@ -181,12 +182,12 @@ public void shouldStartAndFinishSpan() { } assertThat(otel.getSpans()) - .hasTracesSatisfyingExactly( - trace -> trace.hasSpansSatisfyingExactly( - span -> span.hasTraceId(parentSpan.getSpanContext().getTraceId()) - .hasName(PROCESSOR_NAME) - .hasParentSpanId(parentSpan.getSpanContext().getSpanId()) - .hasTraceState(TraceState.builder().put("state1", "value2").build()))); + .hasTracesSatisfyingExactly( + trace -> trace.hasSpansSatisfyingExactly( + span -> span.hasTraceId(parentSpan.getSpanContext().getTraceId()) + .hasName(PROCESSOR_NAME) + .hasParentSpanId(parentSpan.getSpanContext().getSpanId()) + .hasTraceState(TraceState.builder().put("state1", "value2").build()))); } @Test @@ -195,15 +196,16 @@ public void shouldCleanMDCAndScopeInCaseOfException() { try (Scope parentScope = parentSpan.makeCurrent()) { Headers headers = new RecordHeaders(); otel.getOpenTelemetry() - .getPropagators() - .getTextMapPropagator() - .inject(Context.current(), headers, kafkaTextMapSetter); + .getPropagators() + .getTextMapPropagator() + .inject(Context.current(), headers, kafkaTextMapSetter); Record record = new Record<>(null, Ping.newBuilder() - .setMessage("blabla") - .build(), 0L, headers); + .setMessage("blabla") + .build(), 0L, headers); decorator = new TracingDecorator(otel.getOpenTelemetry(), kafkaTextMapGetter, - tracer, topologyConfiguration.getProcessorPayloadType().getName(), jsonPrinter); + kafkaTextMapSetter, + tracer, topologyConfiguration.getProcessorPayloadType().getName(), jsonPrinter); decorator.setDelegate(new ThrowExceptionProcessor()); decorator.init(processorContext); @@ -215,12 +217,12 @@ public void shouldCleanMDCAndScopeInCaseOfException() { assertNull(MDC.get("traceId")); assertThat(otel.getSpans()) - .hasTracesSatisfyingExactly(trace -> trace.hasSpansSatisfyingExactly( - span -> span.hasSpanId(parentSpan.getSpanContext().getSpanId()), - span -> span.hasTraceId(parentSpan.getSpanContext().getTraceId()) - .hasName(PROCESSOR_NAME) - .hasStatusSatisfying(status -> status.hasCode(StatusCode.ERROR)) - .hasException(new TestException()))); + .hasTracesSatisfyingExactly(trace -> trace.hasSpansSatisfyingExactly( + span -> span.hasSpanId(parentSpan.getSpanContext().getSpanId()), + span -> span.hasTraceId(parentSpan.getSpanContext().getTraceId()) + .hasName(PROCESSOR_NAME) + .hasStatusSatisfying(status -> status.hasCode(StatusCode.ERROR)) + .hasException(new TestException()))); } @Test @@ -257,7 +259,7 @@ void shouldManageRuntimeException() throws Throwable { decorator.process(new Record<>("key", inputMessage, 0L)); assertThat(getLogs(), hasItem(allOf(containsString("ERROR"), - containsString("Runtime error caught while processing the message"), containsString(exception.getMessage())))); + containsString("Runtime error caught while processing the message"), containsString(exception.getMessage())))); assertThat(getLogs(), hasItem(allOf(containsString("DEBUG"), containsString("marshalled")))); } @@ -269,7 +271,7 @@ private static List getLogs() { void shouldLetBubbleUpKafkaExceptionAndLogMessage() { doThrow(new KafkaException()).when(kafkaProcessor).process(any()); Assertions.assertThrows(KafkaException.class, - () -> decorator.process(new Record<>("key", inputMessage, 0L))); + () -> decorator.process(new Record<>("key", inputMessage, 0L))); } @Test @@ -296,7 +298,7 @@ void shouldLogMetadataEvenIfValueMarshallingToJSONFails() throws Throwable { decorator.process(new Record<>("key", inputMessage, 0L)); assertThat(getLogs(), - hasItem(allOf(containsString("ERROR"), containsString(protocolBufferException.getMessage())))); + hasItem(allOf(containsString("ERROR"), containsString(protocolBufferException.getMessage())))); assertThat(getLogs(), hasItem(allOf(containsString("DEBUG"), containsString("value=null")))); } @@ -305,7 +307,8 @@ void shouldLogRawToStringValueIfNotProtobuf() throws Throwable { Processor kafkaProcessor = mock(Processor.class); ProcessorContext processorContext = mock(ProcessorContext.class); TracingDecorator decorator = new TracingDecorator(GlobalOpenTelemetry.get(), kafkaTextMapGetter, - tracer, topologyConfiguration.getProcessorPayloadType().getName(), jsonPrinter); + kafkaTextMapSetter, tracer, topologyConfiguration.getProcessorPayloadType().getName(), + jsonPrinter); decorator.setDelegate(kafkaProcessor); decorator.init(processorContext); @@ -322,10 +325,10 @@ void shouldLogRawToStringValueIfNotProtobuf() throws Throwable { void shouldPropagateOpentelemetryW3CBaggage() { // header value format here: https://www.w3.org/TR/baggage/#baggage-http-header-format Headers headers = new RecordHeaders().add(W3C_TRACE_ID, TRACE_PARENT.getBytes()) - .add(W3C_BAGGAGE, "key1=value1,key2=value2".getBytes()); + .add(W3C_BAGGAGE, "key1=value1,key2=value2".getBytes()); Record record = new Record<>(null, Ping.newBuilder().setMessage("blabla").build(), 0L, headers); - decorator = new TracingDecorator(otel.getOpenTelemetry(), kafkaTextMapGetter, - tracer, topologyConfiguration.getProcessorPayloadType().getName(), jsonPrinter); + decorator = new TracingDecorator(otel.getOpenTelemetry(), kafkaTextMapGetter, kafkaTextMapSetter, + tracer, topologyConfiguration.getProcessorPayloadType().getName(), jsonPrinter); decorator.setDelegate(new LogOpentelemetryBaggageProcessor()); decorator.init(processorContext); @@ -363,7 +366,7 @@ public void process(Record record) { public static String w3cHeader(String traceId, String spanId) { return String.format("00-%s-%s-01", StringUtils.leftPad(traceId, TraceId.getLength(), '0'), - StringUtils.leftPad(spanId, SpanId.getLength(), '0')); + StringUtils.leftPad(spanId, SpanId.getLength(), '0')); } public static class MockType {