diff --git a/clients/src/main/java/org/apache/kafka/clients/Metadata.java b/clients/src/main/java/org/apache/kafka/clients/Metadata.java index 0fd5d63da54d5..a4cf730b99d51 100644 --- a/clients/src/main/java/org/apache/kafka/clients/Metadata.java +++ b/clients/src/main/java/org/apache/kafka/clients/Metadata.java @@ -211,8 +211,15 @@ else if (expireMs <= now) { for (Listener listener: listeners) listener.onMetadataUpdate(cluster); - // Do this after notifying listeners as subscribed topics' list can be changed by listeners - this.cluster = this.needMetadataForAllTopics ? getClusterForCurrentTopics(cluster) : cluster; + + if (this.needMetadataForAllTopics) { + // the listener may change the interested topics, which could cause another metadata refresh. + // If we have already fetched all topics, however, another fetch should be unnecessary. + this.needUpdate = false; + this.cluster = getClusterForCurrentTopics(cluster); + } else { + this.cluster = cluster; + } notifyAll(); log.debug("Updated cluster metadata version {} to {}", this.version, this.cluster); @@ -287,7 +294,7 @@ private Cluster getClusterForCurrentTopics(Cluster cluster) { Set unauthorizedTopics = new HashSet<>(); Collection partitionInfos = new ArrayList<>(); List nodes = Collections.emptyList(); - Set internalTopics = Collections.emptySet(); + Set internalTopics = Collections.emptySet(); if (cluster != null) { internalTopics = cluster.internalTopics(); unauthorizedTopics.addAll(cluster.unauthorizedTopics()); diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index ef913027361be..85d519418854a 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -801,7 +801,7 @@ public void subscribe(Collection topics, ConsumerRebalanceListener liste throw new IllegalArgumentException("Topic collection to subscribe to cannot contain null or empty topic"); } log.debug("Subscribed to topic(s): {}", Utils.join(topics, ", ")); - this.subscriptions.subscribe(topics, listener); + this.subscriptions.subscribe(new HashSet<>(topics), listener); metadata.setTopics(subscriptions.groupSubscription()); } } finally { @@ -914,7 +914,7 @@ public void assign(Collection partitions) { } log.debug("Subscribed to partition(s): {}", Utils.join(partitions, ", ")); - this.subscriptions.assignFromUser(partitions); + this.subscriptions.assignFromUser(new HashSet<>(partitions)); metadata.setTopics(topics); } } finally { @@ -1007,6 +1007,12 @@ private Map>> pollOnce(long timeout) { long now = time.milliseconds(); client.poll(Math.min(coordinator.timeToNextPoll(now), timeout), now); + + // after the long poll, we should check whether the group needs to rebalance + // prior to returning data so that the group can stabilize faster + if (coordinator.needRejoin()) + return Collections.emptyMap(); + return fetcher.fetchedRecords(); } diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java index 9ab4c29493da2..62eb77dd6e727 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/MockConsumer.java @@ -94,26 +94,26 @@ public void subscribe(Collection topics) { public void subscribe(Pattern pattern, final ConsumerRebalanceListener listener) { ensureNotClosed(); this.subscriptions.subscribe(pattern, listener); - List topicsToSubscribe = new ArrayList<>(); + Set topicsToSubscribe = new HashSet<>(); for (String topic: partitions.keySet()) { if (pattern.matcher(topic).matches() && !subscriptions.subscription().contains(topic)) topicsToSubscribe.add(topic); } ensureNotClosed(); - this.subscriptions.changeSubscription(topicsToSubscribe); + this.subscriptions.subscribeFromPattern(topicsToSubscribe); } @Override public void subscribe(Collection topics, final ConsumerRebalanceListener listener) { ensureNotClosed(); - this.subscriptions.subscribe(topics, listener); + this.subscriptions.subscribe(new HashSet<>(topics), listener); } @Override public void assign(Collection partitions) { ensureNotClosed(); - this.subscriptions.assignFromUser(partitions); + this.subscriptions.assignFromUser(new HashSet<>(partitions)); } @Override diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java index 690df2600d557..bf6b920fdae4f 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java @@ -270,16 +270,6 @@ public synchronized void ensureActiveGroup() { // when sending heartbeats and does not necessarily require us to rejoin the group. ensureCoordinatorReady(); - if (!needRejoin()) - return; - - // call onJoinPrepare if needed. We set a flag to make sure that we do not call it a second - // time if the client is woken up before a pending rebalance completes. - if (needsJoinPrepare) { - onJoinPrepare(generation.generationId, generation.memberId); - needsJoinPrepare = false; - } - if (heartbeatThread == null) { heartbeatThread = new HeartbeatThread(); heartbeatThread.start(); @@ -288,6 +278,16 @@ public synchronized void ensureActiveGroup() { while (needRejoin()) { ensureCoordinatorReady(); + // call onJoinPrepare if needed. We set a flag to make sure that we do not call it a second + // time if the client is woken up before a pending rebalance completes. This must be called + // on each iteration of the loop because an event requiring a rebalance (such as a metadata + // refresh which changes the matched subscription set) can occur while another rebalance is + // still in progress. + if (needsJoinPrepare) { + onJoinPrepare(generation.generationId, generation.memberId); + needsJoinPrepare = false; + } + // ensure that there are no pending requests to the coordinator. This is important // in particular to avoid resending a pending JoinGroup request. if (client.pendingRequestCount(this.coordinator) > 0) { diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java index 5fee45afe8319..b8df50e9070dc 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinator.java @@ -78,6 +78,8 @@ public final class ConsumerCoordinator extends AbstractCoordinator { // of offset commit requests, which may be invoked from the heartbeat thread private final ConcurrentLinkedQueue completedOffsetCommits; + private boolean isLeader = false; + private Set joinedSubscription; private MetadataSnapshot metadataSnapshot; private MetadataSnapshot assignmentSnapshot; private long nextAutoCommitDeadline; @@ -137,9 +139,10 @@ public String protocolType() { @Override public List metadata() { + this.joinedSubscription = subscriptions.subscription(); List metadataList = new ArrayList<>(); for (PartitionAssignor assignor : assignors) { - Subscription subscription = assignor.subscription(subscriptions.subscription()); + Subscription subscription = assignor.subscription(joinedSubscription); ByteBuffer metadata = ConsumerProtocol.serializeSubscription(subscription); metadataList.add(new ProtocolMetadata(assignor.name(), metadata)); } @@ -155,26 +158,26 @@ public void onMetadataUpdate(Cluster cluster) { throw new TopicAuthorizationException(new HashSet<>(cluster.unauthorizedTopics())); if (subscriptions.hasPatternSubscription()) { - final List topicsToSubscribe = new ArrayList<>(); + final Set topicsToSubscribe = new HashSet<>(); for (String topic : cluster.topics()) if (subscriptions.getSubscribedPattern().matcher(topic).matches() && !(excludeInternalTopics && cluster.internalTopics().contains(topic))) topicsToSubscribe.add(topic); - subscriptions.changeSubscription(topicsToSubscribe); + subscriptions.subscribeFromPattern(topicsToSubscribe); + + // note we still need to update the topics contained in the metadata. Although we have + // specified that all topics should be fetched, only those set explicitly will be retained metadata.setTopics(subscriptions.groupSubscription()); } // check if there are any changes to the metadata which should trigger a rebalance if (subscriptions.partitionsAutoAssigned()) { MetadataSnapshot snapshot = new MetadataSnapshot(subscriptions, cluster); - if (!snapshot.equals(metadataSnapshot)) { + if (!snapshot.equals(metadataSnapshot)) metadataSnapshot = snapshot; - subscriptions.needReassignment(); - } } - } }); } @@ -192,12 +195,9 @@ protected void onJoinComplete(int generation, String memberId, String assignmentStrategy, ByteBuffer assignmentBuffer) { - // if we were the assignor, then we need to make sure that there have been no metadata updates - // since the rebalance begin. Otherwise, we won't rebalance again until the next metadata change - if (assignmentSnapshot != null && !assignmentSnapshot.equals(metadataSnapshot)) { - subscriptions.needReassignment(); - return; - } + // only the leader is responsible for monitoring for metadata changes (i.e. partition changes) + if (!isLeader) + assignmentSnapshot = null; PartitionAssignor assignor = lookupAssignor(assignmentStrategy); if (assignor == null) @@ -246,13 +246,10 @@ public void poll(long now) { now = time.milliseconds(); } - if (subscriptions.partitionsAutoAssigned() && needRejoin()) { - // due to a race condition between the initial metadata fetch and the initial rebalance, we need to ensure that - // the metadata is fresh before joining initially, and then request the metadata update. If metadata update arrives - // while the rebalance is still pending (for example, when the join group is still inflight), then we will lose - // track of the fact that we need to rebalance again to reflect the change to the topic subscription. Without - // ensuring that the metadata is fresh, any metadata update that changes the topic subscriptions and arrives with a - // rebalance in progress will essentially be ignored. See KAFKA-3949 for the complete description of the problem. + if (needRejoin()) { + // due to a race condition between the initial metadata fetch and the initial rebalance, + // we need to ensure that the metadata is fresh before joining initially. This ensures + // that we have matched the pattern against the cluster's topics at least once before joining. if (subscriptions.hasPatternSubscription()) client.ensureFreshMetadata(); @@ -303,6 +300,8 @@ protected Map performAssignment(String leaderId, // update metadata (if needed) and keep track of the metadata used for assignment so that // we can check after rebalance completion whether anything has changed client.ensureFreshMetadata(); + + isLeader = true; assignmentSnapshot = metadataSnapshot; log.debug("Performing assignment for group {} using strategy {} with subscriptions {}", @@ -339,14 +338,24 @@ protected void onJoinPrepare(int generation, String memberId) { listener.getClass().getName(), groupId, e); } - assignmentSnapshot = null; - subscriptions.needReassignment(); + isLeader = false; + subscriptions.resetGroupSubscription(); } @Override - protected boolean needRejoin() { - return subscriptions.partitionsAutoAssigned() && - (super.needRejoin() || subscriptions.partitionAssignmentNeeded()); + public boolean needRejoin() { + if (!subscriptions.partitionsAutoAssigned()) + return false; + + // we need to rejoin if we performed the assignment and metadata has changed + if (assignmentSnapshot != null && !assignmentSnapshot.equals(metadataSnapshot)) + return true; + + // we need to join if our subscription has changed since the last join + if (joinedSubscription != null && !joinedSubscription.equals(subscriptions.subscription())) + return true; + + return super.needRejoin(); } /** diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java index 84278c6cd5572..aa5cdbeb6875b 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java @@ -350,26 +350,22 @@ private long listOffset(TopicPartition partition, long timestamp) { * the defaultResetPolicy is NONE */ public Map>> fetchedRecords() { - if (this.subscriptions.partitionAssignmentNeeded()) { - return Collections.emptyMap(); - } else { - Map>> drained = new HashMap<>(); - int recordsRemaining = maxPollRecords; + Map>> drained = new HashMap<>(); + int recordsRemaining = maxPollRecords; - while (recordsRemaining > 0) { - if (nextInLineRecords == null || nextInLineRecords.isEmpty()) { - CompletedFetch completedFetch = completedFetches.poll(); - if (completedFetch == null) - break; + while (recordsRemaining > 0) { + if (nextInLineRecords == null || nextInLineRecords.isEmpty()) { + CompletedFetch completedFetch = completedFetches.poll(); + if (completedFetch == null) + break; - nextInLineRecords = parseFetchedData(completedFetch); - } else { - recordsRemaining -= append(drained, nextInLineRecords, recordsRemaining); - } + nextInLineRecords = parseFetchedData(completedFetch); + } else { + recordsRemaining -= append(drained, nextInLineRecords, recordsRemaining); } - - return drained; } + + return drained; } private int append(Map>> drained, diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java index e9b2eb24ba3e6..6d4c01b59a6ad 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/SubscriptionState.java @@ -18,6 +18,7 @@ import org.apache.kafka.common.TopicPartition; import java.util.Collection; +import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -46,6 +47,8 @@ * to set the initial fetch position (e.g. {@link Fetcher#resetOffset(TopicPartition)}. */ public class SubscriptionState { + private static final String SUBSCRIPTION_EXCEPTION_MESSAGE = + "Subscription to topics, partitions and pattern are mutually exclusive"; private enum SubscriptionType { NONE, AUTO_TOPICS, AUTO_PATTERN, USER_ASSIGNED @@ -58,20 +61,17 @@ private enum SubscriptionType { private Pattern subscribedPattern; /* the list of topics the user has requested */ - private final Set subscription; + private Set subscription; + + /* the list of partitions the user has requested */ + private Set userAssignment; /* the list of topics the group has subscribed to (set only for the leader on join group completion) */ private final Set groupSubscription; - /* the list of partitions the user has requested */ - private final Set userAssignment; - /* the list of partitions currently assigned */ private final Map assignment; - /* do we need to request a partition assignment from the coordinator? */ - private boolean needsPartitionAssignment; - /* do we need to request the latest committed offsets from the coordinator? */ private boolean needsFetchCommittedOffsets; @@ -81,8 +81,16 @@ private enum SubscriptionType { /* Listener to be invoked when assignment changes */ private ConsumerRebalanceListener listener; - private static final String SUBSCRIPTION_EXCEPTION_MESSAGE = - "Subscription to topics, partitions and pattern are mutually exclusive"; + public SubscriptionState(OffsetResetStrategy defaultResetStrategy) { + this.defaultResetStrategy = defaultResetStrategy; + this.subscription = Collections.emptySet(); + this.userAssignment = Collections.emptySet(); + this.assignment = new HashMap<>(); + this.groupSubscription = new HashSet<>(); + this.needsFetchCommittedOffsets = true; // initialize to true for the consumers to fetch offset upon starting up + this.subscribedPattern = null; + this.subscriptionType = SubscriptionType.NONE; + } /** * This method sets the subscription type if it is not already set (i.e. when it is NONE), @@ -97,19 +105,7 @@ else if (this.subscriptionType != type) throw new IllegalStateException(SUBSCRIPTION_EXCEPTION_MESSAGE); } - public SubscriptionState(OffsetResetStrategy defaultResetStrategy) { - this.defaultResetStrategy = defaultResetStrategy; - this.subscription = new HashSet<>(); - this.userAssignment = new HashSet<>(); - this.assignment = new HashMap<>(); - this.groupSubscription = new HashSet<>(); - this.needsPartitionAssignment = false; - this.needsFetchCommittedOffsets = true; // initialize to true for the consumers to fetch offset upon starting up - this.subscribedPattern = null; - this.subscriptionType = SubscriptionType.NONE; - } - - public void subscribe(Collection topics, ConsumerRebalanceListener listener) { + public void subscribe(Set topics, ConsumerRebalanceListener listener) { if (listener == null) throw new IllegalArgumentException("RebalanceListener cannot be null"); @@ -120,12 +116,18 @@ public void subscribe(Collection topics, ConsumerRebalanceListener liste changeSubscription(topics); } - public void changeSubscription(Collection topicsToSubscribe) { - if (!this.subscription.equals(new HashSet<>(topicsToSubscribe))) { - this.subscription.clear(); - this.subscription.addAll(topicsToSubscribe); + public void subscribeFromPattern(Set topics) { + if (subscriptionType != SubscriptionType.AUTO_PATTERN) + throw new IllegalArgumentException("Attempt to subscribe from pattern while subscription type set to " + + subscriptionType); + + changeSubscription(topics); + } + + private void changeSubscription(Set topicsToSubscribe) { + if (!this.subscription.equals(topicsToSubscribe)) { + this.subscription = topicsToSubscribe; this.groupSubscription.addAll(topicsToSubscribe); - this.needsPartitionAssignment = true; // Remove any assigned partitions which are no longer subscribed to for (Iterator it = assignment.keySet().iterator(); it.hasNext(); ) { @@ -147,9 +149,11 @@ public void groupSubscribe(Collection topics) { this.groupSubscription.addAll(topics); } - public void needReassignment() { + /** + * Reset the group's subscription to only contain topics subscribed by this consumer. + */ + public void resetGroupSubscription() { this.groupSubscription.retainAll(subscription); - this.needsPartitionAssignment = true; } /** @@ -157,34 +161,37 @@ public void needReassignment() { * note this is different from {@link #assignFromSubscribed(Collection)} * whose input partitions are provided from the subscribed topics. */ - public void assignFromUser(Collection partitions) { + public void assignFromUser(Set partitions) { setSubscriptionType(SubscriptionType.USER_ASSIGNED); - this.userAssignment.clear(); - this.userAssignment.addAll(partitions); + if (!this.assignment.keySet().equals(partitions)) { + this.userAssignment = partitions; - for (TopicPartition partition : partitions) - if (!assignment.containsKey(partition)) - addAssignedPartition(partition); - - this.assignment.keySet().retainAll(this.userAssignment); - - this.needsPartitionAssignment = false; - this.needsFetchCommittedOffsets = true; + for (TopicPartition partition : partitions) + if (!assignment.containsKey(partition)) + addAssignedPartition(partition); + this.assignment.keySet().retainAll(this.userAssignment); + this.needsFetchCommittedOffsets = true; + } } /** * Change the assignment to the specified partitions returned from the coordinator, - * note this is different from {@link #assignFromUser(Collection)} which directly set the assignment from user inputs + * note this is different from {@link #assignFromUser(Set)} which directly set the assignment from user inputs */ public void assignFromSubscribed(Collection assignments) { + if (!this.partitionsAutoAssigned()) + throw new IllegalArgumentException("Attempt to dynamically assign partitions while manual assignment in use"); + for (TopicPartition tp : assignments) if (!this.subscription.contains(tp.topic())) throw new IllegalArgumentException("Assigned partition " + tp + " for non-subscribed topic."); + + // after rebalancing, we always reinitialize the assignment state this.assignment.clear(); for (TopicPartition tp: assignments) addAssignedPartition(tp); - this.needsPartitionAssignment = false; + this.needsFetchCommittedOffsets = true; } public void subscribe(Pattern pattern, ConsumerRebalanceListener listener) { @@ -202,10 +209,9 @@ public boolean hasPatternSubscription() { } public void unsubscribe() { - this.subscription.clear(); - this.userAssignment.clear(); + this.subscription = Collections.emptySet(); + this.userAssignment = Collections.emptySet(); this.assignment.clear(); - this.needsPartitionAssignment = true; this.subscribedPattern = null; this.subscriptionType = SubscriptionType.NONE; } @@ -346,10 +352,6 @@ public Set missingFetchPositions() { return missing; } - public boolean partitionAssignmentNeeded() { - return this.needsPartitionAssignment; - } - public boolean isAssigned(TopicPartition tp) { return assignment.containsKey(tp); } diff --git a/clients/src/test/java/org/apache/kafka/clients/MockClient.java b/clients/src/test/java/org/apache/kafka/clients/MockClient.java index 9fbbb88c48c57..8881f829b3ceb 100644 --- a/clients/src/test/java/org/apache/kafka/clients/MockClient.java +++ b/clients/src/test/java/org/apache/kafka/clients/MockClient.java @@ -147,7 +147,7 @@ public void send(ClientRequest request, long now) { @Override public List poll(long timeoutMs, long now) { - List copy = new ArrayList(this.responses); + List copy = new ArrayList<>(this.responses); while (!this.responses.isEmpty()) { ClientResponse response = this.responses.poll(); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java index 8ec8b7578b9ca..0486e6c0c4e2f 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/ConsumerCoordinatorTest.java @@ -66,9 +66,13 @@ import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import java.util.regex.Pattern; +import static java.util.Collections.singleton; +import static java.util.Collections.singletonList; +import static java.util.Collections.singletonMap; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; @@ -148,7 +152,7 @@ public void testGroupDescribeUnauthorized() { @Test(expected = GroupAuthorizationException.class) public void testGroupReadUnauthorized() { - subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener); + subscriptions.subscribe(singleton(topicName), rebalanceListener); client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); coordinator.ensureCoordinatorReady(); @@ -206,7 +210,7 @@ public void testIllegalGeneration() { coordinator.ensureCoordinatorReady(); // illegal_generation will cause re-partition - subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener); + subscriptions.subscribe(singleton(topicName), rebalanceListener); subscriptions.assignFromSubscribed(Collections.singletonList(tp)); time.sleep(sessionTimeoutMs); @@ -230,7 +234,7 @@ public void testUnknownConsumerId() { coordinator.ensureCoordinatorReady(); // illegal_generation will cause re-partition - subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener); + subscriptions.subscribe(singleton(topicName), rebalanceListener); subscriptions.assignFromSubscribed(Collections.singletonList(tp)); time.sleep(sessionTimeoutMs); @@ -273,8 +277,7 @@ public void testCoordinatorDisconnect() { public void testJoinGroupInvalidGroupId() { final String consumerId = "leader"; - subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener); - subscriptions.needReassignment(); + subscriptions.subscribe(singleton(topicName), rebalanceListener); // ensure metadata is up-to-date for leader metadata.setTopics(Arrays.asList(topicName)); @@ -292,8 +295,7 @@ public void testJoinGroupInvalidGroupId() { public void testNormalJoinGroupLeader() { final String consumerId = "leader"; - subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener); - subscriptions.needReassignment(); + subscriptions.subscribe(singleton(topicName), rebalanceListener); // ensure metadata is up-to-date for leader metadata.setTopics(Arrays.asList(topicName)); @@ -304,7 +306,7 @@ public void testNormalJoinGroupLeader() { // normal join group Map> memberSubscriptions = Collections.singletonMap(consumerId, Arrays.asList(topicName)); - partitionAssignor.prepare(Collections.singletonMap(consumerId, Arrays.asList(tp))); + partitionAssignor.prepare(Collections.singletonMap(consumerId, singletonList(tp))); client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE.code())); client.prepareResponse(new MockClient.RequestMatcher() { @@ -315,23 +317,86 @@ public boolean matches(ClientRequest request) { sync.generationId() == 1 && sync.groupAssignment().containsKey(consumerId); } - }, syncGroupResponse(Arrays.asList(tp), Errors.NONE.code())); + }, syncGroupResponse(singletonList(tp), Errors.NONE.code())); coordinator.poll(time.milliseconds()); - assertFalse(subscriptions.partitionAssignmentNeeded()); - assertEquals(Collections.singleton(tp), subscriptions.assignedPartitions()); + assertFalse(coordinator.needRejoin()); + assertEquals(singleton(tp), subscriptions.assignedPartitions()); assertEquals(1, rebalanceListener.revokedCount); assertEquals(Collections.emptySet(), rebalanceListener.revoked); assertEquals(1, rebalanceListener.assignedCount); - assertEquals(Collections.singleton(tp), rebalanceListener.assigned); + assertEquals(singleton(tp), rebalanceListener.assigned); + } + + @Test + public void testMetadataRefreshDuringRebalance() { + final String consumerId = "leader"; + final String otherTopicName = "otherTopic"; + TopicPartition otherPartition = new TopicPartition(otherTopicName, 0); + + subscriptions.subscribe(Pattern.compile(".*"), rebalanceListener); + metadata.needMetadataForAllTopics(true); + metadata.update(cluster, time.milliseconds()); + + assertEquals(singleton(topicName), subscriptions.subscription()); + + client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); + coordinator.ensureCoordinatorReady(); + + Map> initialSubscription = singletonMap(consumerId, Arrays.asList(topicName)); + partitionAssignor.prepare(singletonMap(consumerId, singletonList(tp))); + + // the metadata will be updated in flight with a new topic added + final List updatedSubscription = Arrays.asList(topicName, otherTopicName); + final Set updatedSubscriptionSet = new HashSet<>(updatedSubscription); + + client.prepareResponse(joinGroupLeaderResponse(1, consumerId, initialSubscription, Errors.NONE.code())); + client.prepareResponse(new MockClient.RequestMatcher() { + @Override + public boolean matches(ClientRequest request) { + final Map updatedPartitions = new HashMap<>(); + for (String topic : updatedSubscription) + updatedPartitions.put(topic, 1); + metadata.update(TestUtils.clusterWith(1, updatedPartitions), time.milliseconds()); + return true; + } + }, syncGroupResponse(singletonList(tp), Errors.NONE.code())); + + List newAssignment = Arrays.asList(tp, otherPartition); + Set newAssignmentSet = new HashSet<>(newAssignment); + + Map> updatedSubscriptions = singletonMap(consumerId, Arrays.asList(topicName, otherTopicName)); + partitionAssignor.prepare(singletonMap(consumerId, newAssignment)); + + // we expect to see a second rebalance with the new-found topics + client.prepareResponse(new MockClient.RequestMatcher() { + @Override + public boolean matches(ClientRequest request) { + JoinGroupRequest join = new JoinGroupRequest(request.request().body()); + ProtocolMetadata protocolMetadata = join.groupProtocols().iterator().next(); + PartitionAssignor.Subscription subscription = ConsumerProtocol.deserializeSubscription(protocolMetadata.metadata()); + protocolMetadata.metadata().rewind(); + return subscription.topics().containsAll(updatedSubscriptionSet); + } + }, joinGroupLeaderResponse(2, consumerId, updatedSubscriptions, Errors.NONE.code())); + client.prepareResponse(syncGroupResponse(newAssignment, Errors.NONE.code())); + + coordinator.poll(time.milliseconds()); + + assertFalse(coordinator.needRejoin()); + assertEquals(updatedSubscriptionSet, subscriptions.subscription()); + assertEquals(newAssignmentSet, subscriptions.assignedPartitions()); + assertEquals(2, rebalanceListener.revokedCount); + assertEquals(singleton(tp), rebalanceListener.revoked); + assertEquals(2, rebalanceListener.assignedCount); + assertEquals(newAssignmentSet, rebalanceListener.assigned); } @Test public void testWakeupDuringJoin() { final String consumerId = "leader"; - subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener); - subscriptions.needReassignment(); + subscriptions.subscribe(singleton(topicName), rebalanceListener); // ensure metadata is up-to-date for leader metadata.setTopics(Arrays.asList(topicName)); @@ -341,7 +406,7 @@ public void testWakeupDuringJoin() { coordinator.ensureCoordinatorReady(); Map> memberSubscriptions = Collections.singletonMap(consumerId, Arrays.asList(topicName)); - partitionAssignor.prepare(Collections.singletonMap(consumerId, Arrays.asList(tp))); + partitionAssignor.prepare(Collections.singletonMap(consumerId, singletonList(tp))); // prepare only the first half of the join and then trigger the wakeup client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE.code())); @@ -354,23 +419,22 @@ public void testWakeupDuringJoin() { } // now complete the second half - client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code())); + client.prepareResponse(syncGroupResponse(singletonList(tp), Errors.NONE.code())); coordinator.poll(time.milliseconds()); - assertFalse(subscriptions.partitionAssignmentNeeded()); - assertEquals(Collections.singleton(tp), subscriptions.assignedPartitions()); + assertFalse(coordinator.needRejoin()); + assertEquals(singleton(tp), subscriptions.assignedPartitions()); assertEquals(1, rebalanceListener.revokedCount); assertEquals(Collections.emptySet(), rebalanceListener.revoked); assertEquals(1, rebalanceListener.assignedCount); - assertEquals(Collections.singleton(tp), rebalanceListener.assigned); + assertEquals(singleton(tp), rebalanceListener.assigned); } @Test public void testNormalJoinGroupFollower() { final String consumerId = "consumer"; - subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener); - subscriptions.needReassignment(); + subscriptions.subscribe(singleton(topicName), rebalanceListener); client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); coordinator.ensureCoordinatorReady(); @@ -385,29 +449,28 @@ public boolean matches(ClientRequest request) { sync.generationId() == 1 && sync.groupAssignment().isEmpty(); } - }, syncGroupResponse(Arrays.asList(tp), Errors.NONE.code())); + }, syncGroupResponse(singletonList(tp), Errors.NONE.code())); coordinator.poll(time.milliseconds()); - assertFalse(subscriptions.partitionAssignmentNeeded()); - assertEquals(Collections.singleton(tp), subscriptions.assignedPartitions()); + assertFalse(coordinator.needRejoin()); + assertEquals(singleton(tp), subscriptions.assignedPartitions()); assertEquals(1, rebalanceListener.revokedCount); assertEquals(1, rebalanceListener.assignedCount); - assertEquals(Collections.singleton(tp), rebalanceListener.assigned); + assertEquals(singleton(tp), rebalanceListener.assigned); } @Test public void testLeaveGroupOnClose() { final String consumerId = "consumer"; - subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener); - subscriptions.needReassignment(); + subscriptions.subscribe(singleton(topicName), rebalanceListener); client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); coordinator.ensureCoordinatorReady(); client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code())); - client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code())); + client.prepareResponse(syncGroupResponse(singletonList(tp), Errors.NONE.code())); coordinator.poll(time.milliseconds()); final AtomicBoolean received = new AtomicBoolean(false); @@ -428,14 +491,13 @@ public boolean matches(ClientRequest request) { public void testMaybeLeaveGroup() { final String consumerId = "consumer"; - subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener); - subscriptions.needReassignment(); + subscriptions.subscribe(singleton(topicName), rebalanceListener); client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); coordinator.ensureCoordinatorReady(); client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code())); - client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code())); + client.prepareResponse(syncGroupResponse(singletonList(tp), Errors.NONE.code())); coordinator.poll(time.milliseconds()); final AtomicBoolean received = new AtomicBoolean(false); @@ -459,8 +521,7 @@ public boolean matches(ClientRequest request) { public void testUnexpectedErrorOnSyncGroup() { final String consumerId = "consumer"; - subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener); - subscriptions.needReassignment(); + subscriptions.subscribe(singleton(topicName), rebalanceListener); client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); coordinator.ensureCoordinatorReady(); @@ -475,8 +536,7 @@ public void testUnexpectedErrorOnSyncGroup() { public void testUnknownMemberIdOnSyncGroup() { final String consumerId = "consumer"; - subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener); - subscriptions.needReassignment(); + subscriptions.subscribe(singleton(topicName), rebalanceListener); client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); coordinator.ensureCoordinatorReady(); @@ -493,20 +553,19 @@ public boolean matches(ClientRequest request) { return joinRequest.memberId().equals(JoinGroupRequest.UNKNOWN_MEMBER_ID); } }, joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE.code())); - client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code())); + client.prepareResponse(syncGroupResponse(singletonList(tp), Errors.NONE.code())); coordinator.poll(time.milliseconds()); - assertFalse(subscriptions.partitionAssignmentNeeded()); - assertEquals(Collections.singleton(tp), subscriptions.assignedPartitions()); + assertFalse(coordinator.needRejoin()); + assertEquals(singleton(tp), subscriptions.assignedPartitions()); } @Test public void testRebalanceInProgressOnSyncGroup() { final String consumerId = "consumer"; - subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener); - subscriptions.needReassignment(); + subscriptions.subscribe(singleton(topicName), rebalanceListener); client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); coordinator.ensureCoordinatorReady(); @@ -517,20 +576,19 @@ public void testRebalanceInProgressOnSyncGroup() { // then let the full join/sync finish successfully client.prepareResponse(joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE.code())); - client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code())); + client.prepareResponse(syncGroupResponse(singletonList(tp), Errors.NONE.code())); coordinator.poll(time.milliseconds()); - assertFalse(subscriptions.partitionAssignmentNeeded()); - assertEquals(Collections.singleton(tp), subscriptions.assignedPartitions()); + assertFalse(coordinator.needRejoin()); + assertEquals(singleton(tp), subscriptions.assignedPartitions()); } @Test public void testIllegalGenerationOnSyncGroup() { final String consumerId = "consumer"; - subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener); - subscriptions.needReassignment(); + subscriptions.subscribe(singleton(topicName), rebalanceListener); client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); coordinator.ensureCoordinatorReady(); @@ -547,39 +605,45 @@ public boolean matches(ClientRequest request) { return joinRequest.memberId().equals(JoinGroupRequest.UNKNOWN_MEMBER_ID); } }, joinGroupFollowerResponse(2, consumerId, "leader", Errors.NONE.code())); - client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code())); + client.prepareResponse(syncGroupResponse(singletonList(tp), Errors.NONE.code())); coordinator.poll(time.milliseconds()); - assertFalse(subscriptions.partitionAssignmentNeeded()); - assertEquals(Collections.singleton(tp), subscriptions.assignedPartitions()); + assertFalse(coordinator.needRejoin()); + assertEquals(singleton(tp), subscriptions.assignedPartitions()); } @Test public void testMetadataChangeTriggersRebalance() { final String consumerId = "consumer"; - subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener); - subscriptions.needReassignment(); + // ensure metadata is up-to-date for leader + metadata.setTopics(Arrays.asList(topicName)); + metadata.update(cluster, time.milliseconds()); + + subscriptions.subscribe(singleton(topicName), rebalanceListener); client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); coordinator.ensureCoordinatorReady(); - client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code())); - client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code())); + Map> memberSubscriptions = Collections.singletonMap(consumerId, Arrays.asList(topicName)); + partitionAssignor.prepare(Collections.singletonMap(consumerId, singletonList(tp))); + + // the leader is responsible for picking up metadata changes and forcing a group rebalance + client.prepareResponse(joinGroupLeaderResponse(1, consumerId, memberSubscriptions, Errors.NONE.code())); + client.prepareResponse(syncGroupResponse(singletonList(tp), Errors.NONE.code())); coordinator.poll(time.milliseconds()); - assertFalse(subscriptions.partitionAssignmentNeeded()); + assertFalse(coordinator.needRejoin()); // a new partition is added to the topic metadata.update(TestUtils.singletonCluster(topicName, 2), time.milliseconds()); // we should detect the change and ask for reassignment - assertTrue(subscriptions.partitionAssignmentNeeded()); + assertTrue(coordinator.needRejoin()); } - @Test public void testUpdateMetadataDuringRebalance() { final String topic1 = "topic1"; @@ -590,9 +654,8 @@ public void testUpdateMetadataDuringRebalance() { List topics = Arrays.asList(topic1, topic2); - subscriptions.subscribe(topics, rebalanceListener); + subscriptions.subscribe(new HashSet<>(topics), rebalanceListener); metadata.setTopics(topics); - subscriptions.needReassignment(); // we only have metadata for one topic initially metadata.update(TestUtils.singletonCluster(topic1, 1), time.milliseconds()); @@ -629,7 +692,7 @@ public boolean matches(ClientRequest request) { coordinator.poll(time.milliseconds()); - assertFalse(subscriptions.partitionAssignmentNeeded()); + assertFalse(coordinator.needRejoin()); assertEquals(new HashSet<>(Arrays.asList(tp1, tp2)), subscriptions.assignedPartitions()); } @@ -640,7 +703,7 @@ public void testExcludeInternalTopicsConfigOption() { metadata.update(TestUtils.singletonCluster(TestUtils.GROUP_METADATA_TOPIC_NAME, 2), time.milliseconds()); - assertFalse(subscriptions.partitionAssignmentNeeded()); + assertFalse(subscriptions.subscription().contains(TestUtils.GROUP_METADATA_TOPIC_NAME)); } @Test @@ -650,41 +713,43 @@ public void testIncludeInternalTopicsConfigOption() { metadata.update(TestUtils.singletonCluster(TestUtils.GROUP_METADATA_TOPIC_NAME, 2), time.milliseconds()); - assertTrue(subscriptions.partitionAssignmentNeeded()); + assertTrue(subscriptions.subscription().contains(TestUtils.GROUP_METADATA_TOPIC_NAME)); } @Test public void testRejoinGroup() { - subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener); - subscriptions.needReassignment(); + String otherTopic = "otherTopic"; + + subscriptions.subscribe(singleton(topicName), rebalanceListener); client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); coordinator.ensureCoordinatorReady(); // join the group once client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code())); - client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code())); + client.prepareResponse(syncGroupResponse(singletonList(tp), Errors.NONE.code())); coordinator.poll(time.milliseconds()); assertEquals(1, rebalanceListener.revokedCount); + assertTrue(rebalanceListener.revoked.isEmpty()); assertEquals(1, rebalanceListener.assignedCount); + assertEquals(singleton(tp), rebalanceListener.assigned); // and join the group again - subscriptions.needReassignment(); + subscriptions.subscribe(new HashSet<>(Arrays.asList(topicName, otherTopic)), rebalanceListener); client.prepareResponse(joinGroupFollowerResponse(2, "consumer", "leader", Errors.NONE.code())); - client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code())); + client.prepareResponse(syncGroupResponse(singletonList(tp), Errors.NONE.code())); coordinator.poll(time.milliseconds()); assertEquals(2, rebalanceListener.revokedCount); - assertEquals(Collections.singleton(tp), rebalanceListener.revoked); + assertEquals(singleton(tp), rebalanceListener.revoked); assertEquals(2, rebalanceListener.assignedCount); - assertEquals(Collections.singleton(tp), rebalanceListener.assigned); + assertEquals(singleton(tp), rebalanceListener.assigned); } @Test public void testDisconnectInJoin() { - subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener); - subscriptions.needReassignment(); + subscriptions.subscribe(singleton(topicName), rebalanceListener); client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); coordinator.ensureCoordinatorReady(); @@ -693,19 +758,18 @@ public void testDisconnectInJoin() { client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code()), true); client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code())); - client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code())); + client.prepareResponse(syncGroupResponse(singletonList(tp), Errors.NONE.code())); coordinator.poll(time.milliseconds()); - assertFalse(subscriptions.partitionAssignmentNeeded()); - assertEquals(Collections.singleton(tp), subscriptions.assignedPartitions()); + assertFalse(coordinator.needRejoin()); + assertEquals(singleton(tp), subscriptions.assignedPartitions()); assertEquals(1, rebalanceListener.revokedCount); assertEquals(1, rebalanceListener.assignedCount); - assertEquals(Collections.singleton(tp), rebalanceListener.assigned); + assertEquals(singleton(tp), rebalanceListener.assigned); } @Test(expected = ApiException.class) public void testInvalidSessionTimeout() { - subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener); - subscriptions.needReassignment(); + subscriptions.subscribe(singleton(topicName), rebalanceListener); client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); coordinator.ensureCoordinatorReady(); @@ -717,7 +781,7 @@ public void testInvalidSessionTimeout() { @Test public void testCommitOffsetOnly() { - subscriptions.assignFromUser(Arrays.asList(tp)); + subscriptions.assignFromUser(singleton(tp)); client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); coordinator.ensureCoordinatorReady(); @@ -739,14 +803,13 @@ public void testAutoCommitDynamicAssignment() { ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors, ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true); - subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener); - subscriptions.needReassignment(); + subscriptions.subscribe(singleton(topicName), rebalanceListener); client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); coordinator.ensureCoordinatorReady(); client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code())); - client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code())); + client.prepareResponse(syncGroupResponse(singletonList(tp), Errors.NONE.code())); coordinator.poll(time.milliseconds()); subscriptions.seek(tp, 100); @@ -765,8 +828,7 @@ public void testAutoCommitDynamicAssignmentRebalance() { ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors, ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true); - subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener); - subscriptions.needReassignment(); + subscriptions.subscribe(singleton(topicName), rebalanceListener); client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); coordinator.ensureCoordinatorReady(); @@ -776,7 +838,7 @@ public void testAutoCommitDynamicAssignmentRebalance() { consumerClient.poll(0); client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code())); - client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code())); + client.prepareResponse(syncGroupResponse(singletonList(tp), Errors.NONE.code())); coordinator.poll(time.milliseconds()); subscriptions.seek(tp, 100); @@ -793,7 +855,7 @@ public void testAutoCommitManualAssignment() { ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors, ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true); - subscriptions.assignFromUser(Arrays.asList(tp)); + subscriptions.assignFromUser(singleton(tp)); subscriptions.seek(tp, 100); client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); @@ -811,7 +873,7 @@ public void testAutoCommitManualAssignmentCoordinatorUnknown() { ConsumerCoordinator coordinator = buildCoordinator(new Metrics(), assignors, ConsumerConfig.DEFAULT_EXCLUDE_INTERNAL_TOPICS, true); - subscriptions.assignFromUser(Arrays.asList(tp)); + subscriptions.assignFromUser(singleton(tp)); subscriptions.seek(tp, 100); // no commit initially since coordinator is unknown @@ -835,7 +897,7 @@ public void testAutoCommitManualAssignmentCoordinatorUnknown() { @Test public void testCommitOffsetMetadata() { - subscriptions.assignFromUser(Arrays.asList(tp)); + subscriptions.assignFromUser(singleton(tp)); client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); coordinator.ensureCoordinatorReady(); @@ -866,20 +928,20 @@ public void testCommitOffsetAsyncWithDefaultCallback() { @Test public void testCommitAfterLeaveGroup() { // enable auto-assignment - subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener); + subscriptions.subscribe(singleton(topicName), rebalanceListener); client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); coordinator.ensureCoordinatorReady(); client.prepareResponse(joinGroupFollowerResponse(1, "consumer", "leader", Errors.NONE.code())); - client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code())); + client.prepareResponse(syncGroupResponse(singletonList(tp), Errors.NONE.code())); coordinator.poll(time.milliseconds()); // now switch to manual assignment client.prepareResponse(new LeaveGroupResponse(Errors.NONE.code()).toStruct()); subscriptions.unsubscribe(); coordinator.maybeLeaveGroup(); - subscriptions.assignFromUser(Arrays.asList(tp)); + subscriptions.assignFromUser(singleton(tp)); // the client should not reuse generation/memberId from auto-subscribed generation client.prepareResponse(new MockClient.RequestMatcher() { @@ -1048,7 +1110,7 @@ public void testRefreshOffset() { client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); coordinator.ensureCoordinatorReady(); - subscriptions.assignFromUser(Arrays.asList(tp)); + subscriptions.assignFromUser(singleton(tp)); subscriptions.needRefreshCommits(); client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", 100L)); coordinator.refreshCommittedOffsetsIfNeeded(); @@ -1061,7 +1123,7 @@ public void testRefreshOffsetLoadInProgress() { client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); coordinator.ensureCoordinatorReady(); - subscriptions.assignFromUser(Arrays.asList(tp)); + subscriptions.assignFromUser(singleton(tp)); subscriptions.needRefreshCommits(); client.prepareResponse(offsetFetchResponse(tp, Errors.GROUP_LOAD_IN_PROGRESS.code(), "", 100L)); client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", 100L)); @@ -1075,7 +1137,7 @@ public void testRefreshOffsetNotCoordinatorForConsumer() { client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); coordinator.ensureCoordinatorReady(); - subscriptions.assignFromUser(Arrays.asList(tp)); + subscriptions.assignFromUser(singleton(tp)); subscriptions.needRefreshCommits(); client.prepareResponse(offsetFetchResponse(tp, Errors.NOT_COORDINATOR_FOR_GROUP.code(), "", 100L)); client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); @@ -1090,7 +1152,7 @@ public void testRefreshOffsetWithNoFetchableOffsets() { client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); coordinator.ensureCoordinatorReady(); - subscriptions.assignFromUser(Arrays.asList(tp)); + subscriptions.assignFromUser(singleton(tp)); subscriptions.needRefreshCommits(); client.prepareResponse(offsetFetchResponse(tp, Errors.NONE.code(), "", -1L)); coordinator.refreshCommittedOffsetsIfNeeded(); @@ -1122,37 +1184,6 @@ public void testProtocolMetadataOrder() { } } - @Test - public void testMetadataTopicsExpiryDisabled() { - final String consumerId = "consumer"; - - subscriptions.subscribe(Arrays.asList(topicName), rebalanceListener); - HashSet topics = new HashSet<>(); - topics.add(topicName); - metadata.setTopics(topics); - subscriptions.needReassignment(); - - client.prepareResponse(groupCoordinatorResponse(node, Errors.NONE.code())); - coordinator.ensureCoordinatorReady(); - - client.prepareResponse(joinGroupFollowerResponse(1, consumerId, "leader", Errors.NONE.code())); - client.prepareResponse(syncGroupResponse(Arrays.asList(tp), Errors.NONE.code())); - coordinator.poll(time.milliseconds()); - - metadata.update(TestUtils.singletonCluster(topicName, 2), time.milliseconds()); - assertTrue("Topic not found in metadata", metadata.containsTopic(topicName)); - time.sleep(Metadata.TOPIC_EXPIRY_MS * 2); - metadata.update(TestUtils.singletonCluster(topicName, 2), time.milliseconds()); - assertTrue("Topic expired", metadata.containsTopic(topicName)); - metadata.update(TestUtils.singletonCluster(topicName, 2), time.milliseconds()); - metadata.update(Cluster.empty(), time.milliseconds()); - assertTrue("Topic expired", metadata.containsTopic(topicName)); - - assertTrue(subscriptions.partitionAssignmentNeeded()); - metadata.update(TestUtils.singletonCluster(topicName, 2), time.milliseconds()); - assertTrue(subscriptions.partitionAssignmentNeeded()); - } - private ConsumerCoordinator buildCoordinator(Metrics metrics, List assignors, boolean excludeInternalTopics, @@ -1187,7 +1218,8 @@ private Struct heartbeatResponse(short error) { return response.toStruct(); } - private Struct joinGroupLeaderResponse(int generationId, String memberId, + private Struct joinGroupLeaderResponse(int generationId, + String memberId, Map> subscriptions, short error) { Map metadata = new HashMap<>(); diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java index 518661877fb73..5c0b49c94bb3d 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java @@ -66,6 +66,7 @@ import java.util.Map; import java.util.Random; +import static java.util.Collections.singleton; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; @@ -123,7 +124,7 @@ public void teardown() { @Test public void testFetchNormal() { List> records; - subscriptions.assignFromUser(Arrays.asList(tp)); + subscriptions.assignFromUser(singleton(tp)); subscriptions.seek(tp, 0); // normal fetch @@ -167,7 +168,7 @@ public byte[] deserialize(String topic, byte[] data) { Fetcher fetcher = createFetcher(subscriptions, new Metrics(time), deserializer, deserializer); - subscriptions.assignFromUser(Collections.singleton(tp)); + subscriptions.assignFromUser(singleton(tp)); subscriptions.seek(tp, 1); client.prepareResponse(matchesOffset(tp, 1), fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 0)); @@ -209,7 +210,7 @@ public void testParseInvalidRecord() { compressor.close(); buffer.flip(); - subscriptions.assignFromUser(Arrays.asList(tp)); + subscriptions.assignFromUser(singleton(tp)); subscriptions.seek(tp, 0); // normal fetch @@ -230,7 +231,7 @@ public void testFetchMaxPollRecords() { Fetcher fetcher = createFetcher(subscriptions, new Metrics(time), 2); List> records; - subscriptions.assignFromUser(Arrays.asList(tp)); + subscriptions.assignFromUser(singleton(tp)); subscriptions.seek(tp, 1); client.prepareResponse(matchesOffset(tp, 1), fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 0)); @@ -272,7 +273,7 @@ public void testFetchNonContinuousRecords() { records.close(); List> consumerRecords; - subscriptions.assignFromUser(Arrays.asList(tp)); + subscriptions.assignFromUser(singleton(tp)); subscriptions.seek(tp, 0); // normal fetch @@ -290,7 +291,7 @@ public void testFetchNonContinuousRecords() { @Test(expected = RecordTooLargeException.class) public void testFetchRecordTooLarge() { - subscriptions.assignFromUser(Arrays.asList(tp)); + subscriptions.assignFromUser(singleton(tp)); subscriptions.seek(tp, 0); // prepare large record @@ -309,7 +310,7 @@ public void testFetchRecordTooLarge() { @Test public void testUnauthorizedTopic() { - subscriptions.assignFromUser(Arrays.asList(tp)); + subscriptions.assignFromUser(singleton(tp)); subscriptions.seek(tp, 0); // resize the limit of the buffer to pretend it is only fetch-size large @@ -320,20 +321,20 @@ public void testUnauthorizedTopic() { fetcher.fetchedRecords(); fail("fetchedRecords should have thrown"); } catch (TopicAuthorizationException e) { - assertEquals(Collections.singleton(topicName), e.unauthorizedTopics()); + assertEquals(singleton(topicName), e.unauthorizedTopics()); } } @Test public void testFetchDuringRebalance() { - subscriptions.subscribe(Arrays.asList(topicName), listener); - subscriptions.assignFromSubscribed(Arrays.asList(tp)); + subscriptions.subscribe(singleton(topicName), listener); + subscriptions.assignFromSubscribed(singleton(tp)); subscriptions.seek(tp, 0); fetcher.sendFetches(); // Now the rebalance happens and fetch positions are cleared - subscriptions.assignFromSubscribed(Arrays.asList(tp)); + subscriptions.assignFromSubscribed(singleton(tp)); client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 0)); consumerClient.poll(0); @@ -343,7 +344,7 @@ public void testFetchDuringRebalance() { @Test public void testInFlightFetchOnPausedPartition() { - subscriptions.assignFromUser(Arrays.asList(tp)); + subscriptions.assignFromUser(singleton(tp)); subscriptions.seek(tp, 0); fetcher.sendFetches(); @@ -356,7 +357,7 @@ public void testInFlightFetchOnPausedPartition() { @Test public void testFetchOnPausedPartition() { - subscriptions.assignFromUser(Arrays.asList(tp)); + subscriptions.assignFromUser(singleton(tp)); subscriptions.seek(tp, 0); subscriptions.pause(tp); @@ -366,7 +367,7 @@ public void testFetchOnPausedPartition() { @Test public void testFetchNotLeaderForPartition() { - subscriptions.assignFromUser(Arrays.asList(tp)); + subscriptions.assignFromUser(singleton(tp)); subscriptions.seek(tp, 0); fetcher.sendFetches(); @@ -378,7 +379,7 @@ public void testFetchNotLeaderForPartition() { @Test public void testFetchUnknownTopicOrPartition() { - subscriptions.assignFromUser(Arrays.asList(tp)); + subscriptions.assignFromUser(singleton(tp)); subscriptions.seek(tp, 0); fetcher.sendFetches(); @@ -390,7 +391,7 @@ public void testFetchUnknownTopicOrPartition() { @Test public void testFetchOffsetOutOfRange() { - subscriptions.assignFromUser(Arrays.asList(tp)); + subscriptions.assignFromUser(singleton(tp)); subscriptions.seek(tp, 0); fetcher.sendFetches(); @@ -405,7 +406,7 @@ public void testFetchOffsetOutOfRange() { public void testStaleOutOfRangeError() { // verify that an out of range error which arrives after a seek // does not cause us to reset our position or throw an exception - subscriptions.assignFromUser(Arrays.asList(tp)); + subscriptions.assignFromUser(singleton(tp)); subscriptions.seek(tp, 0); fetcher.sendFetches(); @@ -419,7 +420,7 @@ public void testStaleOutOfRangeError() { @Test public void testFetchedRecordsAfterSeek() { - subscriptionsNoAutoReset.assignFromUser(Arrays.asList(tp)); + subscriptionsNoAutoReset.assignFromUser(singleton(tp)); subscriptionsNoAutoReset.seek(tp, 0); fetcherNoAutoReset.sendFetches(); @@ -432,7 +433,7 @@ public void testFetchedRecordsAfterSeek() { @Test public void testFetchOffsetOutOfRangeException() { - subscriptionsNoAutoReset.assignFromUser(Arrays.asList(tp)); + subscriptionsNoAutoReset.assignFromUser(singleton(tp)); subscriptionsNoAutoReset.seek(tp, 0); fetcherNoAutoReset.sendFetches(); @@ -452,7 +453,7 @@ public void testFetchOffsetOutOfRangeException() { @Test public void testFetchDisconnected() { - subscriptions.assignFromUser(Arrays.asList(tp)); + subscriptions.assignFromUser(singleton(tp)); subscriptions.seek(tp, 0); fetcher.sendFetches(); @@ -470,22 +471,22 @@ public void testFetchDisconnected() { public void testUpdateFetchPositionToCommitted() { // unless a specific reset is expected, the default behavior is to reset to the committed // position if one is present - subscriptions.assignFromUser(Arrays.asList(tp)); + subscriptions.assignFromUser(singleton(tp)); subscriptions.committed(tp, new OffsetAndMetadata(5)); - fetcher.updateFetchPositions(Collections.singleton(tp)); + fetcher.updateFetchPositions(singleton(tp)); assertTrue(subscriptions.isFetchable(tp)); assertEquals(5, subscriptions.position(tp).longValue()); } @Test public void testUpdateFetchPositionResetToDefaultOffset() { - subscriptions.assignFromUser(Arrays.asList(tp)); + subscriptions.assignFromUser(singleton(tp)); // with no commit position, we should reset using the default strategy defined above (EARLIEST) client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.EARLIEST_TIMESTAMP), listOffsetResponse(Errors.NONE, Arrays.asList(5L))); - fetcher.updateFetchPositions(Collections.singleton(tp)); + fetcher.updateFetchPositions(singleton(tp)); assertFalse(subscriptions.isOffsetResetNeeded(tp)); assertTrue(subscriptions.isFetchable(tp)); assertEquals(5, subscriptions.position(tp).longValue()); @@ -493,12 +494,12 @@ public void testUpdateFetchPositionResetToDefaultOffset() { @Test public void testUpdateFetchPositionResetToLatestOffset() { - subscriptions.assignFromUser(Arrays.asList(tp)); + subscriptions.assignFromUser(singleton(tp)); subscriptions.needOffsetReset(tp, OffsetResetStrategy.LATEST); client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP), listOffsetResponse(Errors.NONE, Arrays.asList(5L))); - fetcher.updateFetchPositions(Collections.singleton(tp)); + fetcher.updateFetchPositions(singleton(tp)); assertFalse(subscriptions.isOffsetResetNeeded(tp)); assertTrue(subscriptions.isFetchable(tp)); assertEquals(5, subscriptions.position(tp).longValue()); @@ -506,12 +507,12 @@ public void testUpdateFetchPositionResetToLatestOffset() { @Test public void testUpdateFetchPositionResetToEarliestOffset() { - subscriptions.assignFromUser(Arrays.asList(tp)); + subscriptions.assignFromUser(singleton(tp)); subscriptions.needOffsetReset(tp, OffsetResetStrategy.EARLIEST); client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.EARLIEST_TIMESTAMP), listOffsetResponse(Errors.NONE, Arrays.asList(5L))); - fetcher.updateFetchPositions(Collections.singleton(tp)); + fetcher.updateFetchPositions(singleton(tp)); assertFalse(subscriptions.isOffsetResetNeeded(tp)); assertTrue(subscriptions.isFetchable(tp)); assertEquals(5, subscriptions.position(tp).longValue()); @@ -519,7 +520,7 @@ public void testUpdateFetchPositionResetToEarliestOffset() { @Test public void testUpdateFetchPositionDisconnect() { - subscriptions.assignFromUser(Arrays.asList(tp)); + subscriptions.assignFromUser(singleton(tp)); subscriptions.needOffsetReset(tp, OffsetResetStrategy.LATEST); // First request gets a disconnect @@ -529,7 +530,7 @@ public void testUpdateFetchPositionDisconnect() { // Next one succeeds client.prepareResponse(listOffsetRequestMatcher(ListOffsetRequest.LATEST_TIMESTAMP), listOffsetResponse(Errors.NONE, Arrays.asList(5L))); - fetcher.updateFetchPositions(Collections.singleton(tp)); + fetcher.updateFetchPositions(singleton(tp)); assertFalse(subscriptions.isOffsetResetNeeded(tp)); assertTrue(subscriptions.isFetchable(tp)); assertEquals(5, subscriptions.position(tp).longValue()); @@ -567,7 +568,7 @@ public void testGetAllTopicsUnauthorized() { fetcher.getAllTopicMetadata(10L); fail(); } catch (TopicAuthorizationException e) { - assertEquals(Collections.singleton(topicName), e.unauthorizedTopics()); + assertEquals(singleton(topicName), e.unauthorizedTopics()); } } @@ -600,7 +601,7 @@ public void testGetTopicMetadataLeaderNotAvailable() { @Test public void testQuotaMetrics() throws Exception { List> records; - subscriptions.assignFromUser(Arrays.asList(tp)); + subscriptions.assignFromUser(singleton(tp)); subscriptions.seek(tp, 0); // normal fetch diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java index 3b4b10e7b0c14..783f0e63c8b78 100644 --- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/SubscriptionStateTest.java @@ -16,21 +16,23 @@ */ package org.apache.kafka.clients.consumer.internals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertTrue; -import static java.util.Arrays.asList; +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; +import org.apache.kafka.clients.consumer.OffsetAndMetadata; +import org.apache.kafka.clients.consumer.OffsetResetStrategy; +import org.apache.kafka.common.TopicPartition; +import org.junit.Test; import java.util.Arrays; import java.util.Collection; import java.util.Collections; +import java.util.HashSet; import java.util.regex.Pattern; -import org.apache.kafka.clients.consumer.ConsumerRebalanceListener; -import org.apache.kafka.clients.consumer.OffsetAndMetadata; -import org.apache.kafka.clients.consumer.OffsetResetStrategy; -import org.apache.kafka.common.TopicPartition; -import org.junit.Test; +import static java.util.Arrays.asList; +import static java.util.Collections.singleton; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; public class SubscriptionStateTest { @@ -43,16 +45,15 @@ public class SubscriptionStateTest { @Test public void partitionAssignment() { - state.assignFromUser(Arrays.asList(tp0)); - assertEquals(Collections.singleton(tp0), state.assignedPartitions()); - assertFalse(state.partitionAssignmentNeeded()); + state.assignFromUser(singleton(tp0)); + assertEquals(singleton(tp0), state.assignedPartitions()); assertFalse(state.hasAllFetchPositions()); assertTrue(state.refreshCommitsNeeded()); state.committed(tp0, new OffsetAndMetadata(1)); state.seek(tp0, 1); assertTrue(state.isFetchable(tp0)); assertAllPositions(tp0, 1L); - state.assignFromUser(Arrays.asList()); + state.assignFromUser(Collections.emptySet()); assertTrue(state.assignedPartitions().isEmpty()); assertFalse(state.isAssigned(tp0)); assertFalse(state.isFetchable(tp0)); @@ -60,7 +61,7 @@ public void partitionAssignment() { @Test public void partitionReset() { - state.assignFromUser(Arrays.asList(tp0)); + state.assignFromUser(singleton(tp0)); state.seek(tp0, 5); assertEquals(5L, (long) state.position(tp0)); state.needOffsetReset(tp0); @@ -76,9 +77,8 @@ public void partitionReset() { @Test public void topicSubscription() { - state.subscribe(Arrays.asList(topic), rebalanceListener); + state.subscribe(singleton(topic), rebalanceListener); assertEquals(1, state.subscription().size()); - assertTrue(state.partitionAssignmentNeeded()); assertTrue(state.assignedPartitions().isEmpty()); assertTrue(state.partitionsAutoAssigned()); state.assignFromSubscribed(asList(tp0)); @@ -87,15 +87,14 @@ public void topicSubscription() { assertAllPositions(tp0, 1L); state.assignFromSubscribed(asList(tp1)); assertTrue(state.isAssigned(tp1)); - assertFalse(state.partitionAssignmentNeeded()); assertFalse(state.isAssigned(tp0)); assertFalse(state.isFetchable(tp1)); - assertEquals(Collections.singleton(tp1), state.assignedPartitions()); + assertEquals(singleton(tp1), state.assignedPartitions()); } @Test public void partitionPause() { - state.assignFromUser(Arrays.asList(tp0)); + state.assignFromUser(singleton(tp0)); state.seek(tp0, 100); assertTrue(state.isFetchable(tp0)); state.pause(tp0); @@ -106,7 +105,7 @@ public void partitionPause() { @Test public void commitOffsetMetadata() { - state.assignFromUser(Arrays.asList(tp0)); + state.assignFromUser(singleton(tp0)); state.committed(tp0, new OffsetAndMetadata(5, "hi")); assertEquals(5, state.committed(tp0).offset()); @@ -115,7 +114,7 @@ public void commitOffsetMetadata() { @Test(expected = IllegalStateException.class) public void invalidPositionUpdate() { - state.subscribe(Arrays.asList(topic), rebalanceListener); + state.subscribe(singleton(topic), rebalanceListener); state.assignFromSubscribed(asList(tp0)); state.position(tp0, 0); } @@ -132,32 +131,32 @@ public void assertAllPositions(TopicPartition tp, Long offset) { @Test(expected = IllegalStateException.class) public void cantSubscribeTopicAndPattern() { - state.subscribe(Arrays.asList(topic), rebalanceListener); + state.subscribe(singleton(topic), rebalanceListener); state.subscribe(Pattern.compile(".*"), rebalanceListener); } @Test(expected = IllegalStateException.class) public void cantSubscribePartitionAndPattern() { - state.assignFromUser(Arrays.asList(tp0)); + state.assignFromUser(singleton(tp0)); state.subscribe(Pattern.compile(".*"), rebalanceListener); } @Test(expected = IllegalStateException.class) public void cantSubscribePatternAndTopic() { state.subscribe(Pattern.compile(".*"), rebalanceListener); - state.subscribe(Arrays.asList(topic), rebalanceListener); + state.subscribe(singleton(topic), rebalanceListener); } @Test(expected = IllegalStateException.class) public void cantSubscribePatternAndPartition() { state.subscribe(Pattern.compile(".*"), rebalanceListener); - state.assignFromUser(Arrays.asList(tp0)); + state.assignFromUser(singleton(tp0)); } @Test public void patternSubscription() { state.subscribe(Pattern.compile(".*"), rebalanceListener); - state.changeSubscription(Arrays.asList(topic, topic1)); + state.subscribeFromPattern(new HashSet<>(Arrays.asList(topic, topic1))); assertEquals( "Expected subscribed topics count is incorrect", 2, state.subscription().size()); @@ -165,43 +164,37 @@ public void patternSubscription() { @Test public void unsubscribeUserAssignment() { - state.assignFromUser(Arrays.asList(tp0, tp1)); + state.assignFromUser(new HashSet<>(Arrays.asList(tp0, tp1))); state.unsubscribe(); - state.subscribe(Arrays.asList(topic), rebalanceListener); - assertEquals(Collections.singleton(topic), state.subscription()); + state.subscribe(singleton(topic), rebalanceListener); + assertEquals(singleton(topic), state.subscription()); } @Test public void unsubscribeUserSubscribe() { - state.subscribe(Arrays.asList(topic), rebalanceListener); + state.subscribe(singleton(topic), rebalanceListener); state.unsubscribe(); - state.assignFromUser(Arrays.asList(tp0)); - assertEquals(Collections.singleton(tp0), state.assignedPartitions()); + state.assignFromUser(singleton(tp0)); + assertEquals(singleton(tp0), state.assignedPartitions()); } @Test public void unsubscription() { state.subscribe(Pattern.compile(".*"), rebalanceListener); - state.changeSubscription(Arrays.asList(topic, topic1)); - assertTrue(state.partitionAssignmentNeeded()); - + state.subscribeFromPattern(new HashSet<>(Arrays.asList(topic, topic1))); state.assignFromSubscribed(asList(tp1)); - assertEquals(Collections.singleton(tp1), state.assignedPartitions()); - assertFalse(state.partitionAssignmentNeeded()); + assertEquals(singleton(tp1), state.assignedPartitions()); state.unsubscribe(); assertEquals(0, state.subscription().size()); assertTrue(state.assignedPartitions().isEmpty()); - assertTrue(state.partitionAssignmentNeeded()); - state.assignFromUser(Arrays.asList(tp0)); - assertEquals(Collections.singleton(tp0), state.assignedPartitions()); - assertFalse(state.partitionAssignmentNeeded()); + state.assignFromUser(singleton(tp0)); + assertEquals(singleton(tp0), state.assignedPartitions()); state.unsubscribe(); assertEquals(0, state.subscription().size()); assertTrue(state.assignedPartitions().isEmpty()); - assertTrue(state.partitionAssignmentNeeded()); } private static class MockRebalanceListener implements ConsumerRebalanceListener {