Skip to content

Commit 2f9c2dd

Browse files
KAFKA-16718-3/n: Added the ShareGroupStatePartitionMetadata record during deletion of share group offsets (#19478)
This is a follow up PR for implementation of DeleteShareGroupOffsets RPC. This PR adds the ShareGroupStatePartitionMetadata record to __consumer__offsets topic to make sure the topic is removed from the initializedTopics list. This PR also removes partitions from the request and response schemas for DeleteShareGroupState RPC Reviewers: Sushant Mahajan <[email protected]>, Andrew Schofield <[email protected]>
1 parent 6462f7a commit 2f9c2dd

File tree

21 files changed

+1342
-604
lines changed

21 files changed

+1342
-604
lines changed

clients/src/main/java/org/apache/kafka/clients/admin/Admin.java

+7-7
Original file line numberDiff line numberDiff line change
@@ -1951,28 +1951,28 @@ default ListShareGroupOffsetsResult listShareGroupOffsets(Map<String, ListShareG
19511951
}
19521952

19531953
/**
1954-
* Delete offsets for a set of partitions in a share group.
1954+
* Delete offsets for a set of topics in a share group.
19551955
*
19561956
* @param groupId The group for which to delete offsets.
1957-
* @param partitions The topic-partitions.
1957+
* @param topics The topics for which to delete offsets.
19581958
* @param options The options to use when deleting offsets in a share group.
19591959
* @return The DeleteShareGroupOffsetsResult.
19601960
*/
1961-
DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String groupId, Set<TopicPartition> partitions, DeleteShareGroupOffsetsOptions options);
1961+
DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String groupId, Set<String> topics, DeleteShareGroupOffsetsOptions options);
19621962

19631963
/**
1964-
* Delete offsets for a set of partitions in a share group with the default options.
1964+
* Delete offsets for a set of topics in a share group with the default options.
19651965
*
19661966
* <p>
19671967
* This is a convenience method for {@link #deleteShareGroupOffsets(String, Set, DeleteShareGroupOffsetsOptions)} with default options.
19681968
* See the overload for more details.
19691969
*
19701970
* @param groupId The group for which to delete offsets.
1971-
* @param partitions The topic-partitions.
1971+
* @param topics The topics for which to delete offsets.
19721972
* @return The DeleteShareGroupOffsetsResult.
19731973
*/
1974-
default DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String groupId, Set<TopicPartition> partitions) {
1975-
return deleteShareGroupOffsets(groupId, partitions, new DeleteShareGroupOffsetsOptions());
1974+
default DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String groupId, Set<String> topics) {
1975+
return deleteShareGroupOffsets(groupId, topics, new DeleteShareGroupOffsetsOptions());
19761976
}
19771977

19781978
/**

clients/src/main/java/org/apache/kafka/clients/admin/DeleteShareGroupOffsetsResult.java

+19-20
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,6 @@
1717
package org.apache.kafka.clients.admin;
1818

1919
import org.apache.kafka.common.KafkaFuture;
20-
import org.apache.kafka.common.TopicPartition;
2120
import org.apache.kafka.common.annotation.InterfaceStability;
2221
import org.apache.kafka.common.errors.ApiException;
2322
import org.apache.kafka.common.internals.KafkaFutureImpl;
@@ -33,27 +32,27 @@
3332
@InterfaceStability.Evolving
3433
public class DeleteShareGroupOffsetsResult {
3534

36-
private final KafkaFuture<Map<TopicPartition, ApiException>> future;
37-
private final Set<TopicPartition> partitions;
35+
private final KafkaFuture<Map<String, ApiException>> future;
36+
private final Set<String> topics;
3837

39-
DeleteShareGroupOffsetsResult(KafkaFuture<Map<TopicPartition, ApiException>> future, Set<TopicPartition> partitions) {
38+
DeleteShareGroupOffsetsResult(KafkaFuture<Map<String, ApiException>> future, Set<String> topics) {
4039
this.future = future;
41-
this.partitions = partitions;
40+
this.topics = topics;
4241
}
4342

4443
/**
4544
* Return a future which succeeds only if all the deletions succeed.
46-
* If not, the first partition error shall be returned.
45+
* If not, the first topic error shall be returned.
4746
*/
4847
public KafkaFuture<Void> all() {
4948
final KafkaFutureImpl<Void> result = new KafkaFutureImpl<>();
5049

51-
this.future.whenComplete((topicPartitions, throwable) -> {
50+
this.future.whenComplete((topicResults, throwable) -> {
5251
if (throwable != null) {
5352
result.completeExceptionally(throwable);
5453
} else {
55-
for (TopicPartition partition : partitions) {
56-
if (maybeCompleteExceptionally(topicPartitions, partition, result)) {
54+
for (String topic : topics) {
55+
if (maybeCompleteExceptionally(topicResults, topic, result)) {
5756
return;
5857
}
5958
}
@@ -64,32 +63,32 @@ public KafkaFuture<Void> all() {
6463
}
6564

6665
/**
67-
* Return a future which can be used to check the result for a given partition.
66+
* Return a future which can be used to check the result for a given topic.
6867
*/
69-
public KafkaFuture<Void> partitionResult(final TopicPartition partition) {
70-
if (!partitions.contains(partition)) {
71-
throw new IllegalArgumentException("Partition " + partition + " was not included in the original request");
68+
public KafkaFuture<Void> topicResult(final String topic) {
69+
if (!topics.contains(topic)) {
70+
throw new IllegalArgumentException("Topic " + topic + " was not included in the original request");
7271
}
7372
final KafkaFutureImpl<Void> result = new KafkaFutureImpl<>();
7473

75-
this.future.whenComplete((topicPartitions, throwable) -> {
74+
this.future.whenComplete((topicResults, throwable) -> {
7675
if (throwable != null) {
7776
result.completeExceptionally(throwable);
78-
} else if (!maybeCompleteExceptionally(topicPartitions, partition, result)) {
77+
} else if (!maybeCompleteExceptionally(topicResults, topic, result)) {
7978
result.complete(null);
8079
}
8180
});
8281
return result;
8382
}
8483

85-
private boolean maybeCompleteExceptionally(Map<TopicPartition, ApiException> partitionLevelErrors,
86-
TopicPartition partition,
84+
private boolean maybeCompleteExceptionally(Map<String, ApiException> topicLevelErrors,
85+
String topic,
8786
KafkaFutureImpl<Void> result) {
8887
Throwable exception;
89-
if (!partitionLevelErrors.containsKey(partition)) {
90-
exception = new IllegalArgumentException("Offset deletion result for partition \"" + partition + "\" was not included in the response");
88+
if (!topicLevelErrors.containsKey(topic)) {
89+
exception = new IllegalArgumentException("Offset deletion result for topic \"" + topic + "\" was not included in the response");
9190
} else {
92-
exception = partitionLevelErrors.get(partition);
91+
exception = topicLevelErrors.get(topic);
9392
}
9493

9594
if (exception != null) {

clients/src/main/java/org/apache/kafka/clients/admin/ForwardingAdmin.java

+2-2
Original file line numberDiff line numberDiff line change
@@ -336,8 +336,8 @@ public ListShareGroupOffsetsResult listShareGroupOffsets(Map<String, ListShareGr
336336
}
337337

338338
@Override
339-
public DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String groupId, Set<TopicPartition> partitions, DeleteShareGroupOffsetsOptions options) {
340-
return delegate.deleteShareGroupOffsets(groupId, partitions, options);
339+
public DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String groupId, Set<String> topics, DeleteShareGroupOffsetsOptions options) {
340+
return delegate.deleteShareGroupOffsets(groupId, topics, options);
341341
}
342342

343343
@Override

clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java

+4-4
Original file line numberDiff line numberDiff line change
@@ -3846,11 +3846,11 @@ public ListShareGroupOffsetsResult listShareGroupOffsets(final Map<String, ListS
38463846
}
38473847

38483848
@Override
3849-
public DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String groupId, Set<TopicPartition> partitions, DeleteShareGroupOffsetsOptions options) {
3850-
SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition, ApiException>> future = DeleteShareGroupOffsetsHandler.newFuture(groupId);
3851-
DeleteShareGroupOffsetsHandler handler = new DeleteShareGroupOffsetsHandler(groupId, partitions, logContext);
3849+
public DeleteShareGroupOffsetsResult deleteShareGroupOffsets(String groupId, Set<String> topics, DeleteShareGroupOffsetsOptions options) {
3850+
SimpleAdminApiFuture<CoordinatorKey, Map<String, ApiException>> future = DeleteShareGroupOffsetsHandler.newFuture(groupId);
3851+
DeleteShareGroupOffsetsHandler handler = new DeleteShareGroupOffsetsHandler(groupId, topics, logContext);
38523852
invokeDriver(handler, future, options.timeoutMs);
3853-
return new DeleteShareGroupOffsetsResult(future.get(CoordinatorKey.byGroupId(groupId)), partitions);
3853+
return new DeleteShareGroupOffsetsResult(future.get(CoordinatorKey.byGroupId(groupId)), topics);
38543854
}
38553855

38563856
@Override

clients/src/main/java/org/apache/kafka/clients/admin/internals/DeleteShareGroupOffsetsHandler.java

+24-32
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@
1919
import org.apache.kafka.clients.admin.DeleteShareGroupOffsetsOptions;
2020
import org.apache.kafka.clients.admin.KafkaAdminClient;
2121
import org.apache.kafka.common.Node;
22-
import org.apache.kafka.common.TopicPartition;
2322
import org.apache.kafka.common.errors.ApiException;
2423
import org.apache.kafka.common.message.DeleteShareGroupOffsetsRequestData;
2524
import org.apache.kafka.common.protocol.Errors;
@@ -38,24 +37,23 @@
3837
import java.util.List;
3938
import java.util.Map;
4039
import java.util.Set;
41-
import java.util.stream.Collectors;
4240

4341
/**
4442
* This class is the handler for {@link KafkaAdminClient#deleteShareGroupOffsets(String, Set, DeleteShareGroupOffsetsOptions)} call
4543
*/
46-
public class DeleteShareGroupOffsetsHandler extends AdminApiHandler.Batched<CoordinatorKey, Map<TopicPartition, ApiException>> {
44+
public class DeleteShareGroupOffsetsHandler extends AdminApiHandler.Batched<CoordinatorKey, Map<String, ApiException>> {
4745

4846
private final CoordinatorKey groupId;
4947

5048
private final Logger log;
5149

52-
private final Set<TopicPartition> partitions;
50+
private final Set<String> topics;
5351

5452
private final CoordinatorStrategy lookupStrategy;
5553

56-
public DeleteShareGroupOffsetsHandler(String groupId, Set<TopicPartition> partitions, LogContext logContext) {
54+
public DeleteShareGroupOffsetsHandler(String groupId, Set<String> topics, LogContext logContext) {
5755
this.groupId = CoordinatorKey.byGroupId(groupId);
58-
this.partitions = partitions;
56+
this.topics = topics;
5957
this.log = logContext.logger(DeleteShareGroupOffsetsHandler.class);
6058
this.lookupStrategy = new CoordinatorStrategy(FindCoordinatorRequest.CoordinatorType.GROUP, logContext);
6159
}
@@ -70,7 +68,7 @@ public AdminApiLookupStrategy<CoordinatorKey> lookupStrategy() {
7068
return lookupStrategy;
7169
}
7270

73-
public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, Map<TopicPartition, ApiException>> newFuture(String groupId) {
71+
public static AdminApiFuture.SimpleAdminApiFuture<CoordinatorKey, Map<String, ApiException>> newFuture(String groupId) {
7472
return AdminApiFuture.forKeys(Collections.singleton(CoordinatorKey.byGroupId(groupId)));
7573
}
7674

@@ -85,26 +83,22 @@ private void validateKeys(Set<CoordinatorKey> groupIds) {
8583
DeleteShareGroupOffsetsRequest.Builder buildBatchedRequest(int brokerId, Set<CoordinatorKey> groupIds) {
8684
validateKeys(groupIds);
8785

88-
final List<DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic> topics =
86+
final List<DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic> requestTopics =
8987
new ArrayList<>();
90-
partitions.stream().collect(Collectors.groupingBy(TopicPartition::topic)).forEach((topic, topicPartitions) -> topics.add(
88+
topics.forEach(topic -> requestTopics.add(
9189
new DeleteShareGroupOffsetsRequestData.DeleteShareGroupOffsetsRequestTopic()
9290
.setTopicName(topic)
93-
.setPartitions(topicPartitions.stream()
94-
.map(TopicPartition::partition)
95-
.collect(Collectors.toList())
96-
)
9791
));
9892

9993
return new DeleteShareGroupOffsetsRequest.Builder(
10094
new DeleteShareGroupOffsetsRequestData()
10195
.setGroupId(groupId.idValue)
102-
.setTopics(topics)
96+
.setTopics(requestTopics)
10397
);
10498
}
10599

106100
@Override
107-
public ApiResult<CoordinatorKey, Map<TopicPartition, ApiException>> handleResponse(
101+
public ApiResult<CoordinatorKey, Map<String, ApiException>> handleResponse(
108102
Node coordinator,
109103
Set<CoordinatorKey> groupIds,
110104
AbstractResponse abstractResponse
@@ -123,23 +117,21 @@ public ApiResult<CoordinatorKey, Map<TopicPartition, ApiException>> handleRespon
123117

124118
return new ApiResult<>(Collections.emptyMap(), groupsFailed, new ArrayList<>(groupsToUnmap));
125119
} else {
126-
final Map<TopicPartition, ApiException> partitionResults = new HashMap<>();
127-
response.data().responses().forEach(topic ->
128-
topic.partitions().forEach(partition -> {
129-
if (partition.errorCode() != Errors.NONE.code()) {
130-
final Errors partitionError = Errors.forCode(partition.errorCode());
131-
final String partitionErrorMessage = partition.errorMessage();
132-
log.debug("DeleteShareGroupOffsets request for group id {}, topic {} and partition {} failed and returned error {}." + partitionErrorMessage,
133-
groupId.idValue, topic.topicName(), partition.partitionIndex(), partitionError);
134-
}
135-
partitionResults.put(
136-
new TopicPartition(topic.topicName(), partition.partitionIndex()),
137-
Errors.forCode(partition.errorCode()).exception(partition.errorMessage())
138-
);
139-
})
140-
);
141-
142-
return ApiResult.completed(groupId, partitionResults);
120+
final Map<String, ApiException> topicResults = new HashMap<>();
121+
response.data().responses().forEach(topic -> {
122+
if (topic.errorCode() != Errors.NONE.code()) {
123+
final Errors topicError = Errors.forCode(topic.errorCode());
124+
final String topicErrorMessage = topic.errorMessage();
125+
log.debug("DeleteShareGroupOffsets request for group id {} and topic {} failed and returned error {}." + topicErrorMessage,
126+
groupId.idValue, topic.topicName(), topicError);
127+
}
128+
topicResults.put(
129+
topic.topicName(),
130+
Errors.forCode(topic.errorCode()).exception(topic.errorMessage())
131+
);
132+
});
133+
134+
return ApiResult.completed(groupId, topicResults);
143135
}
144136
}
145137

clients/src/main/java/org/apache/kafka/common/requests/DeleteShareGroupOffsetsResponse.java

+1-3
Original file line numberDiff line numberDiff line change
@@ -43,9 +43,7 @@ public Map<Errors, Integer> errorCounts() {
4343
Map<Errors, Integer> counts = new EnumMap<>(Errors.class);
4444
updateErrorCounts(counts, Errors.forCode(data.errorCode()));
4545
data.responses().forEach(
46-
topicResult -> topicResult.partitions().forEach(
47-
partitionResult -> updateErrorCounts(counts, Errors.forCode(partitionResult.errorCode()))
48-
)
46+
topicResult -> updateErrorCounts(counts, Errors.forCode(topicResult.errorCode()))
4947
);
5048
return counts;
5149
}

clients/src/main/resources/common/message/DeleteShareGroupOffsetsRequest.json

+1-3
Original file line numberDiff line numberDiff line change
@@ -26,9 +26,7 @@
2626
{ "name": "Topics", "type": "[]DeleteShareGroupOffsetsRequestTopic", "versions": "0+",
2727
"about": "The topics to delete offsets for.", "fields": [
2828
{ "name": "TopicName", "type": "string", "versions": "0+", "entityType": "topicName",
29-
"about": "The topic name." },
30-
{ "name": "Partitions", "type": "[]int32", "versions": "0+",
31-
"about": "The partitions." }
29+
"about": "The topic name." }
3230
]}
3331
]
3432
}

clients/src/main/resources/common/message/DeleteShareGroupOffsetsResponse.json

+5-8
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
// - KAFKA_STORAGE_ERROR (version 0+)
3131
// - INVALID_REQUEST (version 0+)
3232
// - UNKNOWN_SERVER_ERROR (version 0+)
33+
// - UNKNOWN_TOPIC_OR_PARTITION (version 0+)
3334
"fields": [
3435
{ "name": "ThrottleTimeMs", "type": "int32", "versions": "0+",
3536
"about": "The duration in milliseconds for which the request was throttled due to a quota violation, or zero if the request did not violate any quota." },
@@ -43,14 +44,10 @@
4344
"about": "The topic name." },
4445
{ "name": "TopicId", "type": "uuid", "versions": "0+",
4546
"about": "The unique topic ID." },
46-
{ "name": "Partitions", "type": "[]DeleteShareGroupOffsetsResponsePartition", "versions": "0+", "fields": [
47-
{ "name": "PartitionIndex", "type": "int32", "versions": "0+",
48-
"about": "The partition index." },
49-
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
50-
"about": "The partition-level error code, or 0 if there was no error." },
51-
{ "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
52-
"about": "The partition-level error message, or null if there was no error." }
53-
]}
47+
{ "name": "ErrorCode", "type": "int16", "versions": "0+",
48+
"about": "The topic-level error code, or 0 if there was no error." },
49+
{ "name": "ErrorMessage", "type": "string", "versions": "0+", "nullableVersions": "0+", "default": "null",
50+
"about": "The topic-level error message, or null if there was no error." }
5451
]}
5552
]
5653
}

0 commit comments

Comments
 (0)