Description
I've reordered the sections for this request because, in my opinion, it is easier to understand the desired behaviour when the limitations of the current approach are explained first.
Current Behavior
Let's consider a situation where `max.poll.records` records (500 by default) are available for consumption in a non-transactional record listener container, but the very first record cannot be processed immediately due to e.g. an issue with an external data provider. Here is what happens in the code, at least in the case of non-blocking retries:

1. `KafkaMessageListenerContainer.ListenerConsumer.doInvokeRecordListener` is eventually invoked with `record` pointing to the first record and `iterator` holding the remaining 499.
2. `this.invokeOnMessage` fails with an exception.
3. An error handler is installed, so `this.invokeErrorHandler` is called.
4. `this.commonErrorHandler.remainingRecords()` returns `true`, so both the failed record and all the remaining records from the iterator are drained into a temporary list, and `this.commonErrorHandler.handleRemaining` leads to `DefaultErrorHandler.handleRemaining`, which performs `SeekUtils.seekOrRecover`.
5. An attempt is made to recover the failed record.
6. No matter whether the recovery operation succeeds or not, the rest of the batch is rewound in `seekPartitions`.
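To make the steps above concrete, here is a condensed paraphrase of that flow (not the actual source; fields and signatures are simplified from the 2.8-era `ListenerConsumer`):

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.MessageListenerContainer;

// Paraphrase of KafkaMessageListenerContainer.ListenerConsumer, trimmed to
// the parts relevant to this report.
class ListenerConsumerSketch<K, V> {

    private CommonErrorHandler commonErrorHandler; // a DefaultErrorHandler here
    private Consumer<K, V> consumer;
    private MessageListenerContainer container;

    void doInvokeRecordListener(ConsumerRecord<K, V> record, Iterator<ConsumerRecord<K, V>> iterator) {
        try {
            invokeOnMessage(record);                      // step 2: the listener throws
        }
        catch (RuntimeException ex) {
            invokeErrorHandler(record, iterator, ex);     // step 3
        }
    }

    void invokeErrorHandler(ConsumerRecord<K, V> record, Iterator<ConsumerRecord<K, V>> iterator,
            RuntimeException ex) {
        if (this.commonErrorHandler.remainingRecords()) { // step 4: true for DefaultErrorHandler
            List<ConsumerRecord<?, ?>> remaining = new ArrayList<>();
            remaining.add(record);
            iterator.forEachRemaining(remaining::add);    // drain the other 499
            // steps 5 and 6: recover the failed record, then seek the rest
            this.commonErrorHandler.handleRemaining(ex, remaining, this.consumer, this.container);
        }
        else {
            this.commonErrorHandler.handleRecord(ex, record, this.consumer, this.container);
        }
    }

    void invokeOnMessage(ConsumerRecord<K, V> record) { /* invokes the record listener */ }
}
```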
Unfortunately, this unconditional rewind leads to enormous spikes in network I/O, as pretty much the same records are requested from Kafka over and over again. To be precise, with a local Kafka broker and a test application that always triggers a recovery, I could see spikes exceeding 2000 Mbps in `iptraf`.
Expected Behavior
I believe that it is not necessary to always rewind the remaining 499 records if the recovery is successful. This is what may happen to the failed record when non-blocking retries are enabled:

- If the record cannot be processed due to a fatal exception (e.g. `ClassCastException`), it is sent straight to the DLT topic.
- If the exception is not fatal, the record is sent to the next retry destination.

Both cases allow the next `iterator` record to be processed immediately, as if nothing had happened. That very record will be at the front of the batch during the next `poll` invocation anyway! The only special case I can think of is `KafkaBackoffException`, which absolutely does require rewinding all the offsets so that the same batch is consumed again when the affected partitions are resumed.
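For reference, a minimal listener that exercises both of these paths might look like the sketch below (the topic name, payload type, and exception are made up for the example):

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.DltHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.RetryableTopic;
import org.springframework.retry.annotation.Backoff;
import org.springframework.stereotype.Component;

@Component
public class OrderListener {

    @RetryableTopic(backoff = @Backoff(delay = 1000))
    @KafkaListener(topics = "orders")
    public void listen(ConsumerRecord<String, String> record) {
        // A non-fatal exception routes the record to the next retry topic;
        // a fatal one (e.g. ClassCastException) sends it straight to the
        // DLT. Either way, the failed record leaves the main processing
        // path, so the rest of the batch could proceed without a rewind.
        throw new IllegalStateException("external data provider unavailable");
    }

    @DltHandler
    public void handleDlt(ConsumerRecord<String, String> record) {
        // terminal handling of exhausted or fatally failed records
    }
}
```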
Context
The production rollout of our service that uses the non-blocking retries feature provided by Spring Kafka triggered a number of network I/O alarms, so we decided to find the root cause. This feature request is the result of the investigation :)
Implementing a workaround for this issue is possible but, admittedly, quite tricky, as `invokeErrorHandler` is a private API method. These are the steps that may be taken (a sketch of both classes follows the list):
- Implement a custom `CommonErrorHandler` that:
  - Exposes an API that allows `remainingRecords` to return either `true` or `false`.
  - Overrides `handleRecord` in a way that delegates to `handleRemaining` of the default error handler with a singleton list as the second argument.
- Implement a custom `RecordInterceptor` with an overridden `failure` method that uses the above API to make `remainingRecords` return `true` only if `SeekUtils.isBackoffException` is `true`.
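Here is one possible shape for these two classes, assuming the 2.8-era `CommonErrorHandler` and `RecordInterceptor` contracts (the `ThreadLocal` flag is just one way to implement the API mentioned above):

```java
import java.util.List;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.CommonErrorHandler;
import org.springframework.kafka.listener.MessageListenerContainer;

public class CustomErrorHandler implements CommonErrorHandler {

    private final CommonErrorHandler delegate;

    // Flipped per failed record by the interceptor below; a ThreadLocal
    // keeps concurrent containers independent of each other.
    private final ThreadLocal<Boolean> seekRemaining = ThreadLocal.withInitial(() -> Boolean.FALSE);

    public CustomErrorHandler(CommonErrorHandler delegate) {
        this.delegate = delegate;
    }

    public void setSeekRemaining(boolean seekRemaining) {
        this.seekRemaining.set(seekRemaining);
    }

    @Override
    public boolean remainingRecords() {
        // true only for KafkaBackoffException, so the batch is drained and
        // rewound only when the affected partitions are paused
        return this.seekRemaining.get();
    }

    @Override
    public void handleRecord(Exception thrownException, ConsumerRecord<?, ?> record,
            Consumer<?, ?> consumer, MessageListenerContainer container) {
        // Recover just the failed record; the remaining 499 stay on the
        // iterator and are processed as usual: no seek, no re-fetch.
        this.delegate.handleRemaining(thrownException, List.of(record), consumer, container);
    }

    @Override
    public void handleRemaining(Exception thrownException, List<ConsumerRecord<?, ?>> records,
            Consumer<?, ?> consumer, MessageListenerContainer container) {
        this.delegate.handleRemaining(thrownException, records, consumer, container);
    }
}
```

```java
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.listener.RecordInterceptor;
import org.springframework.kafka.listener.SeekUtils;

public class CustomRecordInterceptor<K, V> implements RecordInterceptor<K, V> {

    private final CustomErrorHandler errorHandler;

    public CustomRecordInterceptor(CustomErrorHandler errorHandler) {
        this.errorHandler = errorHandler;
    }

    @Override
    public ConsumerRecord<K, V> intercept(ConsumerRecord<K, V> record) {
        return record; // no-op on the happy path
    }

    @Override
    public void failure(ConsumerRecord<K, V> record, Exception exception, Consumer<K, V> consumer) {
        // The container invokes failure() before the error handler, so the
        // flag is in place by the time remainingRecords() is consulted.
        this.errorHandler.setSeekRemaining(SeekUtils.isBackoffException(exception));
    }
}
```

A production version would also delegate the remaining `CommonErrorHandler` methods (`handleOtherException`, `isAckAfterHandle`, etc.) to the wrapped handler; they are omitted here for brevity.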
`ListenerContainerFactoryConfigurer.setContainerCustomizer` may then be used to tie things together, e.g.:
```java
@Bean(name = RetryTopicInternalBeanNames.LISTENER_CONTAINER_FACTORY_CONFIGURER_NAME)
public ListenerContainerFactoryConfigurer listenerContainerFactoryConfigurer(
        KafkaConsumerBackoffManager kafkaConsumerBackoffManager,
        DeadLetterPublishingRecovererFactory deadLetterPublishingRecovererFactory,
        @Qualifier(RetryTopicInternalBeanNames.INTERNAL_BACKOFF_CLOCK_BEAN_NAME) Clock clock) {
    final var configurer = new ListenerContainerFactoryConfigurer(...);
    configurer.setContainerCustomizer(container -> {
        final var customErrorHandler = new CustomErrorHandler(container.getCommonErrorHandler());
        container.setRecordInterceptor(new CustomRecordInterceptor<>(customErrorHandler));
        container.setCommonErrorHandler(customErrorHandler);
    });
    return configurer;
}
```
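Note that this wiring presumably relies on the customizer running after the configurer has installed its own `DefaultErrorHandler`, so that `container.getCommonErrorHandler()` returns the handler worth wrapping; that ordering is implied by the snippet above but is worth verifying against the Spring Kafka version in use.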