Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions clients/src/main/java/org/apache/kafka/clients/Metadata.java
Original file line number Diff line number Diff line change
Expand Up @@ -211,8 +211,15 @@ else if (expireMs <= now) {
for (Listener listener: listeners)
listener.onMetadataUpdate(cluster);

// Do this after notifying listeners as subscribed topics' list can be changed by listeners
this.cluster = this.needMetadataForAllTopics ? getClusterForCurrentTopics(cluster) : cluster;

if (this.needMetadataForAllTopics) {
// the listener may change the interested topics, which could cause another metadata refresh.
// If we have already fetched all topics, however, another fetch should be unnecessary.
this.needUpdate = false;
this.cluster = getClusterForCurrentTopics(cluster);
} else {
this.cluster = cluster;
}

notifyAll();
log.debug("Updated cluster metadata version {} to {}", this.version, this.cluster);
Expand Down Expand Up @@ -287,7 +294,7 @@ private Cluster getClusterForCurrentTopics(Cluster cluster) {
Set<String> unauthorizedTopics = new HashSet<>();
Collection<PartitionInfo> partitionInfos = new ArrayList<>();
List<Node> nodes = Collections.emptyList();
Set<String> internalTopics = Collections.<String>emptySet();
Set<String> internalTopics = Collections.emptySet();
if (cluster != null) {
internalTopics = cluster.internalTopics();
unauthorizedTopics.addAll(cluster.unauthorizedTopics());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -801,7 +801,7 @@ public void subscribe(Collection<String> topics, ConsumerRebalanceListener liste
throw new IllegalArgumentException("Topic collection to subscribe to cannot contain null or empty topic");
}
log.debug("Subscribed to topic(s): {}", Utils.join(topics, ", "));
this.subscriptions.subscribe(topics, listener);
this.subscriptions.subscribe(new HashSet<>(topics), listener);
metadata.setTopics(subscriptions.groupSubscription());
}
} finally {
Expand Down Expand Up @@ -914,7 +914,7 @@ public void assign(Collection<TopicPartition> partitions) {
}

log.debug("Subscribed to partition(s): {}", Utils.join(partitions, ", "));
this.subscriptions.assignFromUser(partitions);
this.subscriptions.assignFromUser(new HashSet<>(partitions));
metadata.setTopics(topics);
}
} finally {
Expand Down Expand Up @@ -1007,6 +1007,12 @@ private Map<TopicPartition, List<ConsumerRecord<K, V>>> pollOnce(long timeout) {

long now = time.milliseconds();
client.poll(Math.min(coordinator.timeToNextPoll(now), timeout), now);

// after the long poll, we should check whether the group needs to rebalance
// prior to returning data so that the group can stabilize faster
if (coordinator.needRejoin())
return Collections.emptyMap();

return fetcher.fetchedRecords();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,26 +94,26 @@ public void subscribe(Collection<String> topics) {
public void subscribe(Pattern pattern, final ConsumerRebalanceListener listener) {
ensureNotClosed();
this.subscriptions.subscribe(pattern, listener);
List<String> topicsToSubscribe = new ArrayList<>();
Set<String> topicsToSubscribe = new HashSet<>();
for (String topic: partitions.keySet()) {
if (pattern.matcher(topic).matches() &&
!subscriptions.subscription().contains(topic))
topicsToSubscribe.add(topic);
}
ensureNotClosed();
this.subscriptions.changeSubscription(topicsToSubscribe);
this.subscriptions.subscribeFromPattern(topicsToSubscribe);
}

@Override
public void subscribe(Collection<String> topics, final ConsumerRebalanceListener listener) {
ensureNotClosed();
this.subscriptions.subscribe(topics, listener);
this.subscriptions.subscribe(new HashSet<>(topics), listener);
}

@Override
public void assign(Collection<TopicPartition> partitions) {
ensureNotClosed();
this.subscriptions.assignFromUser(partitions);
this.subscriptions.assignFromUser(new HashSet<>(partitions));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -270,16 +270,6 @@ public synchronized void ensureActiveGroup() {
// when sending heartbeats and does not necessarily require us to rejoin the group.
ensureCoordinatorReady();

if (!needRejoin())
return;

// call onJoinPrepare if needed. We set a flag to make sure that we do not call it a second
// time if the client is woken up before a pending rebalance completes.
if (needsJoinPrepare) {
onJoinPrepare(generation.generationId, generation.memberId);
needsJoinPrepare = false;
}

if (heartbeatThread == null) {
heartbeatThread = new HeartbeatThread();
heartbeatThread.start();
Expand All @@ -288,6 +278,16 @@ public synchronized void ensureActiveGroup() {
while (needRejoin()) {
ensureCoordinatorReady();

// call onJoinPrepare if needed. We set a flag to make sure that we do not call it a second
// time if the client is woken up before a pending rebalance completes. This must be called
// on each iteration of the loop because an event requiring a rebalance (such as a metadata
// refresh which changes the matched subscription set) can occur while another rebalance is
// still in progress.
if (needsJoinPrepare) {
onJoinPrepare(generation.generationId, generation.memberId);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm a bit concerned about the overhead here, since most implementations of onJoinPrepare may involve committing offsets etc., which is latency-sensitive, while most (if not all?) such functions only need to be triggered once, since consumers pause fetching during a rebalance anyway.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

EDIT: nvm.

needsJoinPrepare = false;
}

// ensure that there are no pending requests to the coordinator. This is important
// in particular to avoid resending a pending JoinGroup request.
if (client.pendingRequestCount(this.coordinator) > 0) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator {
// of offset commit requests, which may be invoked from the heartbeat thread
private final ConcurrentLinkedQueue<OffsetCommitCompletion> completedOffsetCommits;

private boolean isLeader = false;
private Set<String> joinedSubscription;
private MetadataSnapshot metadataSnapshot;
private MetadataSnapshot assignmentSnapshot;
private long nextAutoCommitDeadline;
Expand Down Expand Up @@ -137,9 +139,10 @@ public String protocolType() {

@Override
public List<ProtocolMetadata> metadata() {
this.joinedSubscription = subscriptions.subscription();
List<ProtocolMetadata> metadataList = new ArrayList<>();
for (PartitionAssignor assignor : assignors) {
Subscription subscription = assignor.subscription(subscriptions.subscription());
Subscription subscription = assignor.subscription(joinedSubscription);
ByteBuffer metadata = ConsumerProtocol.serializeSubscription(subscription);
metadataList.add(new ProtocolMetadata(assignor.name(), metadata));
}
Expand All @@ -155,26 +158,26 @@ public void onMetadataUpdate(Cluster cluster) {
throw new TopicAuthorizationException(new HashSet<>(cluster.unauthorizedTopics()));

if (subscriptions.hasPatternSubscription()) {
final List<String> topicsToSubscribe = new ArrayList<>();
final Set<String> topicsToSubscribe = new HashSet<>();

for (String topic : cluster.topics())
if (subscriptions.getSubscribedPattern().matcher(topic).matches() &&
!(excludeInternalTopics && cluster.internalTopics().contains(topic)))
topicsToSubscribe.add(topic);

subscriptions.changeSubscription(topicsToSubscribe);
subscriptions.subscribeFromPattern(topicsToSubscribe);

// note we still need to update the topics contained in the metadata. Although we have
// specified that all topics should be fetched, only those set explicitly will be retained
metadata.setTopics(subscriptions.groupSubscription());
}

// check if there are any changes to the metadata which should trigger a rebalance
if (subscriptions.partitionsAutoAssigned()) {
MetadataSnapshot snapshot = new MetadataSnapshot(subscriptions, cluster);
if (!snapshot.equals(metadataSnapshot)) {
if (!snapshot.equals(metadataSnapshot))
metadataSnapshot = snapshot;
subscriptions.needReassignment();
}
}

}
});
}
Expand All @@ -192,12 +195,9 @@ protected void onJoinComplete(int generation,
String memberId,
String assignmentStrategy,
ByteBuffer assignmentBuffer) {
// if we were the assignor, then we need to make sure that there have been no metadata updates
// since the rebalance begin. Otherwise, we won't rebalance again until the next metadata change
if (assignmentSnapshot != null && !assignmentSnapshot.equals(metadataSnapshot)) {
subscriptions.needReassignment();
return;
}
// only the leader is responsible for monitoring for metadata changes (i.e. partition changes)
if (!isLeader)
assignmentSnapshot = null;

PartitionAssignor assignor = lookupAssignor(assignmentStrategy);
if (assignor == null)
Expand Down Expand Up @@ -246,13 +246,10 @@ public void poll(long now) {
now = time.milliseconds();
}

if (subscriptions.partitionsAutoAssigned() && needRejoin()) {
// due to a race condition between the initial metadata fetch and the initial rebalance, we need to ensure that
// the metadata is fresh before joining initially, and then request the metadata update. If metadata update arrives
// while the rebalance is still pending (for example, when the join group is still inflight), then we will lose
// track of the fact that we need to rebalance again to reflect the change to the topic subscription. Without
// ensuring that the metadata is fresh, any metadata update that changes the topic subscriptions and arrives with a
// rebalance in progress will essentially be ignored. See KAFKA-3949 for the complete description of the problem.
if (needRejoin()) {
// due to a race condition between the initial metadata fetch and the initial rebalance,
// we need to ensure that the metadata is fresh before joining initially. This ensures
// that we have matched the pattern against the cluster's topics at least once before joining.
if (subscriptions.hasPatternSubscription())
client.ensureFreshMetadata();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we still need this now that the PR fixes the race condition?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Don't we still have a race condition between the initial topic metadata fetch and the regex subscription? It seems like we still need this to ensure that we see at least one topic metadata refresh prior to rebalancing.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You're right. Since this PR fixes the intermittent issues we saw earlier with the pattern subscription unit tests, do you think additional unit tests (not necessarily as part of this PR) are required to verify the behavior of this fix for the initial metadata fetch race condition? If we remove this `if` block, all unit tests still seem to pass.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@vahidhashemian I've been working on a unit test that I'll post shortly. It uncovered a couple of minor problems which I've had to fix. Hooray for unit tests!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually the test I've been working on is specifically for the metadata refresh with a rebalance in progress. We might still need another test case to verify the metadata race above, but I guess I'll leave that for another patch.


Expand Down Expand Up @@ -303,6 +300,8 @@ protected Map<String, ByteBuffer> performAssignment(String leaderId,
// update metadata (if needed) and keep track of the metadata used for assignment so that
// we can check after rebalance completion whether anything has changed
client.ensureFreshMetadata();

isLeader = true;
assignmentSnapshot = metadataSnapshot;

log.debug("Performing assignment for group {} using strategy {} with subscriptions {}",
Expand Down Expand Up @@ -339,14 +338,24 @@ protected void onJoinPrepare(int generation, String memberId) {
listener.getClass().getName(), groupId, e);
}

assignmentSnapshot = null;
subscriptions.needReassignment();
isLeader = false;
subscriptions.resetGroupSubscription();
}

@Override
protected boolean needRejoin() {
return subscriptions.partitionsAutoAssigned() &&
(super.needRejoin() || subscriptions.partitionAssignmentNeeded());
public boolean needRejoin() {
if (!subscriptions.partitionsAutoAssigned())
return false;

// we need to rejoin if we performed the assignment and metadata has changed
if (assignmentSnapshot != null && !assignmentSnapshot.equals(metadataSnapshot))
return true;

// we need to join if our subscription has changed since the last join
if (joinedSubscription != null && !joinedSubscription.equals(subscriptions.subscription()))
return true;

return super.needRejoin();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -350,26 +350,22 @@ private long listOffset(TopicPartition partition, long timestamp) {
* the defaultResetPolicy is NONE
*/
public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
if (this.subscriptions.partitionAssignmentNeeded()) {
return Collections.emptyMap();
} else {
Map<TopicPartition, List<ConsumerRecord<K, V>>> drained = new HashMap<>();
int recordsRemaining = maxPollRecords;
Map<TopicPartition, List<ConsumerRecord<K, V>>> drained = new HashMap<>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is it safe to remove this check now?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I had to move the check from here into KafkaConsumer.pollOnce(). That seems better anyway, because we check additional causes of rebalance there.

int recordsRemaining = maxPollRecords;

while (recordsRemaining > 0) {
if (nextInLineRecords == null || nextInLineRecords.isEmpty()) {
CompletedFetch completedFetch = completedFetches.poll();
if (completedFetch == null)
break;
while (recordsRemaining > 0) {
if (nextInLineRecords == null || nextInLineRecords.isEmpty()) {
CompletedFetch completedFetch = completedFetches.poll();
if (completedFetch == null)
break;

nextInLineRecords = parseFetchedData(completedFetch);
} else {
recordsRemaining -= append(drained, nextInLineRecords, recordsRemaining);
}
nextInLineRecords = parseFetchedData(completedFetch);
} else {
recordsRemaining -= append(drained, nextInLineRecords, recordsRemaining);
}

return drained;
}

return drained;
}

private int append(Map<TopicPartition, List<ConsumerRecord<K, V>>> drained,
Expand Down
Loading