Skip to content

Allow groupID to be specified when assigning partition #161

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Apr 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 15 additions & 8 deletions Sources/Kafka/Configuration/KafkaConsumerConfiguration.swift
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public struct KafkaConsumerConfiguration {
/// A struct representing the different Kafka message consumption strategies.
public struct ConsumptionStrategy: Sendable, Hashable {
enum _ConsumptionStrategy: Sendable, Hashable {
case partition(topic: String, partition: KafkaPartition, offset: KafkaOffset)
case partition(groupID: String?, topic: String, partition: KafkaPartition, offset: KafkaOffset)
case group(groupID: String, topics: [String])
}

Expand All @@ -72,14 +72,16 @@ public struct KafkaConsumerConfiguration {
///
/// - Parameters:
/// - partition: The partition of the topic to consume from.
/// - groupID: The ID of the consumer group to commit to. Defaults to no group ID. Specifying a group ID is useful if partitions assignment is manually managed but committed offsets should still be tracked in a consumer group.
/// - topic: The name of the Kafka topic.
/// - offset: The offset to start consuming from. Defaults to the end of the Kafka partition queue (meaning wait for the next produced message).
public static func partition(
_ partition: KafkaPartition,
groupID: String? = nil,
topic: String,
offset: KafkaOffset = .end
) -> ConsumptionStrategy {
return .init(consumptionStrategy: .partition(topic: topic, partition: partition, offset: offset))
return .init(consumptionStrategy: .partition(groupID: groupID, topic: topic, partition: partition, offset: offset))
}

/// A consumption strategy based on consumer group membership.
Expand Down Expand Up @@ -261,12 +263,17 @@ extension KafkaConsumerConfiguration {
var resultDict: [String: String] = [:]

switch self.consumptionStrategy._internal {
case .partition:
// Although an assignment is not related to a consumer group,
// librdkafka requires us to set a `group.id`.
// This is a known issue:
// https://github.com/edenhill/librdkafka/issues/3261
resultDict["group.id"] = UUID().uuidString
case .partition(groupID: let groupID, topic: _, partition: _, offset: _):
if let groupID = groupID {
resultDict["group.id"] = groupID
} else {
// Although an assignment is not related to a consumer group,
// librdkafka requires us to set a `group.id`.
// This is a known issue:
// https://github.com/edenhill/librdkafka/issues/3261
resultDict["group.id"] = UUID().uuidString
}

case .group(groupID: let groupID, topics: _):
resultDict["group.id"] = groupID
}
Expand Down
2 changes: 1 addition & 1 deletion Sources/Kafka/KafkaConsumer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ public final class KafkaConsumer: Sendable, Service {

private func _run() async throws {
switch self.configuration.consumptionStrategy._internal {
case .partition(topic: let topic, partition: let partition, offset: let offset):
case .partition(groupID: _, topic: let topic, partition: let partition, offset: let offset):
try self.assign(topic: topic, partition: partition, offset: offset)
case .group(groupID: _, topics: let topics):
try self.subscribe(topics: topics)
Expand Down