diff --git a/Sources/Kafka/Configuration/KafkaConsumerConfiguration.swift b/Sources/Kafka/Configuration/KafkaConsumerConfiguration.swift index dae5691f..2b3b9bfc 100644 --- a/Sources/Kafka/Configuration/KafkaConsumerConfiguration.swift +++ b/Sources/Kafka/Configuration/KafkaConsumerConfiguration.swift @@ -57,7 +57,7 @@ public struct KafkaConsumerConfiguration { /// A struct representing the different Kafka message consumption strategies. public struct ConsumptionStrategy: Sendable, Hashable { enum _ConsumptionStrategy: Sendable, Hashable { - case partition(topic: String, partition: KafkaPartition, offset: KafkaOffset) + case partition(groupID: String?, topic: String, partition: KafkaPartition, offset: KafkaOffset) case group(groupID: String, topics: [String]) } @@ -72,14 +72,16 @@ public struct KafkaConsumerConfiguration { /// /// - Parameters: /// - partition: The partition of the topic to consume from. + /// - groupID: The ID of the consumer group to commit to. Defaults to no group ID. Specifying a group ID is useful if partitions assignment is manually managed but committed offsets should still be tracked in a consumer group. /// - topic: The name of the Kafka topic. /// - offset: The offset to start consuming from. Defaults to the end of the Kafka partition queue (meaning wait for the next produced message). public static func partition( _ partition: KafkaPartition, + groupID: String? = nil, topic: String, offset: KafkaOffset = .end ) -> ConsumptionStrategy { - return .init(consumptionStrategy: .partition(topic: topic, partition: partition, offset: offset)) + return .init(consumptionStrategy: .partition(groupID: groupID, topic: topic, partition: partition, offset: offset)) } /// A consumption strategy based on consumer group membership. @@ -261,12 +263,17 @@ extension KafkaConsumerConfiguration { var resultDict: [String: String] = [:] switch self.consumptionStrategy._internal { - case .partition: - // Although an assignment is not related to a consumer group, - // librdkafka requires us to set a `group.id`. - // This is a known issue: - // https://github.com/edenhill/librdkafka/issues/3261 - resultDict["group.id"] = UUID().uuidString + case .partition(groupID: let groupID, topic: _, partition: _, offset: _): + if let groupID = groupID { + resultDict["group.id"] = groupID + } else { + // Although an assignment is not related to a consumer group, + // librdkafka requires us to set a `group.id`. + // This is a known issue: + // https://github.com/edenhill/librdkafka/issues/3261 + resultDict["group.id"] = UUID().uuidString + } + case .group(groupID: let groupID, topics: _): resultDict["group.id"] = groupID } diff --git a/Sources/Kafka/KafkaConsumer.swift b/Sources/Kafka/KafkaConsumer.swift index 87384220..d7666650 100644 --- a/Sources/Kafka/KafkaConsumer.swift +++ b/Sources/Kafka/KafkaConsumer.swift @@ -359,7 +359,7 @@ public final class KafkaConsumer: Sendable, Service { private func _run() async throws { switch self.configuration.consumptionStrategy._internal { - case .partition(topic: let topic, partition: let partition, offset: let offset): + case .partition(groupID: _, topic: let topic, partition: let partition, offset: let offset): try self.assign(topic: topic, partition: partition, offset: offset) case .group(groupID: _, topics: let topics): try self.subscribe(topics: topics)