29
29
import java .util .Map .Entry ;
30
30
import java .util .Optional ;
31
31
import java .util .Set ;
32
- import java .util .SortedSet ;
33
32
import java .util .TreeMap ;
34
33
import java .util .TreeSet ;
35
34
import java .util .stream .Collectors ;
@@ -165,15 +164,15 @@ private Map<String, List<TopicPartition>> constrainedAssign(Map<String, Integer>
165
164
partitionsPerTopic , consumerToOwnedPartitions ));
166
165
}
167
166
168
- SortedSet <TopicPartition > unassignedPartitions = getTopicPartitions (partitionsPerTopic );
167
+ List <TopicPartition > sortedAllPartitions = getTopicPartitions (partitionsPerTopic );
169
168
170
169
Set <TopicPartition > allRevokedPartitions = new HashSet <>();
171
170
172
171
// the consumers not yet at expected capacity
173
172
List <String > unfilledMembers = new LinkedList <>();
174
173
175
174
int numberOfConsumers = consumerToOwnedPartitions .size ();
176
- int totalPartitionsCount = unassignedPartitions .size ();
175
+ int totalPartitionsCount = sortedAllPartitions .size ();
177
176
178
177
int minQuota = (int ) Math .floor (((double ) totalPartitionsCount ) / numberOfConsumers );
179
178
int maxQuota = (int ) Math .ceil (((double ) totalPartitionsCount ) / numberOfConsumers );
@@ -186,6 +185,7 @@ private Map<String, List<TopicPartition>> constrainedAssign(Map<String, Integer>
186
185
Map <String , List <TopicPartition >> assignment = new HashMap <>(
187
186
consumerToOwnedPartitions .keySet ().stream ().collect (Collectors .toMap (c -> c , c -> new ArrayList <>(maxQuota ))));
188
187
188
+ List <TopicPartition > toBeRemovedPartitions = new ArrayList <>();
189
189
// Reassign as many previously owned partitions as possible
190
190
for (Map .Entry <String , List <TopicPartition >> consumerEntry : consumerToOwnedPartitions .entrySet ()) {
191
191
String consumer = consumerEntry .getKey ();
@@ -198,24 +198,35 @@ private Map<String, List<TopicPartition>> constrainedAssign(Map<String, Integer>
198
198
// and put this member into unfilled member list
199
199
if (ownedPartitions .size () > 0 ) {
200
200
consumerAssignment .addAll (ownedPartitions );
201
- unassignedPartitions . removeAll (ownedPartitions );
201
+ toBeRemovedPartitions . addAll (ownedPartitions );
202
202
}
203
203
unfilledMembers .add (consumer );
204
204
} else if (ownedPartitions .size () >= maxQuota && numMaxCapacityMembers ++ <= numExpectedMaxCapacityMembers ) {
205
205
// consumer owned the "maxQuota" of partitions or more, and we still under the number of expected max capacity members
206
206
// so keep "maxQuota" of the owned partitions, and revoke the rest of the partitions
207
207
consumerAssignment .addAll (ownedPartitions .subList (0 , maxQuota ));
208
- unassignedPartitions . removeAll (ownedPartitions .subList (0 , maxQuota ));
208
+ toBeRemovedPartitions . addAll (ownedPartitions .subList (0 , maxQuota ));
209
209
allRevokedPartitions .addAll (ownedPartitions .subList (maxQuota , ownedPartitions .size ()));
210
210
} else {
211
211
// consumer owned the "minQuota" of partitions or more
212
212
// so keep "minQuota" of the owned partitions, and revoke the rest of the partitions
213
213
consumerAssignment .addAll (ownedPartitions .subList (0 , minQuota ));
214
- unassignedPartitions . removeAll (ownedPartitions .subList (0 , minQuota ));
214
+ toBeRemovedPartitions . addAll (ownedPartitions .subList (0 , minQuota ));
215
215
allRevokedPartitions .addAll (ownedPartitions .subList (minQuota , ownedPartitions .size ()));
216
216
}
217
217
}
218
218
219
+ List <TopicPartition > unassignedPartitions ;
220
+ if (!toBeRemovedPartitions .isEmpty ()) {
221
+ Collections .sort (toBeRemovedPartitions ,
222
+ Comparator .comparing (TopicPartition ::topic ).thenComparing (TopicPartition ::partition ));
223
+ unassignedPartitions = getUnassignedPartitions (sortedAllPartitions , toBeRemovedPartitions );
224
+ sortedAllPartitions = null ;
225
+ } else {
226
+ unassignedPartitions = sortedAllPartitions ;
227
+ }
228
+ toBeRemovedPartitions = null ;
229
+
219
230
if (log .isDebugEnabled ()) {
220
231
log .debug (String .format (
221
232
"After reassigning previously owned partitions, unfilled members: %s, unassigned partitions: %s, " +
@@ -234,23 +245,28 @@ private Map<String, List<TopicPartition>> constrainedAssign(Map<String, Integer>
234
245
List <TopicPartition > consumerAssignment = assignment .get (consumer );
235
246
236
247
int expectedAssignedCount = numMaxCapacityMembers < numExpectedMaxCapacityMembers ? maxQuota : minQuota ;
248
+ int currentAssignedCount = consumerAssignment .size ();
237
249
if (unassignedPartitionsIter .hasNext ()) {
238
250
TopicPartition tp = unassignedPartitionsIter .next ();
239
251
consumerAssignment .add (tp );
240
- unassignedPartitionsIter . remove () ;
252
+ currentAssignedCount ++ ;
241
253
// We already assigned all possible ownedPartitions, so we know this must be newly to this consumer
242
254
if (allRevokedPartitions .contains (tp ))
243
255
partitionsTransferringOwnership .put (tp , consumer );
244
256
} else {
245
- // Should not enter here since we have calculated the exact number to assign to each consumer
246
- log .warn (String .format (
247
- "No more partitions to be assigned. consumer: [%s] with current size: %d, but expected size is %d" ,
248
- consumer , consumerAssignment .size (), expectedAssignedCount ));
257
+ // This will only happen when current consumer has minQuota of partitions, and in previous round,
258
+ // the expectedAssignedCount is maxQuota, so, still in unfilledMembers list.
259
+ // But now, expectedAssignedCount is minQuota, we can remove it.
260
+ if (currentAssignedCount != minQuota ) {
261
+ // Should not enter here since we have calculated the exact number to assign to each consumer
262
+ log .warn (String .format (
263
+ "No more partitions to be assigned. consumer: [%s] with current size: %d, but expected size is %d" ,
264
+ consumer , currentAssignedCount , expectedAssignedCount ));
265
+ }
266
+ unfilledConsumerIter .remove ();
249
267
break ;
250
268
}
251
269
252
- int currentAssignedCount = consumerAssignment .size ();
253
-
254
270
if (currentAssignedCount == expectedAssignedCount ) {
255
271
if (currentAssignedCount == maxQuota ) {
256
272
numMaxCapacityMembers ++;
@@ -263,16 +279,59 @@ private Map<String, List<TopicPartition>> constrainedAssign(Map<String, Integer>
263
279
if (log .isDebugEnabled ()) {
264
280
log .debug ("final assignment: " + assignment );
265
281
}
266
-
282
+
267
283
return assignment ;
268
284
}
269
285
270
- private SortedSet <TopicPartition > getTopicPartitions (Map <String , Integer > partitionsPerTopic ) {
271
- SortedSet <TopicPartition > allPartitions =
272
- new TreeSet <>(Comparator .comparing (TopicPartition ::topic ).thenComparing (TopicPartition ::partition ));
273
- for (Entry <String , Integer > entry : partitionsPerTopic .entrySet ()) {
274
- String topic = entry .getKey ();
275
- for (int i = 0 ; i < entry .getValue (); ++i ) {
286
+ /**
287
+ * get the unassigned partition list by computing the difference set of the sortedPartitions(all partitions)
288
+ * and sortedToBeRemovedPartitions. We use two pointers technique here:
289
+ *
290
+ * We loop the sortedPartition, and compare the ith element in sorted toBeRemovedPartitions(i start from 0):
291
+ * - if not equal to the ith element, add to unassignedPartitions
292
+ * - if equal to the the ith element, get next element from sortedToBeRemovedPartitions
293
+ *
294
+ * @param sortedPartitions: sorted all partitions
295
+ * @param sortedToBeRemovedPartitions: sorted partitions, all are included in the sortedPartitions
296
+ * @return the partitions don't assign to any current consumers
297
+ */
298
+ private List <TopicPartition > getUnassignedPartitions (List <TopicPartition > sortedPartitions ,
299
+ List <TopicPartition > sortedToBeRemovedPartitions ) {
300
+ List <TopicPartition > unassignedPartitions = new ArrayList <>(
301
+ sortedPartitions .size () - sortedToBeRemovedPartitions .size ());
302
+
303
+ int index = 0 ;
304
+ boolean shouldAddDirectly = false ;
305
+ int sizeToBeRemovedPartitions = sortedToBeRemovedPartitions .size ();
306
+ TopicPartition nextPartition = sortedToBeRemovedPartitions .get (index );
307
+ for (TopicPartition topicPartition : sortedPartitions ) {
308
+ if (shouldAddDirectly || !nextPartition .equals (topicPartition )) {
309
+ unassignedPartitions .add (topicPartition );
310
+ } else {
311
+ // equal case, don't add to unassignedPartitions, just get next partition
312
+ if (index < sizeToBeRemovedPartitions - 1 ) {
313
+ nextPartition = sortedToBeRemovedPartitions .get (++index );
314
+ } else {
315
+ // add the remaining directly since there is no more toBeRemovedPartitions
316
+ shouldAddDirectly = true ;
317
+ }
318
+ }
319
+ }
320
+ return unassignedPartitions ;
321
+ }
322
+
323
+
324
+ private List <TopicPartition > getTopicPartitions (Map <String , Integer > partitionsPerTopic ) {
325
+ List <TopicPartition > allPartitions = new ArrayList <>(
326
+ partitionsPerTopic .values ().stream ().reduce (0 , Integer ::sum ));
327
+
328
+ List <String > allTopics = new ArrayList <>(partitionsPerTopic .keySet ());
329
+ // sort all topics first, then we can have sorted all topic partitions by adding partitions starting from 0
330
+ Collections .sort (allTopics );
331
+
332
+ for (String topic : allTopics ) {
333
+ int partitionCount = partitionsPerTopic .get (topic );
334
+ for (int i = 0 ; i < partitionCount ; ++i ) {
276
335
allPartitions .add (new TopicPartition (topic , i ));
277
336
}
278
337
}
0 commit comments