Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 92 additions & 4 deletions Sources/SwiftKafka/KafkaClient.swift
Original file line number Diff line number Diff line change
Expand Up @@ -118,14 +118,102 @@ final class KafkaClient {
}
}

/// Close the consumer.
/// Wraps a Swift closure inside of a class to be able to pass it to `librdkafka` as an `OpaquePointer`.
/// This is specifically used to pass a Swift closure as a commit callback for the ``KafkaConsumer``.
final class CapturedCommitCallback {
typealias Closure = (Result<Void, KafkaError>) -> Void
let closure: Closure

init(_ closure: @escaping Closure) {
self.closure = closure
}
}

/// Non-blocking commit of a the `message`'s offset to Kafka.
///
/// - Parameter message: Last received message that shall be marked as read.
func commitSync(_ message: KafkaConsumerMessage) async throws {
// Declare captured closure outside of withCheckedContinuation.
// We do that because do an unretained pass of the captured closure to
// librdkafka which means we have to keep a reference to the closure
// ourselves to make sure it does not get deallocated before
// commitSync returns.
var capturedClosure: CapturedCommitCallback!
try await withCheckedThrowingContinuation { continuation in
capturedClosure = CapturedCommitCallback { result in
continuation.resume(with: result)
}

// The offset committed is always the offset of the next requested message.
// Thus, we increase the offset of the current message by one before committing it.
// See: https://github.com/edenhill/librdkafka/issues/2745#issuecomment-598067945
let changesList = RDKafkaTopicPartitionList()
changesList.setOffset(
topic: message.topic,
partition: message.partition,
offset: Int64(message.offset + 1)
)

// Unretained pass because the reference that librdkafka holds to capturedClosure
// should not be counted in ARC as this can lead to memory leaks.
let opaquePointer: UnsafeMutableRawPointer? = Unmanaged.passUnretained(capturedClosure).toOpaque()

let consumerQueue = rd_kafka_queue_get_consumer(self.kafkaHandle)

// Create a C closure that calls the captured closure
let callbackWrapper: (
@convention(c) (
OpaquePointer?,
rd_kafka_resp_err_t,
UnsafeMutablePointer<rd_kafka_topic_partition_list_t>?,
UnsafeMutableRawPointer?
) -> Void
) = { _, error, _, opaquePointer in

guard let opaquePointer = opaquePointer else {
fatalError("Could not resolve reference to catpured Swift callback instance")
}
let opaque = Unmanaged<CapturedCommitCallback>.fromOpaque(opaquePointer).takeUnretainedValue()

let actualCallback = opaque.closure

if error == RD_KAFKA_RESP_ERR_NO_ERROR {
actualCallback(.success(()))
} else {
let kafkaError = KafkaError.rdKafkaError(wrapping: error)
actualCallback(.failure(kafkaError))
}
}

changesList.withListPointer { listPointer in
rd_kafka_commit_queue(
self.kafkaHandle,
listPointer,
consumerQueue,
callbackWrapper,
opaquePointer
)
}
}
}

/// Close the consumer asynchronously. This means revoking its assignemnt, committing offsets to broker and
/// leaving the consumer group (if applicable).
///
/// Make sure to run poll loop until ``KafkaClient/consumerIsClosed`` returns `true`.
func consumerClose() throws {
let result = rd_kafka_consumer_close(self.kafkaHandle)
if result != RD_KAFKA_RESP_ERR_NO_ERROR {
throw KafkaError.rdKafkaError(wrapping: result)
let consumerQueue = rd_kafka_queue_get_consumer(self.kafkaHandle)
let result = rd_kafka_consumer_close_queue(self.kafkaHandle, consumerQueue)
let kafkaError = rd_kafka_error_code(result)
if kafkaError != RD_KAFKA_RESP_ERR_NO_ERROR {
throw KafkaError.rdKafkaError(wrapping: kafkaError)
}
}

var isConsumerClosed: Bool {
rd_kafka_consumer_closed(self.kafkaHandle) == 1
}

/// Scoped accessor that enables safe access to the pointer of the client's Kafka handle.
/// - Warning: Do not escape the pointer from the closure for later use.
/// - Parameter body: The closure will use the Kafka handle pointer.
Expand Down
95 changes: 42 additions & 53 deletions Sources/SwiftKafka/KafkaConsumer.swift
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,10 @@ extension ShutdownOnTerminate: NIOAsyncSequenceProducerDelegate {
}

func didTerminate() {
// Duplicate of _shutdownGracefully
// Duplicate of _triggerGracefulShutdown
let action = self.stateMachine.withLockedValue { $0.finish() }
switch action {
case .shutdownGracefullyAndFinishSource(let client, let source):
case .triggerGracefulShutdownAndFinishSource(let client, let source):
source.finish()

do {
Expand Down Expand Up @@ -151,7 +151,7 @@ public final class KafkaConsumer {
}

deinit {
self.shutdownGracefully()
self.triggerGracefulShutdown()
}

/// Subscribe to the given list of `topics`.
Expand Down Expand Up @@ -201,16 +201,20 @@ public final class KafkaConsumer {
switch nextAction {
case .pollForAndYieldMessage(let client, let source):
do {
guard let message = try client.consumerPoll() else {
break
if let message = try client.consumerPoll() {
// We do not support back pressure, we can ignore the yield result
_ = source.yield(message)
}
// We do not support back pressure, we can ignore the yield result
_ = source.yield(message)
} catch {
source.finish()
throw error
}
try await Task.sleep(for: self.config.pollInterval)
case .pollUntilClosed(let client):
// Ignore poll result, we are closing down and just polling to commit
// outstanding consumer state
_ = try client.consumerPoll()
try await Task.sleep(for: self.config.pollInterval)
case .terminatePollLoop:
return
}
Expand All @@ -222,18 +226,8 @@ public final class KafkaConsumer {
/// - Parameter message: Last received message that shall be marked as read.
/// - Throws: A ``KafkaError`` if committing failed.
/// - Warning: This method fails if the `enable.auto.commit` configuration property is set to `true`.
/// - Important: This method does not support `Task` cancellation.
public func commitSync(_ message: KafkaConsumerMessage) async throws {
try await withCheckedThrowingContinuation { continuation in
do {
try self._commitSync(message) // Blocks until commiting the offset is done
continuation.resume()
} catch {
continuation.resume(throwing: error)
}
}
}

private func _commitSync(_ message: KafkaConsumerMessage) throws {
let action = self.stateMachine.withLockedValue { $0.commitSync() }
switch action {
case .throwClosedError:
Expand All @@ -243,41 +237,19 @@ public final class KafkaConsumer {
throw KafkaError.config(reason: "Committing manually only works if enable.auto.commit is set to false")
}

// The offset committed is always the offset of the next requested message.
// Thus, we increase the offset of the current message by one before committing it.
// See: https://github.com/edenhill/librdkafka/issues/2745#issuecomment-598067945
let changesList = RDKafkaTopicPartitionList()
changesList.setOffset(
topic: message.topic,
partition: message.partition,
offset: Int64(message.offset + 1)
)

let result = client.withKafkaHandlePointer { handle in
changesList.withListPointer { listPointer in
rd_kafka_commit(
handle,
listPointer,
0
) // Blocks until commiting the offset is done
// -> Will be resolved by: https://github.com/swift-server/swift-kafka-gsoc/pull/68
}
}
guard result == RD_KAFKA_RESP_ERR_NO_ERROR else {
throw KafkaError.rdKafkaError(wrapping: result)
}
try await client.commitSync(message)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The underlying APIs don't support cancellation. Let's call this out in the docs of this method.

}
}

/// This function is used to gracefully shut down a Kafka consumer client.
///
/// - Note: Invoking this function is not always needed as the ``KafkaConsumer``
/// will already shut down when consumption of the ``KafkaConsumerMessages`` has ended.
private func shutdownGracefully() {
private func triggerGracefulShutdown() {
let action = self.stateMachine.withLockedValue { $0.finish() }
switch action {
case .shutdownGracefullyAndFinishSource(let client, let source):
self._shutdownGracefullyAndFinishSource(
case .triggerGracefulShutdownAndFinishSource(let client, let source):
self._triggerGracefulShutdownAndFinishSource(
client: client,
source: source,
logger: self.logger
Expand All @@ -287,7 +259,7 @@ public final class KafkaConsumer {
}
}

private func _shutdownGracefullyAndFinishSource(
private func _triggerGracefulShutdownAndFinishSource(
client: KafkaClient,
source: Producer.Source,
logger: Logger
Expand Down Expand Up @@ -336,7 +308,12 @@ extension KafkaConsumer {
client: KafkaClient,
source: Producer.Source
)
/// The ``KafkaConsumer`` has been closed.
/// The ``KafkaConsumer/triggerGracefulShutdown()`` has been invoked.
/// We are now in the process of commiting our last state to the broker.
///
/// - Parameter client: Client used for handling the connection to the Kafka cluster.
case finishing(client: KafkaClient)
/// The ``KafkaConsumer`` is closed.
case finished
}

Expand Down Expand Up @@ -368,6 +345,11 @@ extension KafkaConsumer {
client: KafkaClient,
source: Producer.Source
)
/// The ``KafkaConsumer`` is in the process of closing down, but still needs to poll
/// to commit its state to the broker.
///
/// - Parameter client: Client used for handling the connection to the Kafka cluster.
case pollUntilClosed(client: KafkaClient)
/// Terminate the poll loop.
case terminatePollLoop
}
Expand All @@ -376,14 +358,21 @@ extension KafkaConsumer {
/// - Returns: The next action to be taken when wanting to poll, or `nil` if there is no action to be taken.
///
/// - Important: This function throws a `fatalError` if called while in the `.initializing` state.
func nextPollLoopAction() -> PollLoopAction {
mutating func nextPollLoopAction() -> PollLoopAction {
switch self.state {
case .uninitialized:
fatalError("\(#function) invoked while still in state \(self.state)")
case .initializing:
fatalError("Subscribe to consumer group / assign to topic partition pair before reading messages")
case .consuming(let client, let source):
return .pollForAndYieldMessage(client: client, source: source)
case .finishing(let client):
if client.isConsumerClosed {
self.state = .finished
return .terminatePollLoop
} else {
return .pollUntilClosed(client: client)
}
case .finished:
return .terminatePollLoop
}
Expand All @@ -409,7 +398,7 @@ extension KafkaConsumer {
source: source
)
return .setUpConnection(client: client)
case .consuming, .finished:
case .consuming, .finishing, .finished:
fatalError("\(#function) should only be invoked upon initialization of KafkaConsumer")
}
}
Expand Down Expand Up @@ -438,7 +427,7 @@ extension KafkaConsumer {
fatalError("Subscribe to consumer group / assign to topic partition pair before committing offsets")
case .consuming(let client, _):
return .commitSync(client: client)
case .finished:
case .finishing, .finished:
return .throwClosedError
}
}
Expand All @@ -449,7 +438,7 @@ extension KafkaConsumer {
///
/// - Parameter client: Client used for handling the connection to the Kafka cluster.
/// - Parameter source: ``NIOAsyncSequenceProducer/Source`` used for yielding new elements.
case shutdownGracefullyAndFinishSource(
case triggerGracefulShutdownAndFinishSource(
client: KafkaClient,
source: Producer.Source
)
Expand All @@ -466,12 +455,12 @@ extension KafkaConsumer {
case .initializing:
fatalError("subscribe() / assign() should have been invoked before \(#function)")
case .consuming(let client, let source):
self.state = .finished
return .shutdownGracefullyAndFinishSource(
self.state = .finishing(client: client)
return .triggerGracefulShutdownAndFinishSource(
client: client,
source: source
)
case .finished:
case .finishing, .finished:
return nil
}
}
Expand Down
4 changes: 3 additions & 1 deletion Sources/SwiftKafka/RDKafka/RDKafkaConfig.swift
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,9 @@ struct RDKafkaConfig {
) -> CapturedClosures {
let closures = CapturedClosures()

// Pass the the reference to Opaque as an opaque object
// Pass the captured closure to the C closure as an opaque object.
// Unretained pass because the reference that librdkafka holds to the captured closures
// should not be counted in ARC as this can lead to memory leaks.
let opaquePointer: UnsafeMutableRawPointer? = Unmanaged.passUnretained(closures).toOpaque()
rd_kafka_conf_set_opaque(
configPointer,
Expand Down