Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 22 additions & 22 deletions Sources/Kafka/KafkaConsumer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ public struct KafkaConsumerMessages: Sendable, AsyncSequence {
public typealias Element = KafkaConsumerMessage
typealias BackPressureStrategy = NIOAsyncSequenceProducerBackPressureStrategies.NoBackPressure
typealias WrappedSequence = NIOThrowingAsyncSequenceProducer<
Element,
Result<KafkaConsumerMessage, Error>,
Error,
BackPressureStrategy,
KafkaConsumerCloseOnTerminate
Expand All @@ -80,24 +80,30 @@ public struct KafkaConsumerMessages: Sendable, AsyncSequence {
var wrappedIterator: WrappedSequence.AsyncIterator?

public mutating func next() async throws -> Element? {
guard let element = try await self.wrappedIterator?.next() else {
guard let result = try await self.wrappedIterator?.next() else {
self.deallocateIterator()
return nil
}

let action = self.stateMachine.withLockedValue { $0.storeOffset() }
switch action {
case .storeOffset(let client):
do {
try client.storeMessageOffset(element)
} catch {
switch result {
case .success(let message):
let action = self.stateMachine.withLockedValue { $0.storeOffset() }
switch action {
case .storeOffset(let client):
do {
try client.storeMessageOffset(message)
} catch {
self.deallocateIterator()
throw error
}
return message
case .terminateConsumerSequence:
self.deallocateIterator()
throw error
return nil
}
return element
case .terminateConsumerSequence:
case .failure(let error):
self.deallocateIterator()
return nil
throw error
}
}

Expand All @@ -119,7 +125,7 @@ public struct KafkaConsumerMessages: Sendable, AsyncSequence {
/// A ``KafkaConsumer `` can be used to consume messages from a Kafka cluster.
public final class KafkaConsumer: Sendable, Service {
typealias Producer = NIOThrowingAsyncSequenceProducer<
KafkaConsumerMessage,
Result<KafkaConsumerMessage, Error>,
Error,
NIOAsyncSequenceProducerBackPressureStrategies.NoBackPressure,
KafkaConsumerCloseOnTerminate
Expand Down Expand Up @@ -156,7 +162,7 @@ public final class KafkaConsumer: Sendable, Service {
self.logger = logger

let sourceAndSequence = NIOThrowingAsyncSequenceProducer.makeSequence(
elementType: KafkaConsumerMessage.self,
elementType: Result<KafkaConsumerMessage, Error>.self,
backPressureStrategy: NIOAsyncSequenceProducerBackPressureStrategies.NoBackPressure(),
delegate: KafkaConsumerCloseOnTerminate(stateMachine: self.stateMachine)
)
Expand Down Expand Up @@ -333,14 +339,8 @@ public final class KafkaConsumer: Sendable, Service {
for event in events {
switch event {
case .consumerMessages(let result):
switch result {
case .success(let message):
// We do not support back pressure, we can ignore the yield result
_ = source.yield(message)
case .failure(let error):
source.finish()
throw error
}
// We do not support back pressure, we can ignore the yield result
_ = source.yield(result)
default:
break // Ignore
}
Expand Down