Skip to content

Conversation

lucasbru
Copy link
Member

@lucasbru lucasbru commented Sep 9, 2025

In the consumer, we invoke the consumer rebalance onPartitionRevoked or
onPartitionLost callbacks, when the consumer closes. The point is that
the application may want to commit, or wipe the state if we are closing
unsuccessfully.

In the StreamsRebalanceListener, we did not implement this behavior,
which means when closing the consumer we may lose some progress, and in
the worst case also miss that we have to wipe our local state state
since we got fenced.

In this PR we implement StreamsRebalanceListenerInvoker, very similarly
to ConsumerRebalanceListenerInvoker and invoke it in Consumer.close.

Reviewers: Lianet Magrans [email protected], Matthias J. Sax
[email protected], TengYao Chi [email protected],
Uladzislau Blok [email protected]

Copy link
Contributor

@Copilot Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull Request Overview

This PR implements a StreamsRebalanceListener invoker to trigger rebalance callbacks when the consumer closes, ensuring that Streams applications can properly handle state cleanup when losing tasks due to fencing or other failures.

  • Adds StreamsRebalanceListenerInvoker class to manage and invoke streams rebalance callbacks with proper error handling
  • Updates consumer close logic to invoke streams rebalance listener callbacks based on member epoch status
  • Refactors background event processor to use the new invoker pattern and removes duplicate callback tracking

Reviewed Changes

Copilot reviewed 6 out of 6 changed files in this pull request and generated 5 comments.

Show a summary per file
File Description
StreamsRebalanceListenerInvoker.java New invoker class that handles streams rebalance listener callbacks with logging and error handling
StreamsRebalanceListenerInvokerTest.java Comprehensive test coverage for the new invoker class
AsyncKafkaConsumer.java Refactored to use the new invoker and updated close logic to trigger streams callbacks
AsyncKafkaConsumerTest.java Added tests for the new close behavior with streams listeners
DefaultStreamsRebalanceListener.java Added calls to set reconciled assignment after successful task operations
DefaultStreamsRebalanceListenerTest.java Updated tests to use mocked StreamsRebalanceData and verify assignment setting

Tip: Customize your code reviews with copilot-instructions.md. Create the file or learn how to get started.


private Optional<StreamsRebalanceListener> streamsRebalanceListener = Optional.empty();
private final Optional<StreamsRebalanceData> streamsRebalanceData;
private final Optional<StreamsRebalanceListenerInvoker> streamsRebalanceListenerInvoker;
Copy link
Contributor

@UladzislauBlok UladzislauBlok Sep 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

out of curiosity: why this is optional, if we anyway will throw exception at line 292 in case of empty optional?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would expect it's optional because it will only exist if the AsycKafkaConsumer is being used with the streams rebalance protocol.

But why do we need to declare it here in the internal class if it already exist in the parent AsyncKafkaConsumer?

private final Optional<StreamsRebalanceListenerInvoker> streamsRebalanceListenerInvoker;

(can we just use that one? as we do with the rebalanceListenerInvoker)

Copy link
Member

@mjsax mjsax Sep 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

AsyncConsumer can be used with "plain consumer" or inside "Kafka Streams". We would use this new handler only for the KS case, but not the "plain consumer" case.

And L292 is only executed for the KS case.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

True. Removed the second field and used the reference in the outer class instead.

Comment on lines +99 to +72
StreamsRebalanceListener firstListener = org.mockito.Mockito.mock(StreamsRebalanceListener.class);
StreamsRebalanceListener secondListener = org.mockito.Mockito.mock(StreamsRebalanceListener.class);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor: full import?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Copy link
Member

@mjsax mjsax left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Made an initial pass. Did not look into the test code yet.

"rebalance protocol events");
}
this.streamsRebalanceListener = Optional.of(streamsRebalanceListener);
public BackgroundEventProcessor(final Optional<StreamsRebalanceListenerInvoker> streamsRebalanceListenerInvoker) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For this constructor, it seems not to make sense to pass in an Optional? If we are in the KS case, and use this constructor, we must pass the StreamsRebalanceListenerInvoker.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Removed the constructor.

streamsRebalanceData().setReconciledAssignment(assignment);
}
final Optional<Exception> exceptionFromCallback = Optional.ofNullable(streamsRebalanceListenerInvoker().invokeTasksAssigned(assignment));
error = exceptionFromCallback.map(e -> ConsumerUtils.maybeWrapAsKafkaException(e, "Task assignment callback throws an error"));
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

As above (same elsewhere).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See above.

Copy link
Member

@lianetm lianetm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @lucasbru ! Took a first pass.


private Optional<StreamsRebalanceListener> streamsRebalanceListener = Optional.empty();
private final Optional<StreamsRebalanceData> streamsRebalanceData;
private final Optional<StreamsRebalanceListenerInvoker> streamsRebalanceListenerInvoker;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would expect it's optional because it will only exist if the AsycKafkaConsumer is being used with the streams rebalance protocol.

But why do we need to declare it here in the internal class if it already exist in the parent AsyncKafkaConsumer?

private final Optional<StreamsRebalanceListenerInvoker> streamsRebalanceListenerInvoker;

(can we just use that one? as we do with the rebalanceListenerInvoker)

Comment on lines 1960 to 1964
if (streamsRebalanceListenerInvoker.isPresent()) {
streamsRebalanceListenerInvoker.get().setRebalanceListener(streamsRebalanceListener);
} else {
throw new IllegalStateException("Consumer was not created to be used with Streams rebalance protocol events");
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would expect is probably best/clearer to do this setup/validation before any actual subscription action (subscribeInternal)?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


public Exception invokeTasksAssigned(final StreamsRebalanceData.Assignment assignment) {
if (listener.isPresent()) {
log.info("Adding newly assigned tasks: {}", assignment);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This log seems a bit confusing here on the callback invocations. Shouldn't it be more along the lines of "trigger task assigned callbacks" (maybe is me misreading, missing KS details. Ex. on the consumer side the assigned callback runs when the partitions were already assigned/added). If this applies, let's align also the logs for revoked/lost

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The consumer rebalance listener logs Adding newly assigned partitions: in this case, I just copied its behavior for consistency. But generally, I agree.

I updated the log line indicate that the rebalance listener is invoked.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The consumer rebalance listener logs Adding newly assigned partitions: in this case

I see, well, that's wrong too :) at that point the partitions have been already added really. I will fix it on the consumer side ;)


@Test
public void testCloseInvokesStreamsRebalanceListenerOnAllTasksLostWhenMemberEpochZeroOrNegative() {
// Test that close() calls streamsRebalanceListener.invokeAllTasksLost() when memberEpoch <= 0
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this comment seems to state the same as the func name really, needed?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Comment on lines 2275 to 2276
assertNotNull(thrownException.getCause());
assertTrue(thrownException.getCause() instanceof RuntimeException);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
assertNotNull(thrownException.getCause());
assertTrue(thrownException.getCause() instanceof RuntimeException);
assertInstanceOf(RuntimeException.class, thrownException.getCause())

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Comment on lines 118 to 100
@Test
public void testInvokeAllTasksRevokedWithNoListener() {
// When no listener is set, should return null
Exception result = invoker.invokeAllTasksRevoked();
assertNull(result);
}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isn't this testing the same as testConstructorInitializesWithEmptyListener?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

}

@Test
public void testInvokeAllTasksLostWithNoListener() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

isn't this testing the same as testConstructorInitializesWithEmptyListener?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

Copy link
Contributor

@frankvicky frankvicky left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lucasbru: Thanks for the patch.


@Test
public void testSetRebalanceListener() {
// Test setting a listener
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need this comment?
I suppose the test case name is clear enough.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


@Test
public void testSetRebalanceListenerWithNull() {
// Test setting listener to null
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

@lucasbru
Copy link
Member Author

Wow, so many reviews. Thanks a lot. I addressed the comments @mjsax @frankvicky @lianetm @UladzislauBlok

Copy link
Member

@lianetm lianetm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the updates @lucasbru !


public Exception invokeTasksAssigned(final StreamsRebalanceData.Assignment assignment) {
if (listener.isPresent()) {
log.info("Adding newly assigned tasks: {}", assignment);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The consumer rebalance listener logs Adding newly assigned partitions: in this case

I see, well, that's wrong too :) at that point the partitions have been already added really. I will fix it on the consumer side ;)

if (assignedPartitions.isEmpty())
// Nothing to revoke.
return;
if (streamsRebalanceListenerInvoker.isPresent()) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the streamsRebalanceListenerInvoker could be null here in the case where close is called from the constructor upon a failure building the consumer (ln 509), so maybe consider adding a check here? (we've been seeing those noisy NPE logs on close recently actually)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wow, that close is sneaky. Fixed.

applicationEventHandler.add(new CommitOnCloseEvent());
}

private void runRebalanceCallbacksOnClose() {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

since this func is being extended to cover streams tasks now too, should we tweak the error message to make it more generic? (above, Failed to release group assignment). Up to you, but maybe something along the lines of "Failed running callbacks" would apply better to both cases

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

public void setRebalanceListener(StreamsRebalanceListener streamsRebalanceListener) {
Objects.requireNonNull(streamsRebalanceListener, "StreamsRebalanceListener cannot be null");
if (listener.isPresent() && listener.get() != streamsRebalanceListener) {
throw new IllegalStateException("StreamsRebalanceListener can only be set once");
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

given this validation, should we clear the listener on unsubscribe? (otherwise, I expect that subscribe+unsubscribe+subscribe would fail on the last subscribe with this error).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Streams will always use the same rebalance listener to subscribe

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's actually not true - I think I looked at the wrong code. So I changed it make to allow overwriting the rebalance listener on subscribe

Copy link
Member Author

@lucasbru lucasbru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@lianetm Thanks! Addressed / Responded

applicationEventHandler.add(new CommitOnCloseEvent());
}

private void runRebalanceCallbacksOnClose() {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

if (assignedPartitions.isEmpty())
// Nothing to revoke.
return;
if (streamsRebalanceListenerInvoker.isPresent()) {
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wow, that close is sneaky. Fixed.

Copy link
Member

@lianetm lianetm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! LGTM

lucasbru and others added 6 commits September 15, 2025 21:48
In the consumer, we invoke the consumer rebalance onPartitionRevoked or onPartitionLost
callbacks, when the consumer closes. The point is that the application may want to
commit, or wipe the state if we are closing unsuccessfully.

In the StreamsRebalanceListener, we did not implement this behavior, which means when
closing the consumer we may lose some progress, and in the worst case also miss that
we have to wipe our local state state since we got fenced.

In this PR we implement StreamsRebalanceListenerInvoker, very similarly
to ConsumerRebalanceListenerInvoker and invoke it in Consumer.close.
@lucasbru lucasbru force-pushed the invoke_rebalance_callbacks branch from dc8de27 to 3ec8974 Compare September 15, 2025 19:48
@lucasbru lucasbru merged commit 2c34738 into apache:trunk Sep 16, 2025
22 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

5 participants