-
Notifications
You must be signed in to change notification settings - Fork 30
Fix: KafkaClient.closeConsumer
should not block
#73
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix: KafkaClient.closeConsumer
should not block
#73
Conversation
5b17640
to
9e0f40d
Compare
guard result == RD_KAFKA_RESP_ERR_NO_ERROR else { | ||
throw KafkaError.rdKafkaError(wrapping: result) | ||
} | ||
try await client.commitSync(message) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The underlying APIs don't support cancellation. Let's call this out in the docs of this method.
case finished | ||
/// | ||
/// - Parameter client: Client used for handling the connection to the Kafka cluster. | ||
case finished(client: KafkaClient) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
That is a bit weird. We should make sure that the end state actually doesn't retain the client so that we can deinit it. Maybe we need a new state here.
Motivation: Currently our invocation to `rd_kafka_commit` inside of `KafkaCosumer.commitSync` is blocking a cooperative thread. This PR aims to make `KafkaCosumer.commitSync` non-blocking by using the callback-based commit API. Modifications: * move `commitSync` logic to `KafkaClient` * replace the blocking invocation to [rd_kafka_commit](https://docs.confluent.io/platform/current/clients/librdkafka/html/rdkafka_8h.html#ab96539928328f14c3c9177ea0c896c87) with a callback-based invocation to [rd_kafka_commit_queue](https://docs.confluent.io/platform/current/clients/librdkafka/html/rdkafka_8h.html#af76a6a73baa9c2621536e3f6882a3c1a) which is then wrapped inside a `withAsyncThrowingContinuation` statement
Motivation: [rd_kakfa_consumer_close](https://docs.confluent.io/platform/current/clients/librdkafka/html/rdkafka_8h.html#a37b54d329e12d745889defe96e7d043d) was blocking. This PR proposes using the [rd_kakfa_consumer_close_queue](https://docs.confluent.io/platform/current/clients/librdkafka/html/rdkafka_8h.html#a9dd5c18bdfed81c8847b259f0a8d498d) API which is non-blocking and served through the normal poll loop. We now Modifications: * `KafkaClient.consumerClose`: use `rd_kakfa_consumer_close_queue` in favour of `rd_kakfa_consumer_close` * create a new variable `KafkaClient.isConsumerClosed` that indicates if the poll loop needs to continue polling or if it can stop running * updated state management in `KafkaConsumer` to accomodate for polling when the `KafkaConsumer` is in the process of closing Result: Calling `KafkaClient.consumerClose` is not blocking anymore.
9e0f40d
to
dd902ba
Compare
Modifications: * introduce new `KafkaConsumer.StateMachine.State` `.finishing` to avoid retaining `client` in state `.finished` * rename `KafkaConsumer.shutdownGracefully` to `KafkaConsumer.triggerGracefulShutdown` * add note that `KafkaConsumer.commitSync` does not support `Task` cancellation
a0cb8d8
to
af22ded
Compare
Motivation:
rd_kakfa_consumer_close
was blocking. This PR proposes using the
rd_kakfa_consumer_close_queue
API which is non-blocking and served through the normal poll loop.
Modifications:
KafkaClient.consumerClose
: userd_kakfa_consumer_close_queue
infavour of
rd_kakfa_consumer_close
KafkaClient.isConsumerClosed
that indicatesif the poll loop needs to continue polling or if it can stop running
KafkaConsumer
to accomodate for pollingwhen the
KafkaConsumer
is in the process of closingResult:
Calling
KafkaClient.consumerClose
is not blocking anymore