KAFKA-12464: enhance constrained sticky Assign algorithm #10509
Conversation
@ableegoldman, please help review this PR. Thank you.
Map<String, List<TopicPartition>> assignment = new HashMap<>(
-    consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c -> new ArrayList<>(minQuota))));
+    consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c -> new ArrayList<>(maxQuota))));
We should set the initial capacity to maxQuota to avoid memory reallocation.
// the members with exactly maxQuota partitions assigned
Queue<String> maxCapacityMembers = new LinkedList<>();
// the members with exactly minQuota partitions assigned
Queue<String> minCapacityMembers = new LinkedList<>();
We don't need to keep the maxCapacityMembers/minCapacityMembers queues anymore, because we can now know precisely how many members can have max capacity, via:
int numExpectedMaxCapacityMembers = unassignedPartitions.size() % numberOfConsumers;
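The later revision of the PR (quoted further down in this conversation) computes these bounds up front along these lines, where totalPartitionsCount is the total number of partitions to assign:

    int minQuota = (int) Math.floor(((double) totalPartitionsCount) / numberOfConsumers);
    int maxQuota = (int) Math.ceil(((double) totalPartitionsCount) / numberOfConsumers);
    // number of members expected to end up with maxQuota partitions
    // (zero when the partitions divide evenly across the consumers)
    int expectedNumMembersHavingMorePartitions = totalPartitionsCount % numberOfConsumers;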
}

// If we ran out of unassigned partitions before filling all consumers, we need to start stealing partitions
// from the over-full consumers at max capacity
remove step 3 and step 4 below
@@ -108,6 +107,8 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
         subscribedTopics.addAll(subscription.topics());
     } else if (!(subscription.topics().size() == subscribedTopics.size()
         && subscribedTopics.containsAll(subscription.topics()))) {
+        // we don't need consumerToOwnedPartitions in general assign case
+        consumerToOwnedPartitions = null;
I'll separate the checkAllSubscriptionEqual and getConsumerToOwnedPartitions methods in KAFKA-12654. Make the consumerToOwnedPartitions null to free some heap memory.
nit: can we set it to null up in #assign, before invoking generalAssign? I feel that's slightly more future-proof, as it's easy to miss that this gets cleared when it occurs deep in this boolean check method, in case someone might decide they want to use this map in generalAssign.
Which could happen, since it does build up basically this exact same information in a later loop -- as an alternative to nullifying this map, we could just pass it in to generalAssign to replace the currentAssignment map that gets filled in via prepopulateCurrentAssignments. That won't save us from looping through all the assignments entirely, since we also need to populate prevAssignments, but it will still save some time by cutting out the filling in of currentAssignment.
I'll nullify it for now before invoking generalAssign to make the PR simple, and then adopt your 2nd option in another ticket (KAFKA-12654). Thank you.
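A rough sketch of that arrangement in #assign, with the map dropped right before falling back to the general algorithm (the surrounding method shape and the comments here are illustrative assumptions, not the exact PR code):

    Map<String, List<TopicPartition>> consumerToOwnedPartitions = new HashMap<>();
    if (allSubscriptionsEqual(allTopics, subscriptions, consumerToOwnedPartitions)) {
        // all consumers subscribe to the same topics, so the optimized constrained algorithm applies
        return constrainedAssign(partitionsPerTopic, consumerToOwnedPartitions);
    } else {
        // generalAssign rebuilds this information itself, so drop the reference here
        // to let the map be garbage-collected during the (long) general assignment
        consumerToOwnedPartitions = null;
        return generalAssign(partitionsPerTopic, subscriptions);
    }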
@ableegoldman, could you help check this PR? Thank you.
Hey @showuon, I'm pretty swamped at the moment but don't worry, it's on my list 🙂 Thanks
@ableegoldman, haha, no problem. I thought you had ignored it. Take your time and thank you a lot. :)
@ableegoldman (cc. @guozhangwang) After the PR, the test runs noticeably faster. Let's see what the result is in the jenkins trunk build.
List<String> allTopics = new ArrayList<>(partitionsPerTopic.keySet());
// sort all topics first, then we can have sorted all topic partitions by adding partitions starting from 0
Collections.sort(allTopics);
adopt the technique of refactor 4 from #10552 (comment)
We used to maintain a SortedSet of all topic partitions. It takes some time to build the set while adding the partitions.
Improve it by using an ArrayList, sorting all topics first (only 500 topics to sort, compared to the original 1 million partitions), and then adding the partitions by looping over all sorted topics.
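A minimal sketch of the idea (the real method in the PR ended up being called getAllTopicPartitions; this version assumes the usual java.util and org.apache.kafka.common.TopicPartition imports):

    private List<TopicPartition> getAllTopicPartitions(Map<String, Integer> partitionsPerTopic) {
        // sort only the topic names (~500 of them) instead of sorting ~1 million TopicPartition objects
        List<String> allTopics = new ArrayList<>(partitionsPerTopic.keySet());
        Collections.sort(allTopics);

        List<TopicPartition> allPartitions = new ArrayList<>();
        for (String topic : allTopics) {
            int partitionCount = partitionsPerTopic.get(topic);
            // partition ids run from 0 to partitionCount - 1, so appending them in order
            // keeps the whole list sorted by (topic, partition)
            for (int i = 0; i < partitionCount; i++)
                allPartitions.add(new TopicPartition(topic, i));
        }
        return allPartitions;
    }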
I did the math and it seems to come down to roughly O(NlogN) vs O(2N), which for N = 1 million is roughly a 10x improvement. Not bad, of course it is a tradeoff and there are other factors as mentioned above. But still very nice 👍
 * @param sortedToBeRemovedPartitions: sorted partitions, all are included in the sortedPartitions
 * @return the partitions don't assign to any current consumers
 */
private List<TopicPartition> getUnassignedPartitions(List<TopicPartition> sortedPartitions,
adopt refactor 2 from #10552 (comment).
We used to have a SortedSet of unassignedPartitions containing all partitions (ex: 1 million partitions), and loop through the current assignment to remove the already-assigned partitions, ex: 999,000 of them, so we'd only have 1,000 partitions left. However, removing an element from a SortedSet takes some time, because it needs to find the element first and then do some tree node movement to stay balanced. This situation should happen a lot since on each rebalance we should only have a small set of changes (ex: 1 consumer dropped), so this is an important improvement.
To refactor it, I used the two-pointer technique to loop through the 2 sorted lists, sortedPartitions and sortedToBeRemovedPartitions, and only add the difference of the 2 lists. Looping and element addition are very fast in an ArrayList, so it improves a lot.
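A minimal sketch of that two-pointer walk, assuming both lists are sorted the same way and sortedToBeRemovedPartitions is a subset of sortedPartitions (parameter names follow the javadoc above; the real method in the PR may differ in detail):

    private List<TopicPartition> getUnassignedPartitions(List<TopicPartition> sortedPartitions,
                                                          List<TopicPartition> sortedToBeRemovedPartitions) {
        List<TopicPartition> unassignedPartitions = new ArrayList<>(
            sortedPartitions.size() - sortedToBeRemovedPartitions.size());
        int removedIndex = 0;  // pointer into sortedToBeRemovedPartitions
        for (TopicPartition partition : sortedPartitions) {
            if (removedIndex < sortedToBeRemovedPartitions.size()
                    && partition.equals(sortedToBeRemovedPartitions.get(removedIndex))) {
                // already owned by a consumer: skip it and advance the second pointer
                removedIndex++;
            } else {
                // owned by nobody, so it still needs to be assigned
                unassignedPartitions.add(partition);
            }
        }
        return unassignedPartitions;
    }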
Just to be clear, a removeAll(M partitions) operation on TreeSet should still be only O(M*logN) since each individual remove is only logN. Even for N = 1 million, logN is under 20. So it scales more with how many partitions are being removed.
I tried to do the math here and found the time complexity of the original to be (slightly) better on paper, but maybe I missed something or the reality is just different for certain input parameters (bigO time is not an absolute law after all 🙂 ). Maybe you can check my work and then run some tests with and without this specific change (but with all other improvements included).
Let's say the number of consumers is C, the number of partitions assigned to each consumer is M, and the total number of partitions is N.
Before:
Loop through all consumers and call (TreeSet) unassignedPartitions.removeAll(assignedPartitions). This is C * M * logN (where N will actually decrease down to ~0 by the end of the loop since, as you pointed out, most partitions should be reassigned in the sticky assignor).
After:
Loop through all consumers and call (ArrayList) toBeRemovedPartitions.addAll(assignedPartitions). Since addAll has to copy all M elements, this is C * M. After that we call sort(toBeRemovedPartitions), where |toBeRemovedPartitions| = C * M, so this is C * M * log(C * M). And then finally we call getUnassignedPartitions, which is O(N). So putting it all together we have C * M + C * M * log(C * M) + N.
It's a little hard to compare these directly, but in general we'll have C * M ≈ N (since most partitions are reassigned), in which case the before vs after runtimes can be reduced to O(N²log(N)) vs O(2N + N²log(N)). The before case is actually better, although they're roughly the same for large N. If this analysis holds up in your experiments then we should just go with whichever one uses less memory and/or has the simplest code. In my (admittedly biased) opinion the original method was the easiest to understand in the code, and objectively speaking it also used less memory since we only need the one unassignedPartitions data structure. I.e. we can just rename sortedAllPartitions back to unassignedPartitions (or maybe partitionsToBeAssigned is better?) and then we can delete both the partitionsToBeRemoved and the second unassignedPartitions. Thoughts?
Of course we also need to take into account that going back to the TreeSet will mean we lose the improvement in getTopicPartitions as well. I did the math there and found it came down to O(NlogN) vs O(2N). And since logN grows so quickly, this is like a 10x improvement for 1 million partitions. Adding this all up, the new algorithm does come out slightly on top now:
Before we had O(NlogN + N²log(N)) and now we have O(4N + NlogN + N²log(N)). Taking logN ≈ 20 means a 5x improvement for the part that scales as N.
-- which is great, obviously, but the time for getTopicPartitions is negligible compared to the time complexity of the loop we discussed above, i.e. the N² term is going to dominate the N term. So we have a tradeoff here between an unknown but possibly small performance improvement that uses a lot more memory, and a somewhat worse algorithm with only the one data structure and slightly cleaner code.
That's a hard tradeoff to comment on without more data. If you could re-run your experiments with all other improvements implemented but without these two things (i.e. the use of toBeRemovedPartitions instead of dynamically removing from unassignedPartitions, and going back to TreeSet in getTopicPartitions), then we can see what the actual performance characteristics are and also get a sense of how much extra memory we're talking about. I'd recommend first addressing all my other comments here and re-running with/without this stuff in case those other comments have an impact, so we can be fair to both sides.
Anyways, that's my analysis here, but you've been looking at this more carefully, more recently, so it's possible I'm missing something. Interested to hear your take on this.
In my (admittedly biased) opinion the original method was the easiest to understand in the code, and objectively speaking it also used less memory
I like your honesty, haha! And yes, I agree with you.
The timing for getTopicPartitions and remove assigned partitions is here:
before:
[2021-04-28 17:39:10,260] ERROR getTopicPartitions took:695
[2021-04-28 17:39:10,266] ERROR remove assigned took:1
[2021-04-28 17:39:11,594] ERROR getTopicPartitions took:607
[2021-04-28 17:39:12,889] ERROR remove assigned took:1291
after:
[2021-04-28 17:36:02,701] ERROR getTopicPartitions took:41
[2021-04-28 17:36:02,707] ERROR remove assigned took:1
[2021-04-28 17:36:03,290] ERROR getTopicPartitions took:15
[2021-04-28 17:36:04,204] ERROR remove assigned took:910
So, basically, your calculation is correct. The getTopicPartitions method is 10x (or even more) faster, and the remove assigned partitions step takes almost the same time, but after is still faster (explained below).
I agree it is bad that after needs a lot more memory compared with before. The additional memory usage is due to the extra toBeRemovedPartitions list created. I can reduce the memory usage by not creating sortedAllPartitions, since I only need it in getUnassignedPartitions. In getUnassignedPartitions, before, I was looping through 2 partition lists (sortedAllPartitions & sortedToBeRemovedPartitions); now, without sortedAllPartitions, I can loop through sortedAllTopics like in getTopicPartitions, something like this:
List<String> allTopics = new ArrayList<>(partitionsPerTopic.keySet());
Collections.sort(allTopics);
Iterator<TopicPartition> toBeRemovedIter = sortedToBeRemovedPartitions.iterator();
TopicPartition nextPartition = toBeRemovedIter.hasNext() ? toBeRemovedIter.next() : null;
for (String topic : allTopics) {
    int partitionCount = partitionsPerTopic.get(topic);
    for (int i = 0; i < partitionCount; i++) {
        if (nextPartition != null && nextPartition.topic().equals(topic) && nextPartition.partition() == i) {
            // this partition is already assigned, advance to the next to-be-removed partition
            nextPartition = toBeRemovedIter.hasNext() ? toBeRemovedIter.next() : null;
        } else {
            // not owned by any consumer, so it is unassigned
            unassignedPartitions.add(new TopicPartition(topic, i));
        }
    }
}
This way, before and after will have pretty much the same memory usage. And the time spent for after is basically the same (without getTopicPartitions):
[2021-04-28 17:48:21,694] ERROR remove assigned took:46
[2021-04-28 17:48:23,405] ERROR remove assigned took:1035
the remove assigned partitions step takes almost the same time, but after is still faster (explained below)
I think the reason is:
- sort(toBeRemovedPartitions) is C * M * log(C * M); that's correct in theory! But in our assignment case the consumer assignment is usually partition-sorted (because the consumer leader creates the assignment in sorted order). So, based on the java doc of List.sort:
This implementation is a stable, adaptive, iterative mergesort that requires far fewer than n lg(n) comparisons when the input array is partially sorted, while offering the performance of a traditional mergesort when the input array is randomly ordered. If the input array is nearly sorted, the implementation requires approximately n comparisons.
That means the sort should be faster than we thought.
- The nature of ArrayList is contiguous memory, which is cache-friendly. That is, when retrieving data we also get the neighboring data at the same time, which improves data retrieval from memory. So this nature makes ArrayList iteration and element addition (we can copy all M elements into the destination array with a single System.arraycopy when the capacity is enough) faster than TreeSet.
// ArrayList.addAll()
public boolean addAll(Collection<? extends E> c) {
    ...
    if (numNew > (elementData = this.elementData).length - (s = size))
        elementData = grow(s + numNew);
    System.arraycopy(a, 0, elementData, s, numNew);
    size = s + numNew;
    return true;
}
I don't know if there are other potential reasons, but the performance is indeed improved by ArrayList.
an unknown but possibly small performance improvement that uses a lot more memory, and a somewhat worse algorithm with only the one data structure and slightly cleaner code.
If the memory usage for after were a lot higher than before, I'd hesitate to say after is better. But now, with my above improvement, the memory usage is basically the same as before. I'd choose my change to make the assignor faster. After all: faster assignor, faster rebalance.
That's my 2 cents.
Also, what we are discussing is not a 5% or 10% improvement. It's almost 2x faster on my mac, and in jenkins it is down from 5 sec to 2 sec, a 60% improvement. FYI
Thanks for getting some concrete numbers to work with! I suspected the theory would not match the reality due to caching primarily, although I wasn't aware of the improved runtime of sort on a partially-ordered list. That's good to know 😄 And it does make sense in hindsight given the nature of the sorting algorithm.
I've always found that the reality of array performance with any reasonable caching architecture, compared to theoretically better data structures/algorithms is one of those things that people know and still subconsciously doubt. Probably because most people spent 4 years in college getting theoretical algorithmic runtimes drilled into their heads, and far less time looking into the underlying architecture that powers those algorithms and applies its own optimizations under the hood. It's an interesting psychological observation. There's a great talk on it somewhere but I can't remember the name
Anyways, you just never know until you run the numbers. I'm sure this may vary somewhat with different input parameters but I think I'm convinced, let's stick with this improvement. If someone starts complaining about the memory consumption we can always go back and look for ways to cut down. Thanks for the enlightening discussion
Just let me know when you've responded to my other comments and this is ready for another pass. Nice to see such an improvement along with making this the default
Thank you very much, @ableegoldman ! I'll address your comments today, thank you :)
jenkins test results after my change: (avg. 2 sec)
the latest trunk test results: (avg. 5.4 sec)
BTW, all failed tests are flaky and unrelated:
Thanks @showuon , this is pretty cool. I do think there's some more nuance to one of the improvements but everything else looks great. I'll apologize in advance for the length of my comments on that one thing 😅
@ableegoldman, thanks for your great comments! If there's a best reviewer contest, I'd definitely vote for you! :) I've replied to your comments. Simply put, I have a further improvement to keep pretty much the same memory usage as before and still have the performance improvement, so I think we should keep this change. :) I'd like to see if you have any thoughts. :)
}

private List<TopicPartition> getAllTopicPartitions(Map<String, Integer> partitionsPerTopic) {
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
rename the method to getAllTopicPartitions to make it more clear.
@ableegoldman, I've addressed all your comments and did some code refactoring. Please help review again. Thank you.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Looking good! A few suggestions but mostly minor stuff. Can you also take a look at AbstractStickyAssignorTest and see if you feel the test coverage is sufficient to feel confident in this change, or whether we might want to expand it for the constrained case a bit more?
    minCapacityMembers.add(consumer);
if (consumerAssignment.size() == maxQuota)
    maxCapacityMembers.add(consumer);
// consumer owned at least "minQuota" of partitions but we're already at the allowed number of max capacity members
I wonder if we need to handle one more case? I'm thinking about where the consumer had exactly minQuota previously owned partitions but we're not yet at numExpectedMaxCapacityMembers. Should we consider the consumer to be "unfilled" since it may be that this consumer needs to eventually get to maxQuota?
I tried to think up an example where this current logic would break, and I couldn't. It's possible that there isn't one just by mathematical coincidence -- eg if there's just one consumer which had minQuota previously owned partitions then in theory it might need to have one more assigned, but in reality if there's only one consumer then min and max quota will always be equal. Just wondering if you could give this some thought and make sure we're not missing something here.
(Even if not, we should update the comment here since it's a bit misleading -- we don't know that we're at the allowed max capacity members yet. Maybe we can instead leave a comment explaining why we think the above case is not possible, assuming you do come to that same conclusion)
You found a potential bug here!!! You're awesome!!! It's great we found and fixed it here rather than releasing it to users! So, again, this implies the tests didn't cover all cases! I'll add more tests for it. Thank you!
I think this is actually possible even with more than one consumer, since we can have both num.partitions AND num.consumers change. When that happens, relying on previously assigned tasks would not guarantee that stickiness would be sufficient to achieve the end goal.
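As a concrete (hypothetical) illustration: with 2 consumers and the partition count growing from 4 to 5, we get minQuota = 2, maxQuota = 3, and exactly one member is expected to reach maxQuota. If both consumers previously owned exactly 2 partitions, treating them both as "filled" at minQuota leaves the newly added 5th partition with no unfilled member left to take it.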
nit: also in practice, I think the common case would be minQuota == maxQuota and numExpectedMaxCapacityMembers == 0. So the name numExpectedMaxCapacityMembers here could be a bit misleading. Maybe rename to expectedNumMembersHavingMorePartitions and numMembersHavingMorePartitions instead?
Yes, this could happen, and just updated. Also the renaming is better. Thanks.
if (!unfilledConsumerIter.hasNext()) {
    if (unfilledMembers.isEmpty()) {
        // Should not enter here since we have calculated the exact number to assign to each consumer
        log.warn("No more unfilled consumers to be assigned.");
I think we should take this case more seriously, because it will mean some partitions don't get assigned. Either we log it as error and throw an exception, or try to handle it gracefully by continuing through the loop to get all remaining unassigned partitions and then distributing them out to whoever has the fewest assigned partitions. Imo we should just throw an exception since hitting this might mean there's a more critical bug in the algorithm, at which point who knows if there are other effects like assigning the same partition to two owners, etc
Great point! You always think further ahead than I do!!
This should never happen either --- so if it does, we can treat it as a bug and throw illegal state exception.
Agree!
@ableegoldman, I've addressed all your comments, but I'd like to spend more time adding more tests to feel confident in this change. I'll let you know when completed. Thank you.
Made a pass too, thanks @showuon for the great improvement!
* from the over-full consumers at max capacity
* 4. Otherwise we may have run out of unfilled consumers before assigning all partitions, in which case we
* should just distribute one partition each to all consumers at min capacity
* 1. Reassign as many previously owned partitions as possible, up to the expected number of maxQuota, otherwise, minQuota
nit: the first line seems like it should just be "up to the expected number of maxQuota"? Also the second line seems incomplete: we need to explain what condition the "otherwise" applies to.
Nice catch! I put all possibilities in the java doc. Thanks.
@ableegoldman @guozhangwang, thanks for your good comments. I've addressed them in this commit: 868aaf4. Thank you very much.
@showuon it looks like there are some related test failures, can you look into those?
Ah! Thanks to jenkins for catching this! We should …
Failed test is flaky and unrelated:
LGTM. Leaving to @ableegoldman for a final look.
int minQuota = (int) Math.floor(((double) totalPartitionsCount) / numberOfConsumers);
int maxQuota = (int) Math.ceil(((double) totalPartitionsCount) / numberOfConsumers);
// the expected number of members with maxQuota assignment
int expectedNumMembersHavingMorePartitions = totalPartitionsCount % numberOfConsumers;
Just a nit -- and to clarify up front, if you agree with this let's still hold off on doing it here so this PR can finally be merged, as I figure any nits can be addressed in your general assign PR:
It's still a bit unclear what this value will be used for when you first see it; maybe we can work the word minQuota into the name somewhere? Eg expectedNumMembersWithMoreThanMinQuotaPartitions, or for a slightly shorter example numConsumersAssignedOverMinQuota, or something between or similar to those.
FYI I'm also ok with it as-is if you prefer the current name -- just wanted to throw out some other suggestions. I'll trust you to pick whatever name feels right 🙂
consumerAssignment.addAll(minQuotaPartitions);
assignedPartitions.addAll(minQuotaPartitions);
allRevokedPartitions.addAll(ownedPartitions.subList(minQuota, ownedPartitions.size()));
// this consumer is potential maxQuota candidate since we're still under the number of expected max capacity members
nit (again, please address this in the other PR so I can merge this one): as Guozhang pointed out in another comment, in the case minQuota == maxQuota this comment is a bit misleading, as the number of expected max capacity members is technically all of them, but the variable expectedNumMembersHavingMorePartitions refers to the number of members who have more than the minQuota number of partitions, which in that case would actually be zero.
maybe something like:
- // this consumer is potential maxQuota candidate since we're still under the number of expected max capacity members
+ // since we're still under the number of expected members with more than the minQuota partitions, this consumer may be assigned one more partition
Just a thought: technically it's not even a "potential maxQuota" member, since as you pointed out in another comment "the unassignedPartitions size will always >= unfilledMembers size" -- therefore anything in unfilledMembers will in fact need to receive at least one partition. Does that sound right to you? (this is just a followup question to make sure we're on the same page, no need to do anything for this one)
}
}

List<TopicPartition> unassignedPartitions = getUnassignedPartitions(totalPartitionsCount, partitionsPerTopic, assignedPartitions);
Thanks for cleaning this part of the logic up, it's much clearer now (not to mention the nice savings memory-wise)
if (!unfilledConsumerIter.hasNext()) {
    if (unfilledMembers.isEmpty()) {
        // Should not enter here since we have calculated the exact number to assign to each consumer
        // There might be issues in the assigning algorithm, or maybe assigning the same partition to two owners.
nit (for next PR): can you log an error before throwing the exception and include the set of unassigned partitions? Either just print out the unassignedPartitions along with the current partition being processed so you can figure out which partitions are remaining after that, or else actually compute the remaining partitions that have yet to be assigned. Since it's an error case, I think it's ok to spend a little extra time computing that for better debuggability
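Something along these lines, as a sketch of the suggestion only (remainingUnassignedPartitions is a hypothetical variable computed just for the error path):

    // log the diagnostics before throwing, since the exception message may not be
    // printed right away at the place where the failure is detected
    log.error("No more unfilled consumers to be assigned. The remaining unassigned partitions are: {}",
        remainingUnassignedPartitions);
    throw new IllegalStateException("No more unfilled consumers to be assigned.");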
// we expected all the remaining unfilled members have minQuota partitions and we're already at the allowed number
// of max capacity members. Otherwise, there must be error here.
if (numMembersHavingMorePartitions != expectedNumMembersHavingMorePartitions) {
    throw new IllegalStateException(String.format("We haven't reached the allowed number of max capacity members, " +
nit: same here, can you log an error with the remaining unfilledMembers? I know you already do that in the exception message, but imo it would be better to print it in a log message instead of an exception, as it may be long
for (String unfilledMember : unfilledMembers) {
    int assignedPartitionsCount = assignment.get(unfilledMember).size();
    if (assignedPartitionsCount != minQuota) {
        throw new IllegalStateException(String.format("Consumer: [%s] should have %d partitions, but got %d partitions, " +
nit: the exception here looks good, but once again let's also log an error (it just makes it easier to debug when you have something concrete in the place you encountered the error, whereas exceptions are not always printed right away). Should probably just log any info that could be useful, such as all remaining unfilledMembers
} else {
    for (String unfilledMember : unfilledMembers) {
        int assignedPartitionsCount = assignment.get(unfilledMember).size();
        if (assignedPartitionsCount != minQuota) {
nit: can we add an else case that just logs that we skipped over this member because we reached max capacity and it was still at min? Not sure if debug or trace is more appropriate; might be worth just running the tests with this log in place to see how often it gets printed
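For example, something like the following sketch (log level and wording to taste):

    } else {
        // this member stays at minQuota because we already reached the expected
        // number of members with more than minQuota partitions
        log.trace("Skipping member {} since it is already at minQuota ({} partitions) and the expected "
            + "number of members above minQuota has been reached", unfilledMember, minQuota);
    }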
Thanks @showuon for this awesome improvement -- performance and memory aside, it feels much cleaner and easier to understand now. I admit when I wrote this initially I was pretty much just converting my own thoughts into code, and over-compensating with comments to help readers follow along. I'm glad you came back for a second pass since I never had time to revisit and make it more intelligible.
FYI I left a handful of nits, but I'd like to just go ahead and merge this PR now while the build has (mostly) passed. Can you address them either in the general assign PR or else just a quick followup PR?
Very much looking forward to seeing the general assign receive this treatment as well 😄
Thank you very much, @ableegoldman! Let's make it better together! :)
… sticky assignment (#10645) This is the follow up PR to address the remaining comments in #10509. Reviewers: Anna Sophie Blee-Goldman <[email protected]>
…utilize generation in cooperative, and fix assignment bug (#10985) 1) Bring the generation field back to the CooperativeStickyAssignor so we don't need to rely so heavily on the ConsumerCoordinator properly updating its SubscriptionState after eg falling out of the group. The plain StickyAssignor always used the generation since it had to, so we just make sure the CooperativeStickyAssignor has this tool as well 2) In case of unforeseen problems or further bugs that slip past the generation field safety net, the assignor will now explicitly look out for partitions that are being claimed by multiple consumers as owned in the same generation. Such a case should never occur, but if it does, we have to invalidate this partition from the ownedPartitions of both consumers, since we can't tell who, if anyone, has the valid claim to this partition. 3) Fix a subtle bug that I discovered while writing tests for the above two fixes: in the constrained algorithm, we compute the exact number of partitions each consumer should end up with, and keep track of the "unfilled" members who must -- or might -- require more partitions to hit their quota. The problem was that members at the minQuota were being considered as "unfilled" even after we had already hit the maximum number of consumers allowed to go up to the maxQuota, meaning those minQuota members could/should not accept any more partitions beyond that. I believe this was introduced in #10509, so it shouldn't be in any released versions and does not need to be backported. Reviewers: Guozhang Wang <[email protected]>, Luke Chen <[email protected]>
JIRA: https://issues.apache.org/jira/browse/KAFKA-12464
This improvement achieved:
testLargeAssignmentAndGroupWithUniformSubscription (1 million partitions) will run from ~2600 ms down to ~1400 ms, improving performance by 46%, almost 2x faster!!
jenkins test results after my change: (avg. 2 sec)
the latest trunk test results: (avg. 5.4 sec)