Skip to content

Commit 73aeaaf

Browse files
committed
propagate scope in async failures
Signed-off-by: Igor Macedo Quintanilha <[email protected]>
1 parent 6425682 commit 73aeaaf

File tree

3 files changed

+158
-9
lines changed

3 files changed

+158
-9
lines changed

spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1498,7 +1498,11 @@ protected void handleAsyncFailure() {
14981498
// We will give up on retrying with the remaining copied and failed Records.
14991499
for (FailedRecordTuple<K, V> copyFailedRecord : copyFailedRecords) {
15001500
try {
1501-
invokeErrorHandlerBySingleRecord(copyFailedRecord);
1501+
if (copyFailedRecord.observation != null) {
1502+
copyFailedRecord.observation.scoped(() -> invokeErrorHandlerBySingleRecord(copyFailedRecord));
1503+
} else {
1504+
invokeErrorHandlerBySingleRecord(copyFailedRecord);
1505+
}
15021506
}
15031507
catch (Exception e) {
15041508
this.logger.warn(() ->
@@ -3433,7 +3437,7 @@ private Collection<ConsumerRecord<K, V>> getHighestOffsetRecords(ConsumerRecords
34333437
}
34343438

34353439
private void callbackForAsyncFailure(ConsumerRecord<K, V> cRecord, RuntimeException ex) {
3436-
this.failedRecords.addLast(new FailedRecordTuple<>(cRecord, ex));
3440+
this.failedRecords.addLast(new FailedRecordTuple<>(cRecord, ex, observationRegistry != null ? observationRegistry.getCurrentObservation() : null));
34373441
}
34383442

34393443
@Override
@@ -4050,6 +4054,6 @@ private static class StopAfterFenceException extends KafkaException {
40504054

40514055
}
40524056

4053-
private record FailedRecordTuple<K, V>(ConsumerRecord<K, V> record, RuntimeException ex) { }
4057+
/**
 * A record whose asynchronous processing failed, queued for later error handling.
 *
 * @param record the consumer record that failed.
 * @param ex the exception reported for the record.
 * @param observation the observation that was current when the failure was queued;
 * {@code null} when there was no observation registry or no current observation.
 */
private record FailedRecordTuple<K, V>(ConsumerRecord<K, V> record, RuntimeException ex, Observation observation) { }
40544058

40554059
}

spring-kafka/src/main/java/org/springframework/kafka/listener/adapter/MessagingMessageListenerAdapter.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -736,12 +736,13 @@ protected void asyncFailure(Object request, @Nullable Acknowledgment acknowledgm
736736
"Async Fail", Objects.requireNonNull(source).getPayload()), cause));
737737
}
738738
catch (Throwable ex) {
739-
this.logger.error(t, () -> "Future, Mono, or suspend function was completed with an exception for " + source);
740739
acknowledge(acknowledgment);
741740
if (canAsyncRetry(request, ex) && this.asyncRetryCallback != null) {
742741
@SuppressWarnings("unchecked")
743742
ConsumerRecord<K, V> record = (ConsumerRecord<K, V>) request;
744743
this.asyncRetryCallback.accept(record, (RuntimeException) ex);
744+
} else {
745+
this.logger.error(t, () -> "Future, Mono, or suspend function was completed with an exception for " + source);
745746
}
746747
}
747748
}

spring-kafka/src/test/java/org/springframework/kafka/support/micrometer/ObservationTests.java

Lines changed: 149 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import java.nio.charset.StandardCharsets;
2020
import java.time.Duration;
21+
import java.util.ArrayList;
2122
import java.util.Arrays;
2223
import java.util.Deque;
2324
import java.util.List;
@@ -27,6 +28,7 @@
2728
import java.util.concurrent.ExecutionException;
2829
import java.util.concurrent.TimeUnit;
2930
import java.util.concurrent.TimeoutException;
31+
import java.util.concurrent.atomic.AtomicInteger;
3032
import java.util.concurrent.atomic.AtomicReference;
3133
import java.util.stream.StreamSupport;
3234

@@ -47,7 +49,10 @@
4749
import io.micrometer.tracing.handler.PropagatingSenderTracingObservationHandler;
4850
import io.micrometer.tracing.propagation.Propagator;
4951
import io.micrometer.tracing.test.simple.SimpleSpan;
52+
import io.micrometer.tracing.test.simple.SimpleTraceContext;
5053
import io.micrometer.tracing.test.simple.SimpleTracer;
54+
import io.opentelemetry.api.trace.SpanContext;
55+
import io.opentelemetry.context.Context;
5156
import org.apache.kafka.clients.admin.AdminClientConfig;
5257
import org.apache.kafka.clients.consumer.Consumer;
5358
import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -62,6 +67,15 @@
6267
import org.apache.kafka.common.header.internals.RecordHeader;
6368
import org.jspecify.annotations.Nullable;
6469
import org.junit.jupiter.api.Test;
70+
import org.springframework.core.task.TaskExecutor;
71+
import org.springframework.kafka.annotation.DltHandler;
72+
import org.springframework.kafka.annotation.EnableKafkaRetryTopic;
73+
import org.springframework.kafka.annotation.RetryableTopic;
74+
import org.springframework.kafka.listener.ContainerProperties;
75+
import org.springframework.retry.annotation.Backoff;
76+
import org.springframework.scheduling.TaskScheduler;
77+
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
78+
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
6579
import reactor.core.publisher.Mono;
6680

6781
import org.springframework.beans.factory.annotation.Autowired;
@@ -72,6 +86,9 @@
7286
import org.springframework.kafka.KafkaException;
7387
import org.springframework.kafka.annotation.EnableKafka;
7488
import org.springframework.kafka.annotation.KafkaListener;
89+
90+
import org.springframework.kafka.listener.DefaultErrorHandler;
91+
import org.springframework.kafka.listener.MessageListenerContainer;
7592
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
7693
import org.springframework.kafka.config.KafkaListenerEndpointRegistry;
7794
import org.springframework.kafka.core.ConsumerFactory;
@@ -82,6 +99,7 @@
8299
import org.springframework.kafka.core.ProducerFactory;
83100
import org.springframework.kafka.listener.MessageListenerContainer;
84101
import org.springframework.kafka.listener.RecordInterceptor;
102+
85103
import org.springframework.kafka.requestreply.ReplyingKafkaTemplate;
86104
import org.springframework.kafka.support.ProducerListener;
87105
import org.springframework.kafka.support.micrometer.KafkaListenerObservation.DefaultKafkaListenerObservationConvention;
@@ -94,6 +112,8 @@
94112
import org.springframework.test.context.junit.jupiter.SpringJUnitConfig;
95113
import org.springframework.util.StringUtils;
96114

115+
import static java.util.concurrent.CompletableFuture.runAsync;
116+
import static java.util.concurrent.CompletableFuture.supplyAsync;
97117
import static org.assertj.core.api.Assertions.assertThat;
98118
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
99119
import static org.awaitility.Awaitility.await;
@@ -113,7 +133,8 @@
113133
@EmbeddedKafka(topics = {ObservationTests.OBSERVATION_TEST_1, ObservationTests.OBSERVATION_TEST_2,
114134
ObservationTests.OBSERVATION_TEST_3, ObservationTests.OBSERVATION_TEST_4, ObservationTests.OBSERVATION_REPLY,
115135
ObservationTests.OBSERVATION_RUNTIME_EXCEPTION, ObservationTests.OBSERVATION_ERROR,
116-
ObservationTests.OBSERVATION_TRACEPARENT_DUPLICATE}, partitions = 1)
136+
ObservationTests.OBSERVATION_TRACEPARENT_DUPLICATE, ObservationTests.OBSERVATION_ASYNC_FAILURE_TEST,
137+
ObservationTests.OBSERVATION_ASYNC_FAILURE_WITH_RETRY_TEST}, partitions = 1)
117138
@DirtiesContext
118139
public class ObservationTests {
119140

@@ -137,6 +158,51 @@ public class ObservationTests {
137158

138159
public final static String OBSERVATION_TRACEPARENT_DUPLICATE = "observation.traceparent.duplicate";
139160

161+
public final static String OBSERVATION_ASYNC_FAILURE_TEST = "observation.async.failure.test";
162+
163+
public final static String OBSERVATION_ASYNC_FAILURE_WITH_RETRY_TEST = "observation.async.failure.retry.test";
164+
165+
@Test
166+
void asyncRetryScopePropagation(@Autowired AsyncFailureListener asyncFailureListener,
167+
@Autowired KafkaTemplate<Integer, String> template,
168+
@Autowired SimpleTracer tracer,
169+
@Autowired ObservationRegistry observationRegistry) throws InterruptedException {
170+
171+
// Clear any previous spans
172+
tracer.getSpans().clear();
173+
174+
// Create an observation scope to ensure we have a proper trace context
175+
var testObservation = Observation.createNotStarted("test.message.send", observationRegistry);
176+
177+
// Send a message within the observation scope to ensure trace context is propagated
178+
testObservation.observe(() -> {
179+
try {
180+
template.send(OBSERVATION_ASYNC_FAILURE_TEST, "trigger-async-failure").get(5, TimeUnit.SECONDS);
181+
} catch (Exception e) {
182+
throw new RuntimeException("Failed to send message", e);
183+
}
184+
});
185+
186+
// Wait for the listener to process the message (initial + retry + DLT = 3 invocations)
187+
assertThat(asyncFailureListener.asyncFailureLatch.await(15, TimeUnit.SECONDS)).isTrue();
188+
189+
// Verify that the captured spans from the listener contexts are all part of the same trace
190+
// This demonstrates that the tracing context propagates correctly through the retry mechanism
191+
Deque<SimpleSpan> spans = tracer.getSpans();
192+
assertThat(spans).hasSizeGreaterThanOrEqualTo(4); // template + listener + retry + DLT spans
193+
194+
// Verify that spans were captured for each phase and belong to the same trace
195+
assertThat(asyncFailureListener.capturedSpanInListener).isNotNull();
196+
assertThat(asyncFailureListener.capturedSpanInRetry).isNotNull();
197+
assertThat(asyncFailureListener.capturedSpanInDlt).isNotNull();
198+
199+
// All spans should have the same trace ID, demonstrating trace continuity
200+
var originalTraceId = asyncFailureListener.capturedSpanInListener.getTraceId();
201+
assertThat(originalTraceId).isNotBlank();
202+
assertThat(asyncFailureListener.capturedSpanInRetry.getTraceId()).isEqualTo(originalTraceId);
203+
assertThat(asyncFailureListener.capturedSpanInDlt.getTraceId()).isEqualTo(originalTraceId);
204+
}
205+
140206
@Test
141207
void endToEnd(@Autowired Listener listener, @Autowired KafkaTemplate<Integer, String> template,
142208
@Autowired SimpleTracer tracer, @Autowired KafkaListenerEndpointRegistry rler,
@@ -628,6 +694,11 @@ ConcurrentKafkaListenerContainerFactory<Integer, String> kafkaListenerContainerF
628694
if (container.getListenerId().equals("obs3")) {
629695
container.setKafkaAdmin(this.mockAdmin);
630696
}
697+
if (container.getListenerId().contains("asyncFailure")) {
698+
// Enable async acks to trigger async failure handling
699+
container.getContainerProperties().setAsyncAcks(true);
700+
container.getContainerProperties().setAckMode(ContainerProperties.AckMode.MANUAL);
701+
}
631702
if (container.getListenerId().equals("obs4")) {
632703
container.setRecordInterceptor(new RecordInterceptor<>() {
633704

@@ -683,29 +754,45 @@ Propagator propagator(Tracer tracer) {
683754
// List of headers required for tracing propagation
684755
@Override
685756
public List<String> fields() {
686-
return Arrays.asList("foo", "bar");
757+
// NOTE(review): inject() also writes "spanId" and "traceparent" headers that are
// not listed here - confirm whether they should be part of this list.
return Arrays.asList("traceId", "foo", "bar");
687758
}
688759

689760
// This is called on the producer side when the message is being sent
690-
// Normally we would pass information from tracing context - for tests we don't need to
691761
@Override
692762
public <C> void inject(TraceContext context, @Nullable C carrier, Setter<C> setter) {
693763
setter.set(carrier, "foo", "some foo value");
694764
setter.set(carrier, "bar", "some bar value");
695765

766+
if (context.traceId() != "") {
767+
setter.set(carrier, "traceId", context.traceId());
768+
setter.set(carrier, "spanId", context.spanId());
769+
}
770+
696771
// Add a traceparent header to simulate W3C trace context
697772
setter.set(carrier, "traceparent", "traceparent-from-propagator");
698773
}
699774

700775
// This is called on the consumer side when the message is consumed
701-
// Normally we would use tools like Extractor from tracing but for tests we are just manually creating a span
702776
@Override
703777
public <C> Span.Builder extract(C carrier, Getter<C> getter) {
704778
String foo = getter.get(carrier, "foo");
705779
String bar = getter.get(carrier, "bar");
706-
return tracer.spanBuilder()
780+
781+
var traceId = getter.get(carrier, "traceId");
782+
var spanId = getter.get(carrier, "spanId");
783+
784+
Span.Builder spanBuilder = tracer.spanBuilder()
707785
.tag("foo", foo)
708786
.tag("bar", bar);
787+
// If we have trace context from headers, tag it for verification
788+
if (traceId != null) {
789+
var traceContext = new SimpleTraceContext();
790+
traceContext.setTraceId(traceId);
791+
traceContext.setSpanId(spanId);
792+
spanBuilder = spanBuilder.setParent(traceContext);
793+
}
794+
795+
return spanBuilder;
709796
}
710797
};
711798
}
@@ -720,6 +807,15 @@ ExceptionListener exceptionListener() {
720807
return new ExceptionListener();
721808
}
722809

810+
// Listener bean autowired by asyncRetryScopePropagation; it is given the tracer so
// that it can record the tracer's current span on each invocation.
@Bean
811+
AsyncFailureListener asyncFailureListener(SimpleTracer tracer) {
812+
return new AsyncFailureListener(tracer);
813+
}
814+
815+
// NOTE(review): the bean method is named "taskExecutor" but it exposes a TaskScheduler
// (ThreadPoolTaskScheduler); presumably it is required by the @RetryableTopic
// infrastructure - confirm, and consider a name that matches the type.
@Bean
815+
public TaskScheduler taskExecutor() {
816+
return new ThreadPoolTaskScheduler();
817+
}
723819
}
724820

725821
public static class Listener {
@@ -801,4 +897,52 @@ Mono<Void> receive1(ConsumerRecord<Object, Object> record) {
801897

802898
}
803899

900+
public static class AsyncFailureListener {
901+
902+
final CountDownLatch asyncFailureLatch = new CountDownLatch(3);
903+
904+
volatile SimpleSpan capturedSpanInListener;
905+
volatile SimpleSpan capturedSpanInRetry;
906+
volatile SimpleSpan capturedSpanInDlt;
907+
908+
private final SimpleTracer tracer;
909+
910+
public AsyncFailureListener(SimpleTracer tracer) {
911+
this.tracer = tracer;
912+
}
913+
914+
@RetryableTopic(
915+
attempts = "2",
916+
backoff = @Backoff(delay = 1000)
917+
)
918+
@KafkaListener(id = "asyncFailure", topics = OBSERVATION_ASYNC_FAILURE_TEST)
919+
CompletableFuture<Void> handleAsync(ConsumerRecord<Integer, String> record) {
920+
// Use topic name to distinguish between original and retry calls
921+
String topicName = record.topic();
922+
923+
if (topicName.equals(OBSERVATION_ASYNC_FAILURE_TEST)) {
924+
// This is the original call
925+
this.capturedSpanInListener = this.tracer.currentSpan();
926+
} else {
927+
// This is a retry call (topic name will be different for retry topics)
928+
this.capturedSpanInRetry = this.tracer.currentSpan();
929+
}
930+
931+
this.asyncFailureLatch.countDown();
932+
933+
// Return a failed CompletableFuture to trigger async failure handling
934+
return supplyAsync(() -> {
935+
throw new RuntimeException("Async failure for observation test");
936+
});
937+
}
938+
939+
@DltHandler
940+
void handleDlt(ConsumerRecord<Integer, String> record, Exception exception) {
941+
this.capturedSpanInDlt = this.tracer.currentSpan();
942+
this.asyncFailureLatch.countDown();
943+
}
944+
}
945+
946+
947+
804948
}

0 commit comments

Comments
 (0)