Skip to content

Commit 8fe9267

Browse files
committed
KAFKA-12464: remove sortedAllPartitions to save memory and refactor codes
1 parent cd68d10 commit 8fe9267

File tree

1 file changed

+97
-96
lines changed

1 file changed

+97
-96
lines changed

clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java

Lines changed: 97 additions & 96 deletions
Original file line numberDiff line numberDiff line change
@@ -80,6 +80,8 @@ public Map<String, List<TopicPartition>> assign(Map<String, Integer> partitionsP
8080
log.debug("Detected that all not consumers were subscribed to same set of topics, falling back to the "
8181
+ "general case assignment algorithm");
8282
partitionsTransferringOwnership = null;
83+
// we don't need consumerToOwnedPartitions in general assign case
84+
consumerToOwnedPartitions = null;
8385
return generalAssign(partitionsPerTopic, subscriptions);
8486
}
8587
}
@@ -106,8 +108,6 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
106108
subscribedTopics.addAll(subscription.topics());
107109
} else if (!(subscription.topics().size() == subscribedTopics.size()
108110
&& subscribedTopics.containsAll(subscription.topics()))) {
109-
// we don't need consumerToOwnedPartitions in general assign case
110-
consumerToOwnedPartitions = null;
111111
return false;
112112
}
113113

@@ -160,19 +160,17 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
160160
private Map<String, List<TopicPartition>> constrainedAssign(Map<String, Integer> partitionsPerTopic,
161161
Map<String, List<TopicPartition>> consumerToOwnedPartitions) {
162162
if (log.isDebugEnabled()) {
163-
log.debug(String.format("performing constrained assign. partitionsPerTopic: %s, consumerToOwnedPartitions: %s",
164-
partitionsPerTopic, consumerToOwnedPartitions));
163+
log.debug("performing constrained assign. partitionsPerTopic: {}, consumerToOwnedPartitions: {}",
164+
partitionsPerTopic, consumerToOwnedPartitions);
165165
}
166166

167-
List<TopicPartition> sortedAllPartitions = getTopicPartitions(partitionsPerTopic);
168-
169167
Set<TopicPartition> allRevokedPartitions = new HashSet<>();
170168

171169
// the consumers not yet at expected capacity
172170
List<String> unfilledMembers = new LinkedList<>();
173171

174172
int numberOfConsumers = consumerToOwnedPartitions.size();
175-
int totalPartitionsCount = sortedAllPartitions.size();
173+
int totalPartitionsCount = partitionsPerTopic.values().stream().reduce(0, Integer::sum);
176174

177175
int minQuota = (int) Math.floor(((double) totalPartitionsCount) / numberOfConsumers);
178176
int maxQuota = (int) Math.ceil(((double) totalPartitionsCount) / numberOfConsumers);
@@ -185,7 +183,7 @@ private Map<String, List<TopicPartition>> constrainedAssign(Map<String, Integer>
185183
Map<String, List<TopicPartition>> assignment = new HashMap<>(
186184
consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c -> new ArrayList<>(maxQuota))));
187185

188-
List<TopicPartition> toBeRemovedPartitions = new ArrayList<>();
186+
List<TopicPartition> assignedPartitions = new ArrayList<>();
189187
// Reassign as many previously owned partitions as possible
190188
for (Map.Entry<String, List<TopicPartition>> consumerEntry : consumerToOwnedPartitions.entrySet()) {
191189
String consumer = consumerEntry.getKey();
@@ -198,138 +196,141 @@ private Map<String, List<TopicPartition>> constrainedAssign(Map<String, Integer>
198196
// and put this member into unfilled member list
199197
if (ownedPartitions.size() > 0) {
200198
consumerAssignment.addAll(ownedPartitions);
201-
toBeRemovedPartitions.addAll(ownedPartitions);
199+
assignedPartitions.addAll(ownedPartitions);
202200
}
203201
unfilledMembers.add(consumer);
204-
} else if (ownedPartitions.size() >= maxQuota && numMaxCapacityMembers++ <= numExpectedMaxCapacityMembers) {
205-
// consumer owned the "maxQuota" of partitions or more, and we still under the number of expected max capacity members
202+
} else if (ownedPartitions.size() >= maxQuota && numMaxCapacityMembers < numExpectedMaxCapacityMembers) {
203+
// consumer owned the "maxQuota" of partitions or more, and we're still under the number of expected max capacity members
206204
// so keep "maxQuota" of the owned partitions, and revoke the rest of the partitions
207-
consumerAssignment.addAll(ownedPartitions.subList(0, maxQuota));
208-
toBeRemovedPartitions.addAll(ownedPartitions.subList(0, maxQuota));
205+
numMaxCapacityMembers++;
206+
List<TopicPartition> maxQuotaPartitions = ownedPartitions.subList(0, maxQuota);
207+
consumerAssignment.addAll(maxQuotaPartitions);
208+
assignedPartitions.addAll(maxQuotaPartitions);
209209
allRevokedPartitions.addAll(ownedPartitions.subList(maxQuota, ownedPartitions.size()));
210210
} else {
211-
// consumer owned the "minQuota" of partitions or more
211+
// consumer owned at least "minQuota" of partitions but we're already at the allowed number of max capacity members
212212
// so keep "minQuota" of the owned partitions, and revoke the rest of the partitions
213-
consumerAssignment.addAll(ownedPartitions.subList(0, minQuota));
214-
toBeRemovedPartitions.addAll(ownedPartitions.subList(0, minQuota));
213+
List<TopicPartition> minQuotaPartitions = ownedPartitions.subList(0, minQuota);
214+
consumerAssignment.addAll(minQuotaPartitions);
215+
assignedPartitions.addAll(minQuotaPartitions);
215216
allRevokedPartitions.addAll(ownedPartitions.subList(minQuota, ownedPartitions.size()));
216217
}
217218
}
218219

219-
List<TopicPartition> unassignedPartitions;
220-
if (!toBeRemovedPartitions.isEmpty()) {
221-
Collections.sort(toBeRemovedPartitions,
222-
Comparator.comparing(TopicPartition::topic).thenComparing(TopicPartition::partition));
223-
unassignedPartitions = getUnassignedPartitions(sortedAllPartitions, toBeRemovedPartitions);
224-
sortedAllPartitions = null;
225-
} else {
226-
unassignedPartitions = sortedAllPartitions;
227-
}
228-
toBeRemovedPartitions = null;
220+
List<TopicPartition> unassignedPartitions = getUnassignedPartitions(totalPartitionsCount, partitionsPerTopic, assignedPartitions);
221+
assignedPartitions = null;
229222

230223
if (log.isDebugEnabled()) {
231-
log.debug(String.format(
232-
"After reassigning previously owned partitions, unfilled members: %s, unassigned partitions: %s, " +
233-
"current assignment: %s", unfilledMembers, unassignedPartitions, assignment));
224+
log.debug("After reassigning previously owned partitions, unfilled members: {}, unassigned partitions: {}, " +
225+
"current assignment: {}", unfilledMembers, unassignedPartitions, assignment);
234226
}
235227

236228
Collections.sort(unfilledMembers);
237-
Iterator<TopicPartition> unassignedPartitionsIter = unassignedPartitions.iterator();
238-
239-
// fill remaining members up to the expected numbers of maxQuota, otherwise, to minQuota
240-
while (!unfilledMembers.isEmpty() && !unassignedPartitions.isEmpty()) {
241-
Iterator<String> unfilledConsumerIter = unfilledMembers.iterator();
242-
243-
while (unfilledConsumerIter.hasNext()) {
244-
String consumer = unfilledConsumerIter.next();
245-
List<TopicPartition> consumerAssignment = assignment.get(consumer);
246-
247-
int expectedAssignedCount = numMaxCapacityMembers < numExpectedMaxCapacityMembers ? maxQuota : minQuota;
248-
int currentAssignedCount = consumerAssignment.size();
249-
if (unassignedPartitionsIter.hasNext()) {
250-
TopicPartition tp = unassignedPartitionsIter.next();
251-
consumerAssignment.add(tp);
252-
currentAssignedCount++;
253-
// We already assigned all possible ownedPartitions, so we know this must be newly to this consumer
254-
if (allRevokedPartitions.contains(tp))
255-
partitionsTransferringOwnership.put(tp, consumer);
256-
} else {
257-
// This will only happen when current consumer has minQuota of partitions, and in previous round,
258-
// the expectedAssignedCount is maxQuota, so, still in unfilledMembers list.
259-
// But now, expectedAssignedCount is minQuota, we can remove it.
260-
if (currentAssignedCount != minQuota) {
261-
// Should not enter here since we have calculated the exact number to assign to each consumer
262-
log.warn(String.format(
263-
"No more partitions to be assigned. consumer: [%s] with current size: %d, but expected size is %d",
264-
consumer, currentAssignedCount, expectedAssignedCount));
265-
}
266-
unfilledConsumerIter.remove();
229+
230+
Iterator<String> unfilledConsumerIter = unfilledMembers.iterator();
231+
// Round-Robin filling remaining members up to the expected numbers of maxQuota, otherwise, to minQuota
232+
for (TopicPartition unassignedPartition : unassignedPartitions) {
233+
if (!unfilledConsumerIter.hasNext()) {
234+
if (unfilledMembers.isEmpty()) {
235+
// Should not enter here since we have calculated the exact number to assign to each consumer
236+
log.warn("No more unfilled consumers to be assigned.");
267237
break;
268238
}
239+
unfilledConsumerIter = unfilledMembers.iterator();
240+
}
241+
String consumer = unfilledConsumerIter.next();
242+
List<TopicPartition> consumerAssignment = assignment.get(consumer);
243+
consumerAssignment.add(unassignedPartition);
269244

270-
if (currentAssignedCount == expectedAssignedCount) {
271-
if (currentAssignedCount == maxQuota) {
272-
numMaxCapacityMembers++;
273-
}
274-
unfilledConsumerIter.remove();
245+
// We already assigned all possible ownedPartitions, so we know this must be newly to this consumer
246+
if (allRevokedPartitions.contains(unassignedPartition))
247+
partitionsTransferringOwnership.put(unassignedPartition, consumer);
248+
249+
int currentAssignedCount = consumerAssignment.size();
250+
int expectedAssignedCount = numMaxCapacityMembers < numExpectedMaxCapacityMembers ? maxQuota : minQuota;
251+
if (currentAssignedCount == expectedAssignedCount) {
252+
if (currentAssignedCount == maxQuota) {
253+
numMaxCapacityMembers++;
275254
}
255+
unfilledConsumerIter.remove();
276256
}
277257
}
278258

259+
if (!unfilledMembers.isEmpty()) {
260+
// Should not enter here since we have calculated the exact number to assign to each consumer
261+
log.warn("No more partitions to be assigned to unfilled consumers: {}", unfilledMembers);
262+
}
263+
279264
if (log.isDebugEnabled()) {
280-
log.debug("final assignment: " + assignment);
265+
log.debug("Final assignment of partitions to consumers: \n{}", assignment);
281266
}
282-
267+
283268
return assignment;
284269
}
285270

286271
/**
287-
* get the unassigned partition list by computing the difference set of the sortedPartitions(all partitions)
288-
* and sortedToBeRemovedPartitions. We use two pointers technique here:
272+
* get the unassigned partition list by computing the difference set of all sorted partitions
273+
* and sortedAssignedPartitions. If no assigned partitions, we'll just return all topic partitions.
289274
*
290-
* We loop the sortedPartition, and compare the ith element in sorted toBeRemovedPartitions(i start from 0):
275+
* To compute the difference set, we use two pointers technique here:
276+
*
277+
* We loop through the all sorted topics, and then iterate all partitions the topic has,
278+
* compared with the ith element in sortedAssignedPartitions(i starts from 0):
291279
* - if not equal to the ith element, add to unassignedPartitions
292-
* - if equal to the the ith element, get next element from sortedToBeRemovedPartitions
280+
* - if equal to the the ith element, get next element from sortedAssignedPartitions
293281
*
294-
* @param sortedPartitions: sorted all partitions
295-
* @param sortedToBeRemovedPartitions: sorted partitions, all are included in the sortedPartitions
282+
* @param totalPartitionsCount all partitions counts in this assignment
283+
* @param partitionsPerTopic The number of partitions for each subscribed topic.
284+
* @param sortedAssignedPartitions sorted partitions, all are included in the sortedPartitions
296285
* @return the partitions don't assign to any current consumers
297286
*/
298-
private List<TopicPartition> getUnassignedPartitions(List<TopicPartition> sortedPartitions,
299-
List<TopicPartition> sortedToBeRemovedPartitions) {
300-
List<TopicPartition> unassignedPartitions = new ArrayList<>(
301-
sortedPartitions.size() - sortedToBeRemovedPartitions.size());
287+
private List<TopicPartition> getUnassignedPartitions(int totalPartitionsCount,
288+
Map<String, Integer> partitionsPerTopic,
289+
List<TopicPartition> sortedAssignedPartitions) {
290+
List<String> sortedAllTopics = new ArrayList<>(partitionsPerTopic.keySet());
291+
// sort all topics first, then we can have sorted all topic partitions by adding partitions starting from 0
292+
Collections.sort(sortedAllTopics);
293+
294+
if (sortedAssignedPartitions.isEmpty()) {
295+
// no assigned partitions means all partitions are unassigned partitions
296+
return getAllTopicPartitions(partitionsPerTopic, sortedAllTopics, totalPartitionsCount);
297+
}
298+
299+
List<TopicPartition> unassignedPartitions = new ArrayList<>(totalPartitionsCount - sortedAssignedPartitions.size());
300+
301+
Collections.sort(sortedAssignedPartitions, Comparator.comparing(TopicPartition::topic).thenComparing(TopicPartition::partition));
302302

303-
int index = 0;
304303
boolean shouldAddDirectly = false;
305-
int sizeToBeRemovedPartitions = sortedToBeRemovedPartitions.size();
306-
TopicPartition nextPartition = sortedToBeRemovedPartitions.get(index);
307-
for (TopicPartition topicPartition : sortedPartitions) {
308-
if (shouldAddDirectly || !nextPartition.equals(topicPartition)) {
309-
unassignedPartitions.add(topicPartition);
310-
} else {
311-
// equal case, don't add to unassignedPartitions, just get next partition
312-
if (index < sizeToBeRemovedPartitions - 1) {
313-
nextPartition = sortedToBeRemovedPartitions.get(++index);
304+
Iterator<TopicPartition> sortedAssignedPartitionsIter = sortedAssignedPartitions.iterator();
305+
TopicPartition nextPartition = sortedAssignedPartitionsIter.next();
306+
307+
for (String topic : sortedAllTopics) {
308+
int partitionCount = partitionsPerTopic.get(topic);
309+
for (int i = 0; i < partitionCount; i++) {
310+
if (shouldAddDirectly || !(nextPartition.topic().equals(topic) && nextPartition.partition() == i)) {
311+
unassignedPartitions.add(new TopicPartition(topic, i));
314312
} else {
315-
// add the remaining directly since there is no more toBeRemovedPartitions
316-
shouldAddDirectly = true;
313+
// this partition is in assignedPartitions, don't add to unassignedPartitions, just get next assigned partition
314+
if (sortedAssignedPartitionsIter.hasNext()) {
315+
nextPartition = sortedAssignedPartitionsIter.next();
316+
} else {
317+
// add the remaining directly since there is no more sortedAssignedPartitions
318+
shouldAddDirectly = true;
319+
}
317320
}
318321
}
319322
}
323+
320324
return unassignedPartitions;
321325
}
322326

323327

324-
private List<TopicPartition> getTopicPartitions(Map<String, Integer> partitionsPerTopic) {
325-
List<TopicPartition> allPartitions = new ArrayList<>(
326-
partitionsPerTopic.values().stream().reduce(0, Integer::sum));
327-
328-
List<String> allTopics = new ArrayList<>(partitionsPerTopic.keySet());
329-
// sort all topics first, then we can have sorted all topic partitions by adding partitions starting from 0
330-
Collections.sort(allTopics);
328+
private List<TopicPartition> getAllTopicPartitions(Map<String, Integer> partitionsPerTopic,
329+
List<String> sortedAllTopics,
330+
int totalPartitionsCount) {
331+
List<TopicPartition> allPartitions = new ArrayList<>(totalPartitionsCount);
331332

332-
for (String topic: allTopics) {
333+
for (String topic : sortedAllTopics) {
333334
int partitionCount = partitionsPerTopic.get(topic);
334335
for (int i = 0; i < partitionCount; ++i) {
335336
allPartitions.add(new TopicPartition(topic, i));

0 commit comments

Comments
 (0)