@@ -187,25 +187,6 @@ public class AsyncKafkaConsumer<K, V> implements ConsumerDelegate<K, V> {
*/
private class BackgroundEventProcessor implements EventProcessor<BackgroundEvent> {

private Optional<StreamsRebalanceListener> streamsRebalanceListener = Optional.empty();
private final Optional<StreamsRebalanceData> streamsRebalanceData;

public BackgroundEventProcessor() {
this.streamsRebalanceData = Optional.empty();
}

public BackgroundEventProcessor(final Optional<StreamsRebalanceData> streamsRebalanceData) {
this.streamsRebalanceData = streamsRebalanceData;
}

private void setStreamsRebalanceListener(final StreamsRebalanceListener streamsRebalanceListener) {
if (streamsRebalanceData.isEmpty()) {
throw new IllegalStateException("Background event processor was not created to be used with Streams " +
"rebalance protocol events");
}
this.streamsRebalanceListener = Optional.of(streamsRebalanceListener);
}

@Override
public void process(final BackgroundEvent event) {
switch (event.type()) {
@@ -278,44 +259,26 @@ private void processStreamsOnAllTasksLostCallbackNeededEvent(final StreamsOnAllT

private StreamsOnTasksRevokedCallbackCompletedEvent invokeOnTasksRevokedCallback(final Set<StreamsRebalanceData.TaskId> activeTasksToRevoke,
final CompletableFuture<Void> future) {
final Optional<Exception> exceptionFromCallback = streamsRebalanceListener().onTasksRevoked(activeTasksToRevoke);
final Optional<Exception> exceptionFromCallback = Optional.ofNullable(streamsRebalanceListenerInvoker().invokeTasksRevoked(activeTasksToRevoke));
final Optional<KafkaException> error = exceptionFromCallback.map(e -> ConsumerUtils.maybeWrapAsKafkaException(e, "Task revocation callback throws an error"));
return new StreamsOnTasksRevokedCallbackCompletedEvent(future, error);
}

private StreamsOnTasksAssignedCallbackCompletedEvent invokeOnTasksAssignedCallback(final StreamsRebalanceData.Assignment assignment,
final CompletableFuture<Void> future) {
final Optional<KafkaException> error;
final Optional<Exception> exceptionFromCallback = streamsRebalanceListener().onTasksAssigned(assignment);
if (exceptionFromCallback.isPresent()) {
error = Optional.of(ConsumerUtils.maybeWrapAsKafkaException(exceptionFromCallback.get(), "Task assignment callback throws an error"));
} else {
error = Optional.empty();
streamsRebalanceData().setReconciledAssignment(assignment);
}
final Optional<Exception> exceptionFromCallback = Optional.ofNullable(streamsRebalanceListenerInvoker().invokeTasksAssigned(assignment));
final Optional<KafkaException> error = exceptionFromCallback.map(e -> ConsumerUtils.maybeWrapAsKafkaException(e, "Task assignment callback throws an error"));
return new StreamsOnTasksAssignedCallbackCompletedEvent(future, error);
}

private StreamsOnAllTasksLostCallbackCompletedEvent invokeOnAllTasksLostCallback(final CompletableFuture<Void> future) {
final Optional<KafkaException> error;
final Optional<Exception> exceptionFromCallback = streamsRebalanceListener().onAllTasksLost();
if (exceptionFromCallback.isPresent()) {
error = Optional.of(ConsumerUtils.maybeWrapAsKafkaException(exceptionFromCallback.get(), "All tasks lost callback throws an error"));
} else {
error = Optional.empty();
streamsRebalanceData().setReconciledAssignment(StreamsRebalanceData.Assignment.EMPTY);
}
final Optional<Exception> exceptionFromCallback = Optional.ofNullable(streamsRebalanceListenerInvoker().invokeAllTasksLost());
final Optional<KafkaException> error = exceptionFromCallback.map(e -> ConsumerUtils.maybeWrapAsKafkaException(e, "All tasks lost callback throws an error"));
return new StreamsOnAllTasksLostCallbackCompletedEvent(future, error);
}

private StreamsRebalanceData streamsRebalanceData() {
return streamsRebalanceData.orElseThrow(
() -> new IllegalStateException("Background event processor was not created to be used with Streams " +
"rebalance protocol events"));
}

private StreamsRebalanceListener streamsRebalanceListener() {
return streamsRebalanceListener.orElseThrow(
private StreamsRebalanceListenerInvoker streamsRebalanceListenerInvoker() {
return streamsRebalanceListenerInvoker.orElseThrow(
() -> new IllegalStateException("Background event processor was not created to be used with Streams " +
"rebalance protocol events"));
}
@@ -367,6 +330,7 @@ private StreamsRebalanceListener streamsRebalanceListener() {
private final WakeupTrigger wakeupTrigger = new WakeupTrigger();
private final OffsetCommitCallbackInvoker offsetCommitCallbackInvoker;
private final ConsumerRebalanceListenerInvoker rebalanceListenerInvoker;
private final Optional<StreamsRebalanceListenerInvoker> streamsRebalanceListenerInvoker;
// Last triggered async commit future. Used to wait until all previous async commits are completed.
// We only need to keep track of the last one, since they are guaranteed to complete in order.
private CompletableFuture<Map<TopicPartition, OffsetAndMetadata>> lastPendingAsyncCommit = null;
@@ -517,7 +481,9 @@ public AsyncKafkaConsumer(final ConsumerConfig config,
time,
new RebalanceCallbackMetricsManager(metrics)
);
this.backgroundEventProcessor = new BackgroundEventProcessor(streamsRebalanceData);
this.streamsRebalanceListenerInvoker = streamsRebalanceData.map(s ->
new StreamsRebalanceListenerInvoker(logContext, s));
this.backgroundEventProcessor = new BackgroundEventProcessor();
this.backgroundEventReaper = backgroundEventReaperFactory.build(logContext);

// The FetchCollector is only used on the application thread.
@@ -577,6 +543,7 @@ public AsyncKafkaConsumer(final ConsumerConfig config,
this.time = time;
this.backgroundEventQueue = backgroundEventQueue;
this.rebalanceListenerInvoker = rebalanceListenerInvoker;
this.streamsRebalanceListenerInvoker = Optional.empty();
this.backgroundEventProcessor = new BackgroundEventProcessor();
this.backgroundEventReaper = backgroundEventReaper;
this.metrics = metrics;
@@ -699,6 +666,7 @@ public AsyncKafkaConsumer(final ConsumerConfig config,
networkClientDelegateSupplier,
requestManagersSupplier,
asyncConsumerMetrics);
this.streamsRebalanceListenerInvoker = Optional.empty();
this.backgroundEventProcessor = new BackgroundEventProcessor();
this.backgroundEventReaper = new CompletableEventReaper(logContext);
}
@@ -1477,7 +1445,7 @@ private void close(Duration timeout, CloseOptions.GroupMembershipOperation membe
() -> autoCommitOnClose(closeTimer), firstException);
swallow(log, Level.ERROR, "Failed to stop finding coordinator",
this::stopFindCoordinatorOnClose, firstException);
swallow(log, Level.ERROR, "Failed to release group assignment",
swallow(log, Level.ERROR, "Failed to run rebalance callbacks",
this::runRebalanceCallbacksOnClose, firstException);
swallow(log, Level.ERROR, "Failed to leave group while closing consumer",
() -> leaveGroupOnClose(closeTimer, membershipOperation), firstException);
@@ -1527,26 +1495,39 @@ private void autoCommitOnClose(final Timer timer) {
}

private void runRebalanceCallbacksOnClose() {
[Review comment from a project member]
Since this function is being extended to cover streams tasks now too, should we tweak the error message to make it more generic? (Above: "Failed to release group assignment".) Up to you, but maybe something along the lines of "Failed running callbacks" would apply better to both cases.

[Reply from the PR author]
Done

if (groupMetadata.get().isEmpty() || rebalanceListenerInvoker == null)
if (groupMetadata.get().isEmpty())
return;

int memberEpoch = groupMetadata.get().get().generationId();

Set<TopicPartition> assignedPartitions = groupAssignmentSnapshot.get();
Exception error = null;

if (assignedPartitions.isEmpty())
// Nothing to revoke.
return;
if (streamsRebalanceListenerInvoker != null && streamsRebalanceListenerInvoker.isPresent()) {

if (memberEpoch > 0) {
error = streamsRebalanceListenerInvoker.get().invokeAllTasksRevoked();
} else {
error = streamsRebalanceListenerInvoker.get().invokeAllTasksLost();
}

SortedSet<TopicPartition> droppedPartitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR);
droppedPartitions.addAll(assignedPartitions);
} else if (rebalanceListenerInvoker != null) {

final Exception error;
Set<TopicPartition> assignedPartitions = groupAssignmentSnapshot.get();

if (memberEpoch > 0)
error = rebalanceListenerInvoker.invokePartitionsRevoked(droppedPartitions);
else
error = rebalanceListenerInvoker.invokePartitionsLost(droppedPartitions);
if (assignedPartitions.isEmpty())
// Nothing to revoke.
return;

SortedSet<TopicPartition> droppedPartitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR);
droppedPartitions.addAll(assignedPartitions);

if (memberEpoch > 0) {
error = rebalanceListenerInvoker.invokePartitionsRevoked(droppedPartitions);
} else {
error = rebalanceListenerInvoker.invokePartitionsLost(droppedPartitions);
}

}

if (error != null)
throw ConsumerUtils.maybeWrapAsKafkaException(error);
@@ -1963,8 +1944,12 @@ public void subscribe(Collection<String> topics, ConsumerRebalanceListener liste
}

public void subscribe(Collection<String> topics, StreamsRebalanceListener streamsRebalanceListener) {

streamsRebalanceListenerInvoker
.orElseThrow(() -> new IllegalStateException("Consumer was not created to be used with Streams rebalance protocol events"))
.setRebalanceListener(streamsRebalanceListener);

subscribeInternal(topics, Optional.empty());
backgroundEventProcessor.setStreamsRebalanceListener(streamsRebalanceListener);
}

@Override
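For reference, the hunk above interleaves the removed and added versions of runRebalanceCallbacksOnClose(), so the new control flow can be hard to follow. The following is a condensed, non-verbatim sketch of the added logic, using only names that appear in this diff (groupMetadata, groupAssignmentSnapshot, the two invoker fields): when the consumer carries a Streams listener invoker, close() revokes all tasks on a positive member epoch and otherwise reports them lost; otherwise the classic ConsumerRebalanceListener path runs. The ternaries here compress the if/else blocks shown above.

    private void runRebalanceCallbacksOnClose() {
        if (groupMetadata.get().isEmpty())
            return;

        int memberEpoch = groupMetadata.get().get().generationId();
        Exception error = null;

        if (streamsRebalanceListenerInvoker != null && streamsRebalanceListenerInvoker.isPresent()) {
            // Streams rebalance protocol: hand all tasks back via the streams listener.
            error = memberEpoch > 0
                    ? streamsRebalanceListenerInvoker.get().invokeAllTasksRevoked()
                    : streamsRebalanceListenerInvoker.get().invokeAllTasksLost();
        } else if (rebalanceListenerInvoker != null) {
            // Classic consumer protocol: revoke (or report as lost) the assigned partitions.
            Set<TopicPartition> assignedPartitions = groupAssignmentSnapshot.get();
            if (assignedPartitions.isEmpty())
                return; // Nothing to revoke.

            SortedSet<TopicPartition> droppedPartitions = new TreeSet<>(TOPIC_PARTITION_COMPARATOR);
            droppedPartitions.addAll(assignedPartitions);
            error = memberEpoch > 0
                    ? rebalanceListenerInvoker.invokePartitionsRevoked(droppedPartitions)
                    : rebalanceListenerInvoker.invokePartitionsLost(droppedPartitions);
        }

        if (error != null)
            throw ConsumerUtils.maybeWrapAsKafkaException(error);
    }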
@@ -0,0 +1,117 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.kafka.clients.consumer.internals;

import org.apache.kafka.common.errors.InterruptException;
import org.apache.kafka.common.errors.WakeupException;
import org.apache.kafka.common.utils.LogContext;

import org.slf4j.Logger;

import java.util.Objects;
import java.util.Optional;
import java.util.Set;

/**
* This class encapsulates the invocation of the callback methods defined in the {@link StreamsRebalanceListener}
* interface. When streams group task assignment changes, these methods are invoked. This class wraps those
* callback calls with some logging and error handling.
*/
public class StreamsRebalanceListenerInvoker {

private final Logger log;

private final StreamsRebalanceData streamsRebalanceData;
private Optional<StreamsRebalanceListener> listener;

StreamsRebalanceListenerInvoker(LogContext logContext, StreamsRebalanceData streamsRebalanceData) {
this.log = logContext.logger(getClass());
this.listener = Optional.empty();
this.streamsRebalanceData = streamsRebalanceData;
}

public void setRebalanceListener(StreamsRebalanceListener streamsRebalanceListener) {
Objects.requireNonNull(streamsRebalanceListener, "StreamsRebalanceListener cannot be null");
this.listener = Optional.of(streamsRebalanceListener);
}

public Exception invokeAllTasksRevoked() {
if (listener.isEmpty()) {
throw new IllegalStateException("StreamsRebalanceListener is not defined");
}
return invokeTasksRevoked(streamsRebalanceData.reconciledAssignment().activeTasks());
}

public Exception invokeTasksAssigned(final StreamsRebalanceData.Assignment assignment) {
if (listener.isEmpty()) {
throw new IllegalStateException("StreamsRebalanceListener is not defined");
}
log.info("Invoking tasks assigned callback for new assignment: {}", assignment);
try {
listener.get().onTasksAssigned(assignment);
} catch (WakeupException | InterruptException e) {
throw e;
} catch (Exception e) {
log.error(
"Streams rebalance listener failed on invocation of onTasksAssigned for tasks {}",
assignment,
e
);
return e;
}
return null;
}

public Exception invokeTasksRevoked(final Set<StreamsRebalanceData.TaskId> tasks) {
if (listener.isEmpty()) {
throw new IllegalStateException("StreamsRebalanceListener is not defined");
}
log.info("Invoking task revoked callback for revoked active tasks {}", tasks);
try {
listener.get().onTasksRevoked(tasks);
} catch (WakeupException | InterruptException e) {
throw e;
} catch (Exception e) {
log.error(
"Streams rebalance listener failed on invocation of onTasksRevoked for tasks {}",
tasks,
e
);
return e;
}
return null;
}

public Exception invokeAllTasksLost() {
if (listener.isEmpty()) {
throw new IllegalStateException("StreamsRebalanceListener is not defined");
}
log.info("Invoking tasks lost callback for all tasks");
try {
listener.get().onAllTasksLost();
} catch (WakeupException | InterruptException e) {
throw e;
} catch (Exception e) {
log.error(
"Streams rebalance listener failed on invocation of onTasksLost.",
e
);
return e;
}
return null;
}
}
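To make the error-handling contract of the new class concrete, here is a minimal usage sketch. Assumptions: the caller sits in org.apache.kafka.clients.consumer.internals (the constructor is package-private), myStreamsRebalanceListener is a hypothetical listener whose callbacks return Optional<Exception> as used elsewhere in this diff, and StreamsRebalanceData is built with the four-argument constructor used in the tests below.

    LogContext logContext = new LogContext("[streams-consumer] ");
    StreamsRebalanceData rebalanceData =
            new StreamsRebalanceData(UUID.randomUUID(), Optional.empty(), Map.of(), Map.of());
    StreamsRebalanceListenerInvoker invoker =
            new StreamsRebalanceListenerInvoker(logContext, rebalanceData);

    // A listener must be registered before any invoke* call; otherwise the invoker
    // throws IllegalStateException.
    invoker.setRebalanceListener(myStreamsRebalanceListener); // hypothetical listener instance

    // Callback failures (other than WakeupException/InterruptException, which propagate)
    // are logged and returned rather than thrown, so the caller decides how to surface them.
    Exception error = invoker.invokeAllTasksLost();
    if (error != null) {
        throw ConsumerUtils.maybeWrapAsKafkaException(error);
    }

This return-the-exception style mirrors how rebalanceListenerInvoker.invokePartitionsRevoked and invokePartitionsLost are consumed in the close path of AsyncKafkaConsumer above.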
@@ -2210,6 +2210,73 @@ private void markOffsetsReadyForCommitEvent() {
}).when(applicationEventHandler).add(ArgumentMatchers.isA(CommitEvent.class));
}

@Test
public void testCloseInvokesStreamsRebalanceListenerOnTasksRevokedWhenMemberEpochPositive() {
final String groupId = "streamsGroup";
final StreamsRebalanceData streamsRebalanceData = new StreamsRebalanceData(UUID.randomUUID(), Optional.empty(), Map.of(), Map.of());

try (final MockedStatic<RequestManagers> requestManagers = mockStatic(RequestManagers.class)) {
consumer = newConsumerWithStreamRebalanceData(requiredConsumerConfigAndGroupId(groupId), streamsRebalanceData);
StreamsRebalanceListener mockStreamsListener = mock(StreamsRebalanceListener.class);
when(mockStreamsListener.onTasksRevoked(any())).thenReturn(Optional.empty());
consumer.subscribe(singletonList("topic"), mockStreamsListener);
final MemberStateListener groupMetadataUpdateListener = captureGroupMetadataUpdateListener(requestManagers);
final int memberEpoch = 42;
final String memberId = "memberId";
groupMetadataUpdateListener.onMemberEpochUpdated(Optional.of(memberEpoch), memberId);

consumer.close(CloseOptions.timeout(Duration.ZERO));

verify(mockStreamsListener).onTasksRevoked(any());
}
}

@Test
public void testCloseInvokesStreamsRebalanceListenerOnAllTasksLostWhenMemberEpochZeroOrNegative() {
final String groupId = "streamsGroup";
final StreamsRebalanceData streamsRebalanceData = new StreamsRebalanceData(UUID.randomUUID(), Optional.empty(), Map.of(), Map.of());

try (final MockedStatic<RequestManagers> requestManagers = mockStatic(RequestManagers.class)) {
consumer = newConsumerWithStreamRebalanceData(requiredConsumerConfigAndGroupId(groupId), streamsRebalanceData);
StreamsRebalanceListener mockStreamsListener = mock(StreamsRebalanceListener.class);
when(mockStreamsListener.onAllTasksLost()).thenReturn(Optional.empty());
consumer.subscribe(singletonList("topic"), mockStreamsListener);
final MemberStateListener groupMetadataUpdateListener = captureGroupMetadataUpdateListener(requestManagers);
final int memberEpoch = 0;
final String memberId = "memberId";
groupMetadataUpdateListener.onMemberEpochUpdated(Optional.of(memberEpoch), memberId);

consumer.close(CloseOptions.timeout(Duration.ZERO));

verify(mockStreamsListener).onAllTasksLost();
}
}

@Test
public void testCloseWrapsStreamsRebalanceListenerException() {
final String groupId = "streamsGroup";
final StreamsRebalanceData streamsRebalanceData = new StreamsRebalanceData(UUID.randomUUID(), Optional.empty(), Map.of(), Map.of());

try (final MockedStatic<RequestManagers> requestManagers = mockStatic(RequestManagers.class)) {
consumer = newConsumerWithStreamRebalanceData(requiredConsumerConfigAndGroupId(groupId), streamsRebalanceData);
StreamsRebalanceListener mockStreamsListener = mock(StreamsRebalanceListener.class);
RuntimeException testException = new RuntimeException("Test streams listener exception");
doThrow(testException).when(mockStreamsListener).onTasksRevoked(any());
consumer.subscribe(singletonList("topic"), mockStreamsListener);
final MemberStateListener groupMetadataUpdateListener = captureGroupMetadataUpdateListener(requestManagers);
final int memberEpoch = 1;
final String memberId = "memberId";
groupMetadataUpdateListener.onMemberEpochUpdated(Optional.of(memberEpoch), memberId);

KafkaException thrownException = assertThrows(KafkaException.class,
() -> consumer.close(CloseOptions.timeout(Duration.ZERO)));

assertInstanceOf(RuntimeException.class, thrownException.getCause());
assertTrue(thrownException.getCause().getMessage().contains("Test streams listener exception"));
verify(mockStreamsListener).onTasksRevoked(any());
}
}

private void markReconcileAndAutoCommitCompleteForPollEvent() {
doAnswer(invocation -> {
PollEvent event = invocation.getArgument(0);