Skip to content

Commit 92fce97

Browse files
committed
Fix: make KafkaConsumer.commitSync non-blocking
Motivation: Currently our invocation to `rd_kafka_commit` inside of `KafkaCosumer.commitSync` is blocking a cooperative thread. This PR aims to make `KafkaCosumer.commitSync` non-blocking by using the callback-based commit API. Modifications: * move `commitSync` logic to `KafkaClient` * replace the blocking invocation to [rd_kafka_commit](https://docs.confluent.io/platform/current/clients/librdkafka/html/rdkafka_8h.html#ab96539928328f14c3c9177ea0c896c87) with a callback-based invocation to [rd_kafka_commit_queue](https://docs.confluent.io/platform/current/clients/librdkafka/html/rdkafka_8h.html#af76a6a73baa9c2621536e3f6882a3c1a) which is then wrapped inside a `withAsyncThrowingContinuation` statement
1 parent 77d0b0e commit 92fce97

File tree

3 files changed

+83
-35
lines changed

3 files changed

+83
-35
lines changed

Sources/SwiftKafka/KafkaClient.swift

Lines changed: 79 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -126,6 +126,85 @@ final class KafkaClient {
126126
}
127127
}
128128

129+
/// Wraps a Swift closure inside of a class to be able to pass it to `librdkafka` as an `OpaquePointer`.
130+
/// This is specifically used to pass a Swift closure as a commit callback for the ``KafkaConsumer``.
131+
final class CapturedCommitCallback {
132+
typealias Closure = (Result<Void, KafkaError>) -> Void
133+
let closure: Closure
134+
135+
init(_ closure: @escaping Closure) {
136+
self.closure = closure
137+
}
138+
}
139+
140+
/// Non-blocking commit of a the `message`'s offset to Kafka.
141+
///
142+
/// - Parameter message: Last received message that shall be marked as read.
143+
func commitSync(_ message: KafkaConsumerMessage) async throws {
144+
// Declare captured closure outside of withCheckedContinuation.
145+
// We do that because do an unretained pass of the captured closure to
146+
// librdkafka which means we have to keep a reference to the closure
147+
// ourselves to make sure it does not get deallocated before
148+
// commitSync returns.
149+
var capturedClosure: CapturedCommitCallback!
150+
try await withCheckedThrowingContinuation { continuation in
151+
capturedClosure = CapturedCommitCallback { result in
152+
continuation.resume(with: result)
153+
}
154+
155+
// The offset committed is always the offset of the next requested message.
156+
// Thus, we increase the offset of the current message by one before committing it.
157+
// See: https://github.com/edenhill/librdkafka/issues/2745#issuecomment-598067945
158+
let changesList = RDKafkaTopicPartitionList()
159+
changesList.setOffset(
160+
topic: message.topic,
161+
partition: message.partition,
162+
offset: Int64(message.offset + 1)
163+
)
164+
165+
// Unretained pass because the reference that librdkafka holds to capturedClosure
166+
// should not be counted in ARC as this can lead to memory leaks.
167+
let opaquePointer: UnsafeMutableRawPointer? = Unmanaged.passUnretained(capturedClosure).toOpaque()
168+
169+
let consumerQueue = rd_kafka_queue_get_consumer(self.kafkaHandle)
170+
171+
// Create a C closure that calls the captured closure
172+
let callbackWrapper: (
173+
@convention(c) (
174+
OpaquePointer?,
175+
rd_kafka_resp_err_t,
176+
UnsafeMutablePointer<rd_kafka_topic_partition_list_t>?,
177+
UnsafeMutableRawPointer?
178+
) -> Void
179+
) = { _, error, _, opaquePointer in
180+
181+
guard let opaquePointer = opaquePointer else {
182+
fatalError("Could not resolve reference to catpured Swift callback instance")
183+
}
184+
let opaque = Unmanaged<CapturedCommitCallback>.fromOpaque(opaquePointer).takeUnretainedValue()
185+
186+
let actualCallback = opaque.closure
187+
188+
if error == RD_KAFKA_RESP_ERR_NO_ERROR {
189+
actualCallback(.success(()))
190+
} else {
191+
let kafkaError = KafkaError.rdKafkaError(wrapping: error)
192+
actualCallback(.failure(kafkaError))
193+
}
194+
}
195+
196+
changesList.withListPointer { listPointer in
197+
rd_kafka_commit_queue(
198+
self.kafkaHandle,
199+
listPointer,
200+
consumerQueue,
201+
callbackWrapper,
202+
opaquePointer
203+
)
204+
}
205+
}
206+
}
207+
129208
/// Scoped accessor that enables safe access to the pointer of the client's Kafka handle.
130209
/// - Warning: Do not escape the pointer from the closure for later use.
131210
/// - Parameter body: The closure will use the Kafka handle pointer.

Sources/SwiftKafka/KafkaConsumer.swift

Lines changed: 1 addition & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -223,17 +223,6 @@ public final class KafkaConsumer {
223223
/// - Throws: A ``KafkaError`` if committing failed.
224224
/// - Warning: This method fails if the `enable.auto.commit` configuration property is set to `true`.
225225
public func commitSync(_ message: KafkaConsumerMessage) async throws {
226-
try await withCheckedThrowingContinuation { continuation in
227-
do {
228-
try self._commitSync(message) // Blocks until commiting the offset is done
229-
continuation.resume()
230-
} catch {
231-
continuation.resume(throwing: error)
232-
}
233-
}
234-
}
235-
236-
private func _commitSync(_ message: KafkaConsumerMessage) throws {
237226
let action = self.stateMachine.withLockedValue { $0.commitSync() }
238227
switch action {
239228
case .throwClosedError:
@@ -243,29 +232,7 @@ public final class KafkaConsumer {
243232
throw KafkaError.config(reason: "Committing manually only works if enable.auto.commit is set to false")
244233
}
245234

246-
// The offset committed is always the offset of the next requested message.
247-
// Thus, we increase the offset of the current message by one before committing it.
248-
// See: https://github.com/edenhill/librdkafka/issues/2745#issuecomment-598067945
249-
let changesList = RDKafkaTopicPartitionList()
250-
changesList.setOffset(
251-
topic: message.topic,
252-
partition: message.partition,
253-
offset: Int64(message.offset + 1)
254-
)
255-
256-
let result = client.withKafkaHandlePointer { handle in
257-
changesList.withListPointer { listPointer in
258-
rd_kafka_commit(
259-
handle,
260-
listPointer,
261-
0
262-
) // Blocks until commiting the offset is done
263-
// -> Will be resolved by: https://github.com/swift-server/swift-kafka-gsoc/pull/68
264-
}
265-
}
266-
guard result == RD_KAFKA_RESP_ERR_NO_ERROR else {
267-
throw KafkaError.rdKafkaError(wrapping: result)
268-
}
235+
try await client.commitSync(message)
269236
}
270237
}
271238

Sources/SwiftKafka/RDKafka/RDKafkaConfig.swift

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,9 @@ struct RDKafkaConfig {
7878
) -> CapturedClosures {
7979
let closures = CapturedClosures()
8080

81-
// Pass the the reference to Opaque as an opaque object
81+
// Pass the captured closure to the C closure as an opaque object.
82+
// Unretained pass because the reference that librdkafka holds to the captured closures
83+
// should not be counted in ARC as this can lead to memory leaks.
8284
let opaquePointer: UnsafeMutableRawPointer? = Unmanaged.passUnretained(closures).toOpaque()
8385
rd_kafka_conf_set_opaque(
8486
configPointer,

0 commit comments

Comments
 (0)