Skip to content

Commit 912a41e

Browse files
Ómar Kjartan YasinÓmar K. Yasin
authored andcommitted
Allow groupID to be specified when assigning partition
Motivation: A Consumer Group can provide a lot of benefits even if the dynamic loadbalancing features are not used. Modifications: Allow for an optional GroupID when creating a partition consumer. Result: Consumer Groups can now be used when manual assignment is used.
1 parent 51c5f72 commit 912a41e

File tree

2 files changed

+30
-8
lines changed

2 files changed

+30
-8
lines changed

Sources/Kafka/Configuration/KafkaConsumerConfiguration.swift

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ public struct KafkaConsumerConfiguration {
5757
/// A struct representing the different Kafka message consumption strategies.
5858
public struct ConsumptionStrategy: Sendable, Hashable {
5959
enum _ConsumptionStrategy: Sendable, Hashable {
60-
case partition(topic: String, partition: KafkaPartition, offset: KafkaOffset)
60+
case partition(groupID: String? = nil, topic: String, partition: KafkaPartition, offset: KafkaOffset)
6161
case group(groupID: String, topics: [String])
6262
}
6363

@@ -82,6 +82,23 @@ public struct KafkaConsumerConfiguration {
8282
return .init(consumptionStrategy: .partition(topic: topic, partition: partition, offset: offset))
8383
}
8484

85+
/// A consumption strategy based on partition assignment.
86+
/// The consumer reads from a specific partition of a topic at a given offset.
87+
///
88+
/// - Parameters:
89+
/// - partition: The partition of the topic to consume from.
90+
/// - group: The ID of the consumer group to commit to.
91+
/// - topic: The name of the Kafka topic.
92+
/// - offset: The offset to start consuming from. Defaults to the end of the Kafka partition queue (meaning wait for the next produced message).
93+
public static func partition(
94+
_ partition: KafkaPartition,
95+
groupID: String,
96+
topic: String,
97+
offset: KafkaOffset = .end
98+
) -> ConsumptionStrategy {
99+
return .init(consumptionStrategy: .partition(groupID: groupID, topic: topic, partition: partition, offset: offset))
100+
}
101+
85102
/// A consumption strategy based on consumer group membership.
86103
/// The consumer joins a consumer group identified by a group ID and consumes from multiple topics.
87104
///
@@ -261,12 +278,17 @@ extension KafkaConsumerConfiguration {
261278
var resultDict: [String: String] = [:]
262279

263280
switch self.consumptionStrategy._internal {
264-
case .partition:
265-
// Although an assignment is not related to a consumer group,
266-
// librdkafka requires us to set a `group.id`.
267-
// This is a known issue:
268-
// https://github.com/edenhill/librdkafka/issues/3261
269-
resultDict["group.id"] = UUID().uuidString
281+
case .partition(groupID: let groupID, topic: _, partition: _, offset: _):
282+
if let groupID = groupID {
283+
resultDict["group.id"] = groupID
284+
} else {
285+
// Although an assignment is not related to a consumer group,
286+
// librdkafka requires us to set a `group.id`.
287+
// This is a known issue:
288+
// https://github.com/edenhill/librdkafka/issues/3261
289+
resultDict["group.id"] = UUID().uuidString
290+
}
291+
270292
case .group(groupID: let groupID, topics: _):
271293
resultDict["group.id"] = groupID
272294
}

Sources/Kafka/KafkaConsumer.swift

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -359,7 +359,7 @@ public final class KafkaConsumer: Sendable, Service {
359359

360360
private func _run() async throws {
361361
switch self.configuration.consumptionStrategy._internal {
362-
case .partition(topic: let topic, partition: let partition, offset: let offset):
362+
case .partition(groupID: _, topic: let topic, partition: let partition, offset: let offset):
363363
try self.assign(topic: topic, partition: partition, offset: offset)
364364
case .group(groupID: _, topics: let topics):
365365
try self.subscribe(topics: topics)

0 commit comments

Comments
 (0)