DefaultErrorHandler is not able to seek in case of an exception during the commit #3019

@frosiere

Description

The listener error handler (DefaultErrorHandler) does not perform the seek when an exception is thrown during the commit.

The commit call is not guarded against exceptions such as RebalanceInProgressException, so when one is thrown the seek that follows it never runs. The full list of exceptions can be found in the Javadoc of the KafkaConsumer#commitSync and KafkaConsumer#commitAsync methods.

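Because of this bug, some records are skipped during reprocessing: without the seek, the consumer position is not reset to the failed records, so the next poll continues past them.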

The issue can be fixed by wrapping the commit in a try block and moving the seek part into the finally block.

See FailedBatchProcessor#seekOrRecover for more details.

The following portion of code

if (offsets.size() > 0) {
    commit(consumer, container, offsets);
}
if (isSeekAfterError()) {
...

should/could be replaced by

try {
    if (offsets.size() > 0) {
        commit(consumer, container, offsets);
    }
} finally {
    if (isSeekAfterError()) {
...

The proposed fix has been tested and is working fine.
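
To illustrate why the finally block matters, here is a minimal, self-contained sketch in plain Java. It does not use the Spring Kafka or Kafka client classes: FakeConsumer, its commit() and seek() methods, and the IllegalStateException standing in for something like a RebalanceInProgressException are all hypothetical names made up for this example. It only shows the control flow, not the actual FailedBatchProcessor code.

```java
import java.util.ArrayList;
import java.util.List;

public class CommitThenSeekSketch {

    /** Hypothetical stand-in for the consumer; it records which operations were invoked. */
    static class FakeConsumer {
        final List<String> calls = new ArrayList<>();
        final boolean failOnCommit;

        FakeConsumer(boolean failOnCommit) {
            this.failOnCommit = failOnCommit;
        }

        void commit() {
            calls.add("commit");
            if (failOnCommit) {
                // Assumption: stands in for a commit failure such as a rebalance in progress.
                throw new IllegalStateException("simulated commit failure");
            }
        }

        void seek() {
            calls.add("seek");
        }
    }

    /** Mirrors the current ordering: if commit throws, seek is never reached. */
    static void commitThenSeekUnprotected(FakeConsumer consumer) {
        consumer.commit();
        consumer.seek();
    }

    /** Mirrors the proposed ordering: seek runs in finally, even if commit throws. */
    static void commitThenSeekProtected(FakeConsumer consumer) {
        try {
            consumer.commit();
        } finally {
            consumer.seek();
        }
    }

    /** Runs one variant against a consumer whose commit always fails and returns the call log. */
    static List<String> run(boolean useFinally) {
        FakeConsumer consumer = new FakeConsumer(true);
        try {
            if (useFinally) {
                commitThenSeekProtected(consumer);
            } else {
                commitThenSeekUnprotected(consumer);
            }
        } catch (IllegalStateException e) {
            // The commit exception still propagates to the caller in both variants.
            consumer.calls.add("exception");
        }
        return consumer.calls;
    }

    public static void main(String[] args) {
        System.out.println("unprotected: " + run(false)); // [commit, exception]
        System.out.println("protected:   " + run(true));  // [commit, seek, exception]
    }
}
```

In both variants the commit exception still reaches the caller; the only difference is that the seek is no longer lost when the commit fails.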

I can contribute a PR, and I can also provide more details if needed.

Any other ideas or suggestions are more than welcome.
