Skip to content

Commit 9e0f40d

Browse files
committed
KafkaClient.consumerClose: make non-blocking
Motivation: [rd_kakfa_consumer_close](https://docs.confluent.io/platform/current/clients/librdkafka/html/rdkafka_8h.html#a37b54d329e12d745889defe96e7d043d) was blocking. This PR proposes using the [rd_kakfa_consumer_close_queue](https://docs.confluent.io/platform/current/clients/librdkafka/html/rdkafka_8h.html#a9dd5c18bdfed81c8847b259f0a8d498d) API which is non-blocking and served through the normal poll loop. We now Modifications: * `KafkaClient.consumerClose`: use `rd_kakfa_consumer_close_queue` in favour of `rd_kakfa_consumer_close` * create a new variable `KafkaClient.isConsumerClosed` that indicates if the poll loop needs to continue polling or if it can stop running * updated state management in `KafkaConsumer` to accomodate for polling when the `KafkaConsumer` is in the process of closing Result: Calling `KafkaClient.consumerClose` is not blocking anymore.
1 parent 2bd1639 commit 9e0f40d

File tree

2 files changed

+40
-16
lines changed

2 files changed

+40
-16
lines changed

Sources/SwiftKafka/KafkaClient.swift

Lines changed: 17 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -118,14 +118,6 @@ final class KafkaClient {
118118
}
119119
}
120120

121-
/// Close the consumer.
122-
func consumerClose() throws {
123-
let result = rd_kafka_consumer_close(self.kafkaHandle)
124-
if result != RD_KAFKA_RESP_ERR_NO_ERROR {
125-
throw KafkaError.rdKafkaError(wrapping: result)
126-
}
127-
}
128-
129121
/// Wraps a Swift closure inside of a class to be able to pass it to `librdkafka` as an `OpaquePointer`.
130122
/// This is specifically used to pass a Swift closure as a commit callback for the ``KafkaConsumer``.
131123
final class CapturedCommitCallback {
@@ -205,6 +197,23 @@ final class KafkaClient {
205197
}
206198
}
207199

200+
/// Close the consumer asynchronously. This means revoking its assignemnt, committing offsets to broker and
201+
/// leaving the consumer group (if applicable).
202+
///
203+
/// Make sure to run poll loop until ``KafkaClient/consumerIsClosed`` returns `true`.
204+
func consumerClose() throws {
205+
let consumerQueue = rd_kafka_queue_get_consumer(self.kafkaHandle)
206+
let result = rd_kafka_consumer_close_queue(self.kafkaHandle, consumerQueue)
207+
let kafkaError = rd_kafka_error_code(result)
208+
if kafkaError != RD_KAFKA_RESP_ERR_NO_ERROR {
209+
throw KafkaError.rdKafkaError(wrapping: kafkaError)
210+
}
211+
}
212+
213+
var isConsumerClosed: Bool {
214+
rd_kafka_consumer_closed(self.kafkaHandle) == 1
215+
}
216+
208217
/// Scoped accessor that enables safe access to the pointer of the client's Kafka handle.
209218
/// - Warning: Do not escape the pointer from the closure for later use.
210219
/// - Parameter body: The closure will use the Kafka handle pointer.

Sources/SwiftKafka/KafkaConsumer.swift

Lines changed: 23 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -201,16 +201,20 @@ public final class KafkaConsumer {
201201
switch nextAction {
202202
case .pollForAndYieldMessage(let client, let source):
203203
do {
204-
guard let message = try client.consumerPoll() else {
205-
break
204+
if let message = try client.consumerPoll() {
205+
// We do not support back pressure, we can ignore the yield result
206+
_ = source.yield(message)
206207
}
207-
// We do not support back pressure, we can ignore the yield result
208-
_ = source.yield(message)
209208
} catch {
210209
source.finish()
211210
throw error
212211
}
213212
try await Task.sleep(for: self.config.pollInterval)
213+
case .pollUntilClosed(let client):
214+
// Ignore poll result, we are closing down and just polling to commit
215+
// outstanding consumer state
216+
_ = try client.consumerPoll()
217+
try await Task.sleep(for: self.config.pollInterval)
214218
case .terminatePollLoop:
215219
return
216220
}
@@ -304,7 +308,9 @@ extension KafkaConsumer {
304308
source: Producer.Source
305309
)
306310
/// The ``KafkaConsumer`` has been closed.
307-
case finished
311+
///
312+
/// - Parameter client: Client used for handling the connection to the Kafka cluster.
313+
case finished(client: KafkaClient)
308314
}
309315

310316
/// The current state of the StateMachine.
@@ -335,6 +341,11 @@ extension KafkaConsumer {
335341
client: KafkaClient,
336342
source: Producer.Source
337343
)
344+
/// The ``KafkaConsumer`` is in the process of closing down, but still needs to poll
345+
/// to commit its state to the broker.
346+
///
347+
/// - Parameter client: Client used for handling the connection to the Kafka cluster.
348+
case pollUntilClosed(client: KafkaClient)
338349
/// Terminate the poll loop.
339350
case terminatePollLoop
340351
}
@@ -351,8 +362,12 @@ extension KafkaConsumer {
351362
fatalError("Subscribe to consumer group / assign to topic partition pair before reading messages")
352363
case .consuming(let client, let source):
353364
return .pollForAndYieldMessage(client: client, source: source)
354-
case .finished:
355-
return .terminatePollLoop
365+
case .finished(let client):
366+
if client.isConsumerClosed {
367+
return .terminatePollLoop
368+
} else {
369+
return .pollUntilClosed(client: client)
370+
}
356371
}
357372
}
358373

@@ -433,7 +448,7 @@ extension KafkaConsumer {
433448
case .initializing:
434449
fatalError("subscribe() / assign() should have been invoked before \(#function)")
435450
case .consuming(let client, let source):
436-
self.state = .finished
451+
self.state = .finished(client: client)
437452
return .shutdownGracefullyAndFinishSource(
438453
client: client,
439454
source: source

0 commit comments

Comments
 (0)