diff --git a/Sources/Kafka/KafkaConsumer.swift b/Sources/Kafka/KafkaConsumer.swift index 12fe0dca..8e4529ce 100644 --- a/Sources/Kafka/KafkaConsumer.swift +++ b/Sources/Kafka/KafkaConsumer.swift @@ -181,13 +181,6 @@ public final class KafkaConsumer: Sendable, Service { // Forward main queue events to the consumer queue. try client.pollSetConsumer() - - switch configuration.consumptionStrategy._internal { - case .partition(topic: let topic, partition: let partition, offset: let offset): - try self.assign(topic: topic, partition: partition, offset: offset) - case .group(groupID: _, topics: let topics): - try self.subscribe(topics: topics) - } } /// Initialize a new ``KafkaConsumer``. @@ -331,6 +324,13 @@ public final class KafkaConsumer: Sendable, Service { } private func _run() async throws { + switch self.configuration.consumptionStrategy._internal { + case .partition(topic: let topic, partition: let partition, offset: let offset): + try self.assign(topic: topic, partition: partition, offset: offset) + case .group(groupID: _, topics: let topics): + try self.subscribe(topics: topics) + } + while !Task.isCancelled { let nextAction = self.stateMachine.withLockedValue { $0.nextPollLoopAction() } switch nextAction {