Skip to content

Commit d2adfbe

Browse files
committed
KAFKA-12464: enhance constrained sticky Assign algorithm
1 parent e9c5a39 commit d2adfbe

File tree

1 file changed

+57
-68
lines changed

1 file changed

+57
-68
lines changed

clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java

Lines changed: 57 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,6 @@
2828
import java.util.Map;
2929
import java.util.Map.Entry;
3030
import java.util.Optional;
31-
import java.util.Queue;
3231
import java.util.Set;
3332
import java.util.SortedSet;
3433
import java.util.TreeMap;
@@ -108,6 +107,8 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
108107
subscribedTopics.addAll(subscription.topics());
109108
} else if (!(subscription.topics().size() == subscribedTopics.size()
110109
&& subscribedTopics.containsAll(subscription.topics()))) {
110+
// we don't need consumerToOwnedPartitions in general assign case
111+
consumerToOwnedPartitions = null;
111112
return false;
112113
}
113114

@@ -149,12 +150,8 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
149150
* This constrainedAssign optimizes the assignment algorithm when all consumers were subscribed to same set of topics.
150151
* The method includes the following steps:
151152
*
152-
* 1. Reassign as many previously owned partitions as possible, up to the maxQuota
153-
* 2. Fill remaining members up to minQuota
154-
* 3. If we ran out of unassigned partitions before filling all consumers, we need to start stealing partitions
155-
* from the over-full consumers at max capacity
156-
* 4. Otherwise we may have run out of unfilled consumers before assigning all partitions, in which case we
157-
* should just distribute one partition each to all consumers at min capacity
153+
* 1. Reassign as many previously owned partitions as possible, up to the expected number of maxQuota, otherwise, minQuota
154+
* 2. Fill remaining members up to the expected numbers of maxQuota, otherwise, to minQuota
158155
*
159156
* @param partitionsPerTopic The number of partitions for each subscribed topic
160157
* @param consumerToOwnedPartitions Each consumer's previously owned and still-subscribed partitions
@@ -163,66 +160,80 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
163160
*/
164161
private Map<String, List<TopicPartition>> constrainedAssign(Map<String, Integer> partitionsPerTopic,
165162
Map<String, List<TopicPartition>> consumerToOwnedPartitions) {
163+
if (log.isDebugEnabled()) {
164+
log.debug(String.format("performing constrained assign. partitionsPerTopic: %s, consumerToOwnedPartitions: %s",
165+
partitionsPerTopic, consumerToOwnedPartitions));
166+
}
167+
166168
SortedSet<TopicPartition> unassignedPartitions = getTopicPartitions(partitionsPerTopic);
167169

168170
Set<TopicPartition> allRevokedPartitions = new HashSet<>();
169171

170-
// Each consumer should end up in exactly one of the below
171-
// the consumers not yet at capacity
172+
// the consumers not yet at expected capacity
172173
List<String> unfilledMembers = new LinkedList<>();
173-
// the members with exactly maxQuota partitions assigned
174-
Queue<String> maxCapacityMembers = new LinkedList<>();
175-
// the members with exactly minQuota partitions assigned
176-
Queue<String> minCapacityMembers = new LinkedList<>();
177174

178175
int numberOfConsumers = consumerToOwnedPartitions.size();
179-
int minQuota = (int) Math.floor(((double) unassignedPartitions.size()) / numberOfConsumers);
180-
int maxQuota = (int) Math.ceil(((double) unassignedPartitions.size()) / numberOfConsumers);
176+
int totalPartitionsCount = unassignedPartitions.size();
177+
178+
int minQuota = (int) Math.floor(((double) totalPartitionsCount) / numberOfConsumers);
179+
int maxQuota = (int) Math.ceil(((double) totalPartitionsCount) / numberOfConsumers);
180+
// the expected number of members with maxQuota assignment
181+
int numExpectedMaxCapacityMembers = totalPartitionsCount % numberOfConsumers;
182+
// the number of members with exactly maxQuota partitions assigned
183+
int numMaxCapacityMembers = 0;
181184

182-
// initialize the assignment map with an empty array of size minQuota for all members
185+
// initialize the assignment map with an empty array of size maxQuota for all members
183186
Map<String, List<TopicPartition>> assignment = new HashMap<>(
184-
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c -> new ArrayList<>(minQuota))));
187+
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c -> new ArrayList<>(maxQuota))));
185188

186189
// Reassign as many previously owned partitions as possible
187190
for (Map.Entry<String, List<TopicPartition>> consumerEntry : consumerToOwnedPartitions.entrySet()) {
188191
String consumer = consumerEntry.getKey();
189192
List<TopicPartition> ownedPartitions = consumerEntry.getValue();
190193

191194
List<TopicPartition> consumerAssignment = assignment.get(consumer);
192-
int i = 0;
193-
// assign the first N partitions up to the max quota, and mark the remaining as being revoked
194-
for (TopicPartition tp : ownedPartitions) {
195-
if (i < maxQuota) {
196-
consumerAssignment.add(tp);
197-
unassignedPartitions.remove(tp);
198-
} else {
199-
allRevokedPartitions.add(tp);
200-
}
201-
++i;
202-
}
203195

204196
if (ownedPartitions.size() < minQuota) {
197+
// the expected assignment size is more than consumer have now, so keep all the owned partitions
198+
// and put this member into unfilled member list
199+
if (ownedPartitions.size() > 0) {
200+
consumerAssignment.addAll(ownedPartitions);
201+
unassignedPartitions.removeAll(ownedPartitions);
202+
}
205203
unfilledMembers.add(consumer);
204+
} else if (ownedPartitions.size() >= maxQuota && numMaxCapacityMembers++ <= numExpectedMaxCapacityMembers) {
205+
// consumer owned the "maxQuota" of partitions or more, and we still under the number of expected max capacity members
206+
// so keep "maxQuota" of the owned partitions, and revoke the rest of the partitions
207+
consumerAssignment.addAll(ownedPartitions.subList(0, maxQuota));
208+
unassignedPartitions.removeAll(ownedPartitions.subList(0, maxQuota));
209+
allRevokedPartitions.addAll(ownedPartitions.subList(maxQuota, ownedPartitions.size()));
206210
} else {
207-
// It's possible for a consumer to be at both min and max capacity if minQuota == maxQuota
208-
if (consumerAssignment.size() == minQuota)
209-
minCapacityMembers.add(consumer);
210-
if (consumerAssignment.size() == maxQuota)
211-
maxCapacityMembers.add(consumer);
211+
// consumer owned the "minQuota" of partitions or more
212+
// so keep "minQuota" of the owned partitions, and revoke the rest of the partitions
213+
consumerAssignment.addAll(ownedPartitions.subList(0, minQuota));
214+
unassignedPartitions.removeAll(ownedPartitions.subList(0, minQuota));
215+
allRevokedPartitions.addAll(ownedPartitions.subList(minQuota, ownedPartitions.size()));
212216
}
213217
}
214218

219+
if (log.isDebugEnabled()) {
220+
log.debug(String.format(
221+
"After reassigning previously owned partitions, unfilled members: %s, unassigned partitions: %s, " +
222+
"current assignment: %s", unfilledMembers, unassignedPartitions, assignment));
223+
}
224+
215225
Collections.sort(unfilledMembers);
216226
Iterator<TopicPartition> unassignedPartitionsIter = unassignedPartitions.iterator();
217227

218-
// Fill remaining members up to minQuota
228+
// fill remaining members up to the expected numbers of maxQuota, otherwise, to minQuota
219229
while (!unfilledMembers.isEmpty() && !unassignedPartitions.isEmpty()) {
220230
Iterator<String> unfilledConsumerIter = unfilledMembers.iterator();
221231

222232
while (unfilledConsumerIter.hasNext()) {
223233
String consumer = unfilledConsumerIter.next();
224234
List<TopicPartition> consumerAssignment = assignment.get(consumer);
225235

236+
int expectedAssignedCount = numMaxCapacityMembers < numExpectedMaxCapacityMembers ? maxQuota : minQuota;
226237
if (unassignedPartitionsIter.hasNext()) {
227238
TopicPartition tp = unassignedPartitionsIter.next();
228239
consumerAssignment.add(tp);
@@ -231,48 +242,26 @@ private Map<String, List<TopicPartition>> constrainedAssign(Map<String, Integer>
231242
if (allRevokedPartitions.contains(tp))
232243
partitionsTransferringOwnership.put(tp, consumer);
233244
} else {
245+
// Should not enter here since we have calculated the exact number to assign to each consumer
246+
log.warn(String.format(
247+
"No more partitions to be assigned. consumer: [%s] with current size: %d, but expected size is %d",
248+
consumer, consumerAssignment.size(), expectedAssignedCount));
234249
break;
235250
}
236251

237-
if (consumerAssignment.size() == minQuota) {
238-
minCapacityMembers.add(consumer);
239-
unfilledConsumerIter.remove();
240-
}
241-
}
242-
}
252+
int currentAssignedCount = consumerAssignment.size();
243253

244-
// If we ran out of unassigned partitions before filling all consumers, we need to start stealing partitions
245-
// from the over-full consumers at max capacity
246-
for (String consumer : unfilledMembers) {
247-
List<TopicPartition> consumerAssignment = assignment.get(consumer);
248-
int remainingCapacity = minQuota - consumerAssignment.size();
249-
while (remainingCapacity > 0) {
250-
String overloadedConsumer = maxCapacityMembers.poll();
251-
if (overloadedConsumer == null) {
252-
throw new IllegalStateException("Some consumers are under capacity but all partitions have been assigned");
254+
if (currentAssignedCount == expectedAssignedCount) {
255+
if (currentAssignedCount == maxQuota) {
256+
numMaxCapacityMembers++;
257+
}
258+
unfilledConsumerIter.remove();
253259
}
254-
TopicPartition swappedPartition = assignment.get(overloadedConsumer).remove(0);
255-
consumerAssignment.add(swappedPartition);
256-
--remainingCapacity;
257-
// This partition is by definition transferring ownership, the swapped partition must have come from
258-
// the max capacity member's owned partitions since it can only reach max capacity with owned partitions
259-
partitionsTransferringOwnership.put(swappedPartition, consumer);
260260
}
261-
minCapacityMembers.add(consumer);
262261
}
263262

264-
// Otherwise we may have run out of unfilled consumers before assigning all partitions, in which case we
265-
// should just distribute one partition each to all consumers at min capacity
266-
for (TopicPartition unassignedPartition : unassignedPartitions) {
267-
String underCapacityConsumer = minCapacityMembers.poll();
268-
if (underCapacityConsumer == null) {
269-
throw new IllegalStateException("Some partitions are unassigned but all consumers are at maximum capacity");
270-
}
271-
// We can skip the bookkeeping of unassignedPartitions and maxCapacityMembers here since we are at the end
272-
assignment.get(underCapacityConsumer).add(unassignedPartition);
273-
274-
if (allRevokedPartitions.contains(unassignedPartition))
275-
partitionsTransferringOwnership.put(unassignedPartition, underCapacityConsumer);
263+
if (log.isDebugEnabled()) {
264+
log.debug("final assignment: " + assignment);
276265
}
277266

278267
return assignment;

0 commit comments

Comments
 (0)