Listener exceptions not saved to the observation #3049

@foaw

In what version(s) of Spring for Apache Kafka are you seeing this issue?

  • org.springframework.kafka:spring-kafka:3.1.1
  • org.apache.kafka:kafka-clients:3.6.1

Describe the bug

When an exception is thrown from a listener, by default the exception is not saved in the current listener observation.

    Observation observation = KafkaListenerObservation.LISTENER_OBSERVATION.observation(
            this.containerProperties.getObservationConvention(),
            DefaultKafkaListenerObservationConvention.INSTANCE,
            () -> new KafkaRecordReceiverContext(cRecord, getListenerId(), this::clusterId),
            this.observationRegistry);
    return observation.observe(() -> {
        try {
            invokeOnMessage(cRecord);
            successTimer(sample, cRecord);
            recordInterceptAfter(cRecord, null);
        }
        catch (RuntimeException e) {
            failureTimer(sample, cRecord);
            recordInterceptAfter(cRecord, e);
            if (this.commonErrorHandler == null) {
                throw e;
            }
            try {
                invokeErrorHandler(cRecord, iterator, e);
                commitOffsetsIfNeededAfterHandlingError(cRecord);
            }
            catch (KafkaException ke) {
                ke.selfLog(ERROR_HANDLER_THREW_AN_EXCEPTION, this.logger);
                return ke;
            }
            catch (RuntimeException ee) {
                this.logger.error(ee, ERROR_HANDLER_THREW_AN_EXCEPTION);
                return ee;
            }
            catch (Error er) { // NOSONAR
                this.logger.error(er, "Error handler threw an error");
                throw er;
            }
        }
        return null;
    });

Observation#observe would record the exception on its own, but only if the provided supplier re-throws it rather than returning it. The supplier presumably returns it for the sake of ListenerConsumer#invokeInTransaction.
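To make the difference concrete, here is a minimal, self-contained sketch. MiniObservation and ObserveDemo are hypothetical stand-ins written for this illustration; they are not Micrometer or Spring Kafka classes. The sketch assumes that observe only records an exception that escapes the supplier, and it shows one possible way to record a handled exception explicitly, similar in spirit to calling Observation#error(Throwable) from the catch block.

```java
import java.util.function.Supplier;

// Hypothetical stand-in for an observation; NOT the Micrometer class.
final class MiniObservation {

    private Throwable error; // what downstream tracing code would see

    // Assumed contract: only an exception that escapes the supplier is recorded.
    <T> T observe(Supplier<T> supplier) {
        try {
            return supplier.get();
        }
        catch (RuntimeException | Error e) {
            this.error = e;
            throw e;
        }
    }

    // Explicit recording, analogous in spirit to Observation#error(Throwable).
    void error(Throwable t) {
        this.error = t;
    }

    Throwable getError() {
        return this.error;
    }
}

final class ObserveDemo {

    // The listener exception is handled inside the supplier and null is
    // returned, so nothing escapes and nothing is recorded.
    static Throwable handledSilently() {
        MiniObservation obs = new MiniObservation();
        obs.observe(() -> {
            try {
                throw new IllegalStateException("listener failed");
            }
            catch (RuntimeException e) {
                return null; // the error handler dealt with it
            }
        });
        return obs.getError();
    }

    // Same flow, but the catch block records the exception before returning.
    static Throwable handledAndRecorded() {
        MiniObservation obs = new MiniObservation();
        obs.observe(() -> {
            try {
                throw new IllegalStateException("listener failed");
            }
            catch (RuntimeException e) {
                obs.error(e); // explicit recording
                return null;
            }
        });
        return obs.getError();
    }

    // The exception escapes the supplier, so observe records it by itself.
    static Throwable rethrown() {
        MiniObservation obs = new MiniObservation();
        try {
            obs.observe(() -> {
                throw new IllegalStateException("listener failed");
            });
        }
        catch (IllegalStateException expected) {
            // swallowed only to keep the demo running
        }
        return obs.getError();
    }

    public static void main(String[] args) {
        System.out.println("handled silently:     " + handledSilently());
        System.out.println("handled and recorded: " + handledAndRecorded());
        System.out.println("re-thrown:            " + rethrown());
    }
}
```

In this sketch only the first variant loses the exception, which matches what I see with the listener container when an error handler is configured. Whether an explicit call in the catch block is the right fix for the container is a separate question; it is shown here only as one option.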

To Reproduce

  1. Create and register a message listener.
  2. Send a message to the listener topic. (For the sample below, the payload is an exception class name, for example java.lang.NullPointerException.)
  3. Throw an exception from the listener.

Expected behavior

The error handler is still invoked, and in addition the original exception is recorded on the observation so that downstream tracing code can handle it.

Sample

For simplicity's sake, I will only leave the listener code here. In the configuration, there is only a consumer group identifier. Let me know if I should make this into a repository.

@KafkaListener(topics = "error")
public void onErrorMessage(@Payload String payload) throws Throwable {
    Class<?> errorClass = Class.forName(payload);
    throw (Throwable) errorClass.getDeclaredConstructor().newInstance();
}
