-
Notifications
You must be signed in to change notification settings - Fork 1.7k
Description
Expected Behavior
I have a use-case where I only want to seek to a specified offset if it is before the consumer's current offset. Ideally I would like to have a method on ConsumerSeekCallback
where I would provide a Function<Long, Long>
that computes the new offset given the current one.
Current Behavior
Currently the framework only provides a way to seek to a specific offset unconditionally, or provide a relative offset from the current one. There is no way to access the current offset value when performing a seek.
Context
For my use-case, there are certain events that require re-processing records that were already processed before. However, I should not seek forward in the Kafka partition, because the partition may contain other records that are unrelated to the event and shouldn't be skipped (sorry if this is a bit vague).
So, I need to know what the current partition offset is before I decide to actually seek from the event.
In order to try to accomplish this, I have the Kafka listener extend AbstractConsumerSeekAware
, then use the methods from that to perform the seek when the event occurs.
I have considered using the onIdleContainer
method to store the current offsets, but reading the source code it seems that this is only called after a certain time has elapsed, so these offsets are not guaranteed to be up-to-date by the time I decide to seek.
I have also considered using the KafkaListenerEndpointRegistry
, however this only allows pausing/resuming/stopping containers, and does not provide access to the underlying Consumer
.