Description
In what version(s) of Spring for Apache Kafka are you seeing this issue?
2.8.4.
Describe the bug
ListenerContainerFactoryConfigurer now creates its error handler here:
Lines 235 to 246 in 93efeb4
protected CommonErrorHandler createErrorHandler(DeadLetterPublishingRecoverer deadLetterPublishingRecoverer,
        Configuration configuration) {
    DefaultErrorHandler errorHandler = createDefaultErrorHandlerInstance(deadLetterPublishingRecoverer);
    errorHandler.defaultFalse();
    errorHandler.setCommitRecovered(true);
    errorHandler.setLogLevel(KafkaException.Level.DEBUG);
    if (this.blockingExceptionTypes != null) {
        errorHandler.addRetryableExceptions(this.blockingExceptionTypes);
    }
    this.errorHandlerCustomizer.accept(errorHandler);
    return errorHandler;
}
There are two breaking changes/regressions compared to version 2.8.3:
1. createDefaultErrorHandlerInstance no longer provides a "no retry" back off strategy by default. The outcome of this change is that a to-be-recovered record is retried 10 times instead of immediately being sent to the next recovery topic.
2. errorHandler.defaultFalse completely disables the default classification of the fatal exceptions (such as DeserializationException.class). In fact, every classification request now returns false.
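To illustrate the second point, here is a minimal, self-contained sketch of how I understand the classification to behave. It does not use the Spring for Apache Kafka classes: ClassifierSketch and its methods are hypothetical stand-ins, and ClassCastException is used only as a placeholder for a fatal exception type. The assumption is that the default classifier answers true for everything except the registered fatal types, and that defaultFalse discards those registrations and answers false unless a type is explicitly added as retryable.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of an exception classifier; not the library's implementation.
class ClassifierSketch {

    private Map<Class<? extends Throwable>, Boolean> classified = new HashMap<>();
    private boolean defaultValue;

    // Models the earlier default: fatal types answer false, everything else answers true.
    static ClassifierSketch withFatalDefaults() {
        ClassifierSketch sketch = new ClassifierSketch();
        sketch.defaultValue = true;
        sketch.classified.put(ClassCastException.class, false); // placeholder for a fatal type
        return sketch;
    }

    // Models defaultFalse(): drop every registration and answer false by default.
    void defaultFalse() {
        this.classified = new HashMap<>();
        this.defaultValue = false;
    }

    // Models addRetryableExceptions(...) for a single type.
    void addRetryableException(Class<? extends Throwable> type) {
        this.classified.put(type, true);
    }

    // Looks up the exception's class and its superclasses, then falls back to the default.
    boolean classify(Throwable thrown) {
        for (Class<?> c = thrown.getClass(); c != null && c != Object.class; c = c.getSuperclass()) {
            Boolean value = this.classified.get(c);
            if (value != null) {
                return value;
            }
        }
        return this.defaultValue;
    }

    public static void main(String[] args) {
        ClassifierSketch sketch = withFatalDefaults();
        System.out.println("before defaultFalse: " + sketch.classify(new IllegalStateException("boom")));
        sketch.defaultFalse();
        System.out.println("after defaultFalse: " + sketch.classify(new IllegalStateException("boom")));
    }
}
```

In this model, only the types passed through addRetryableException answer true after defaultFalse has been called, which matches what I observe when blockingExceptionTypes is not set.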
The second change causes every back off exception to be printed at error level due to the code here:
Lines 143 to 164 in f0ad7b0
protected RecoveryStrategy getRecoveryStrategy(List<ConsumerRecord<?, ?>> records,
        @Nullable Consumer<?, ?> recoveryConsumer, Exception thrownException) {
    if (getClassifier().classify(thrownException)) {
        return this.failureTracker::recovered;
    }
    else {
        try {
            this.failureTracker.getRecoverer().accept(records.get(0), recoveryConsumer, thrownException);
            this.failureTracker.getRetryListeners().forEach(rl -> rl.recovered(records.get(0), thrownException));
        }
        catch (Exception ex) {
            if (records.size() > 0) {
                this.logger.error(ex, () -> "Recovery of record ("
                        + KafkaUtils.format(records.get(0)) + ") failed");
                this.failureTracker.getRetryListeners().forEach(rl ->
                        rl.recoveryFailed(records.get(0), thrownException, ex));
            }
            return (rec, excep, cont, consumer) -> NEVER_SKIP_PREDICATE.test(rec, excep);
        }
        return (rec, excep, cont, consumer) -> ALWAYS_SKIP_PREDICATE.test(rec, excep);
    }
}
getClassifier().classify(thrownException) now always returns false, and this.failureTracker.getRecoverer().accept may throw a back off exception, which no longer reaches the following special back off exception handler:
spring-kafka/spring-kafka/src/main/java/org/springframework/kafka/listener/SeekUtils.java
Lines 109 to 113 in 93efeb4
catch (Exception ex) {
    if (isBackoffException(ex)) {
        logger.debug(ex, () -> ListenerUtils.recordToString(record)
                + " included in seeks due to retry back off");
    }
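The control flow described above can be sketched in isolation as follows. This is a simplified model under stated assumptions, not the framework code: RecoveryFlowSketch, BackOffSketchException, and the in-memory LOG list are hypothetical names, and the "retryable" branch is assumed to let exceptions propagate to the outer handler, while the "not retryable" branch wraps the recoverer call in its own try/catch as in the quoted getRecoveryStrategy method.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified model of the two handlers; not the framework code.
class RecoveryFlowSketch {

    // Stand-in for the back off exception thrown while a record is still backing off.
    static class BackOffSketchException extends RuntimeException {
        BackOffSketchException(String message) {
            super(message);
        }
    }

    // Collects log lines so the effective log level can be inspected.
    static final List<String> LOG = new ArrayList<>();

    // Inner step: mirrors the shape of getRecoveryStrategy. Returns true if the record is skipped.
    static boolean recover(boolean classifiedRetryable, String record, Consumer<String> recoverer) {
        if (classifiedRetryable) {
            recoverer.accept(record); // not wrapped: exceptions propagate to the caller
            return true;
        }
        try {
            recoverer.accept(record);
        }
        catch (Exception ex) {
            LOG.add("ERROR Recovery of record (" + record + ") failed");
            return false; // never skip
        }
        return true; // always skip
    }

    // Outer step: mirrors the special back off handling that logs at debug level.
    static boolean seekOrRecover(boolean classifiedRetryable, String record, Consumer<String> recoverer) {
        try {
            return recover(classifiedRetryable, record, recoverer);
        }
        catch (Exception ex) {
            if (ex instanceof BackOffSketchException) {
                LOG.add("DEBUG " + record + " included in seeks due to retry back off");
            }
            return false;
        }
    }

    public static void main(String[] args) {
        Consumer<String> backingOff = rec -> {
            throw new BackOffSketchException("still backing off");
        };
        seekOrRecover(true, "record-a", backingOff);  // classification answers true
        seekOrRecover(false, "record-b", backingOff); // classification answers false
        LOG.forEach(System.out::println);
    }
}
```

In this model the same back off exception is logged at debug level when the classification answers true, and at error level when it answers false, because the inner catch block handles it before the outer handler can.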
To Reproduce
Unfortunately, I do not have a sample right now, but I will try to prepare it this week. I believe that any application that uses the retryable topics recovery mechanism is impacted.
Expected behavior
I expect the default behaviour of the retryable topics recovery system to be identical to version 2.8.3.