Skip to content

Commit 37d086f

Browse files
authored
KAFKA-12984: make AbstractStickyAssignor resilient to invalid input, utilize generation in cooperative, and fix assignment bug (#10985)
1) Bring the generation field back to the CooperativeStickyAssignor so we don't need to rely so heavily on the ConsumerCoordinator properly updating its SubscriptionState after eg falling out of the group. The plain StickyAssignor always used the generation since it had to, so we just make sure the CooperativeStickyAssignor has this tool as well 2) In case of unforeseen problems or further bugs that slip past the generation field safety net, the assignor will now explicitly look out for partitions that are being claimed by multiple consumers as owned in the same generation. Such a case should never occur, but if it does, we have to invalidate this partition from the ownedPartitions of both consumers, since we can't tell who, if anyone, has the valid claim to this partition. 3) Fix a subtle bug that I discovered while writing tests for the above two fixes: in the constrained algorithm, we compute the exact number of partitions each consumer should end up with, and keep track of the "unfilled" members who must -- or might -- require more partitions to hit their quota. The problem was that members at the minQuota were being considered as "unfilled" even after we had already hit the maximum number of consumers allowed to go up to the maxQuota, meaning those minQuota members could/should not accept any more partitions beyond that. I believe this was introduced in #10509, so it shouldn't be in any released versions and does not need to be backported. Reviewers: Guozhang Wang <[email protected]>, Luke Chen <[email protected]>
1 parent b80ff18 commit 37d086f

File tree

6 files changed

+341
-80
lines changed

6 files changed

+341
-80
lines changed

checkstyle/suppressions.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242
files="AbstractResponse.java"/>
4343

4444
<suppress checks="MethodLength"
45-
files="KerberosLogin.java|RequestResponseTest.java|ConnectMetricsRegistry.java|KafkaConsumer.java"/>
45+
files="(KerberosLogin|RequestResponseTest|ConnectMetricsRegistry|KafkaConsumer|AbstractStickyAssignor).java"/>
4646

4747
<suppress checks="ParameterNumber"
4848
files="(NetworkClient|FieldSpec|KafkaRaftClient).java"/>

clients/src/main/java/org/apache/kafka/clients/consumer/CooperativeStickyAssignor.java

Lines changed: 41 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
*/
1717
package org.apache.kafka.clients.consumer;
1818

19+
import java.nio.ByteBuffer;
1920
import java.util.Arrays;
2021
import java.util.HashMap;
2122
import java.util.HashSet;
@@ -25,6 +26,10 @@
2526
import java.util.Set;
2627
import org.apache.kafka.clients.consumer.internals.AbstractStickyAssignor;
2728
import org.apache.kafka.common.TopicPartition;
29+
import org.apache.kafka.common.protocol.types.Field;
30+
import org.apache.kafka.common.protocol.types.Schema;
31+
import org.apache.kafka.common.protocol.types.Struct;
32+
import org.apache.kafka.common.protocol.types.Type;
2833

2934
/**
3035
* A cooperative version of the {@link AbstractStickyAssignor AbstractStickyAssignor}. This follows the same (sticky)
@@ -43,6 +48,13 @@
4348
*/
4449
public class CooperativeStickyAssignor extends AbstractStickyAssignor {
4550

51+
// these schemas are used for preserving useful metadata for the assignment, such as the last stable generation
52+
private static final String GENERATION_KEY_NAME = "generation";
53+
private static final Schema COOPERATIVE_STICKY_ASSIGNOR_USER_DATA_V0 = new Schema(
54+
new Field(GENERATION_KEY_NAME, Type.INT32));
55+
56+
private int generation = DEFAULT_GENERATION; // consumer group generation
57+
4658
@Override
4759
public String name() {
4860
return "cooperative-sticky";
@@ -53,9 +65,37 @@ public List<RebalanceProtocol> supportedProtocols() {
5365
return Arrays.asList(RebalanceProtocol.COOPERATIVE, RebalanceProtocol.EAGER);
5466
}
5567

68+
@Override
69+
public void onAssignment(Assignment assignment, ConsumerGroupMetadata metadata) {
70+
this.generation = metadata.generationId();
71+
}
72+
73+
@Override
74+
public ByteBuffer subscriptionUserData(Set<String> topics) {
75+
Struct struct = new Struct(COOPERATIVE_STICKY_ASSIGNOR_USER_DATA_V0);
76+
77+
struct.set(GENERATION_KEY_NAME, generation);
78+
ByteBuffer buffer = ByteBuffer.allocate(COOPERATIVE_STICKY_ASSIGNOR_USER_DATA_V0.sizeOf(struct));
79+
COOPERATIVE_STICKY_ASSIGNOR_USER_DATA_V0.write(buffer, struct);
80+
buffer.flip();
81+
return buffer;
82+
}
83+
5684
@Override
5785
protected MemberData memberData(Subscription subscription) {
58-
return new MemberData(subscription.ownedPartitions(), Optional.empty());
86+
ByteBuffer buffer = subscription.userData();
87+
Optional<Integer> encodedGeneration;
88+
if (buffer == null) {
89+
encodedGeneration = Optional.empty();
90+
} else {
91+
try {
92+
Struct struct = COOPERATIVE_STICKY_ASSIGNOR_USER_DATA_V0.read(buffer);
93+
encodedGeneration = Optional.of(struct.getInt(GENERATION_KEY_NAME));
94+
} catch (Exception e) {
95+
encodedGeneration = Optional.of(DEFAULT_GENERATION);
96+
}
97+
}
98+
return new MemberData(subscription.ownedPartitions(), encodedGeneration);
5999
}
60100

61101
@Override

0 commit comments

Comments
 (0)