`KafkaConsumer` Refactoring #66
b6aa400 to 0677dcf
Sources/SwiftKafka/KafkaClient.swift
Outdated
/// - Parameter timeout: Maximum amount of milliseconds this method waits for a new message.
/// - Returns: A ``KafkaConsumerMessage`` or `nil` if there are no new messages.
/// - Throws: A ``KafkaError`` if the received message is an error message or malformed.
func consumerPoll(timeout: Int32) async throws -> KafkaConsumerMessage? {
We should remove the `timeout` here, otherwise people might actually block. Additionally, we should remove the `async` from this method. I don't see a reason why we are creating a continuation here. We should be able to just call `rd_kafka_consumer_poll`, bridge the message into a Swift type, destroy the pointer, and return.
Totally makes sense!
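A minimal sketch of the shape suggested above: a synchronous, non-blocking poll that bridges the raw message into a Swift value, destroys the pointer, and returns. `RawMessage` and `FakeHandle` are hypothetical stand-ins for `rd_kafka_message_t` and the librdkafka handle so that the example is self-contained; the PR's actual types and signatures may differ.

```swift
// Stand-in for `rd_kafka_message_t` (assumption: the real struct comes from librdkafka).
struct RawMessage {
    var errorCode: Int32
    var payload: [UInt8]
}

// Simplified stand-ins for the library's Swift types.
struct KafkaConsumerMessage: Equatable {
    let value: [UInt8]
}

struct KafkaError: Error, Equatable {
    let code: Int32
}

// Hypothetical handle that hands out heap-allocated messages the caller must free.
final class FakeHandle {
    private var queue: [RawMessage]
    private(set) var liveMessages = 0

    init(queue: [RawMessage]) {
        self.queue = queue
    }

    // Mimics a poll with a timeout of 0: returns immediately, `nil` if nothing is queued.
    func poll() -> UnsafeMutablePointer<RawMessage>? {
        guard !queue.isEmpty else { return nil }
        let pointer = UnsafeMutablePointer<RawMessage>.allocate(capacity: 1)
        pointer.initialize(to: queue.removeFirst())
        liveMessages += 1
        return pointer
    }

    // Mimics destroying a polled message.
    func destroy(_ pointer: UnsafeMutablePointer<RawMessage>) {
        pointer.deinitialize(count: 1)
        pointer.deallocate()
        liveMessages -= 1
    }
}

// No `timeout`, no `async`, no continuation: poll, bridge, destroy, return.
func consumerPoll(_ handle: FakeHandle) throws -> KafkaConsumerMessage? {
    guard let pointer = handle.poll() else { return nil }
    // The pointer is destroyed on every exit path, after the value has been copied.
    defer { handle.destroy(pointer) }
    guard pointer.pointee.errorCode == 0 else {
        throw KafkaError(code: pointer.pointee.errorCode)
    }
    return KafkaConsumerMessage(value: pointer.pointee.payload)
}
```

Because the call never waits, it can be driven from a poll loop that sleeps between iterations without tying up a thread inside the C library.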
Modifications:
* `KafkaConsumer`: remove `NIOAsyncSequenceProducer` and use own implementation of `AsyncSequence` instead
* write async wrapper method `KafkaClient.consumerPoll(timeout:)`
* update memory leak tests for `KafkaConsumer` and `KafkaProducer`
* update tests

Modifications:
* replace `serialQueue` in `KafkaConsumer` with `StateMachine` that encapsulates all variables and can also be accessed from the `ConsumerMessagesAsyncSequence`
* close `KafkaConsumer` when `for await` loop of `AsyncSequence` is exited
* make `ConsumerMessagesAsyncIterator` a class-backed `struct`
* make `KafkaConsumer.shutdownGracefully` `public`

Modifications:
* add `run()` method to `KafkaConsumer`
* rename `ConsumerMessagesAsyncSequence` to `KafkaConsumerMessages`
* `KafkaConsumer`: put back `NIOAsyncSequenceProducer`
* update `README`
* use `Duration` type for `pollInterval`

Modifications:
* `KafkaClient.consumerPoll`: remove `timeout` as this method should only be used with `timeout = 0`
* `KafkaClient.consumerPoll`: remove continuation as it has no benefit
0677dcf to fa9ebf4
///
/// - Note: Invoking this function is not always needed as the ``KafkaConsumer``
/// will already shut down when consumption of the ``KafkaConsumerMessages`` has ended.
public func shutdownGracefully() {
We should make this method private, since we intend to use service lifecycle to do this in the `run()` method.
let result = client.withKafkaHandlePointer { handle in
    rd_kafka_subscribe(handle, subscribedTopicsPointer)
}
For all these places where we use `withKafkaHandlePointer`, can we just add a method on the `KafkaClient` instead? At best that method also already does the conversion to the correct error.
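One way the suggestion above could look, as a sketch only: the call site stops touching the handle and instead calls a throwing method on the client, which also maps the C return code to an error. `fakeSubscribe`, the `Int` handle, and the `subscribe(topics:)` signature are assumptions made to keep the example self-contained; they are not taken from the PR.

```swift
// Simplified stand-in for the library's error type.
struct KafkaError: Error, Equatable {
    let code: Int32
}

// Stand-in for a C call such as `rd_kafka_subscribe`:
// returns 0 on success and a non-zero code on failure (hypothetical behavior).
func fakeSubscribe(_ handle: Int, _ topics: [String]) -> Int32 {
    return topics.isEmpty ? 22 : 0
}

final class KafkaClient {
    // Stand-in for the opaque librdkafka handle.
    private let handle: Int = 1

    // Scoped access to the handle, as used in the snippet under review.
    func withKafkaHandlePointer<T>(_ body: (Int) throws -> T) rethrows -> T {
        return try body(handle)
    }

    // Hypothetical wrapper: callers no longer see the handle or the raw
    // return code, only a Swift error.
    func subscribe(topics: [String]) throws {
        let result = withKafkaHandlePointer { handle in
            fakeSubscribe(handle, topics)
        }
        guard result == 0 else {
            throw KafkaError(code: result)
        }
    }
}
```

With a wrapper like this, `KafkaConsumer` would call `try client.subscribe(topics:)` and the error conversion lives in one place.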
}
messageResult = .success(message)
} catch let kafkaError as KafkaError {
messageResult = .failure(kafkaError)
If polling fails, shouldn't we shut down everything?
So `consumerPoll` throws when there is an error with the received message. However, I think we shouldn't close the entire consumer because of, e.g., a message with a faulty payload.
Hm, I think this is pretty bad. If we get a bad message, something seriously went wrong. At least that is what I expect. Can you investigate when this could happen and document it here? IMO it is weird that our sequence contains a `Result<>`; I would like to get rid of that.
  do {
-     try self._commitSync(message)
+     try self._commitSync(message) // Blocks until committing the offset is done
Yeah that is problematic. We need to change that.
client: KafkaClient,
source: Producer.Source,
subscribedTopicsPointer: UnsafeMutablePointer<rd_kafka_topic_partition_list_t>,
logger: Logger
Let's not store the logger in here. We can just keep it in a separate prop since it is not state.
pollInterval: Duration,
client: KafkaClient,
source: Producer.Source,
subscribedTopicsPointer: UnsafeMutablePointer<rd_kafka_topic_partition_list_t>,
We should create an abstraction for this so we don't hold `UnsafeMutablePointer`s all the time.
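A rough sketch of such an abstraction, assuming a `final class` that owns the pointer and frees it in `deinit`, so the rest of the code only holds a reference to the wrapper. `RawTopicList`, `Allocator`, and `OwnedTopicList` are hypothetical names standing in for `rd_kafka_topic_partition_list_t` and its create/destroy functions; they are not the PR's API.

```swift
// Stand-in for `rd_kafka_topic_partition_list_t` (assumption).
struct RawTopicList {
    var topics: [String] = []
}

// Stand-in for the C create/destroy pair; counts live allocations
// so that leaks are observable in the example.
final class Allocator {
    private(set) var liveCount = 0

    func make() -> UnsafeMutablePointer<RawTopicList> {
        let pointer = UnsafeMutablePointer<RawTopicList>.allocate(capacity: 1)
        pointer.initialize(to: RawTopicList())
        liveCount += 1
        return pointer
    }

    func destroy(_ pointer: UnsafeMutablePointer<RawTopicList>) {
        pointer.deinitialize(count: 1)
        pointer.deallocate()
        liveCount -= 1
    }
}

// Hypothetical owning wrapper: the pointer's lifetime is tied to the object.
final class OwnedTopicList {
    private let allocator: Allocator
    private let pointer: UnsafeMutablePointer<RawTopicList>

    init(allocator: Allocator) {
        self.allocator = allocator
        self.pointer = allocator.make()
    }

    deinit {
        // Freed exactly once, when the last reference goes away.
        allocator.destroy(pointer)
    }

    func add(topic: String) {
        pointer.pointee.topics.append(topic)
    }

    var topics: [String] {
        return pointer.pointee.topics
    }

    // Scoped access for the few call sites that must pass the pointer to C.
    func withListPointer<T>(_ body: (UnsafeMutablePointer<RawTopicList>) throws -> T) rethrows -> T {
        return try body(pointer)
    }
}
```

The state machine can then store the wrapper instead of the raw pointer, and nobody has to remember to call the destroy function.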
/// - Parameter subscribedTopicsPointer: Pointer to a list of topics + partition pairs.
/// - Parameter logger: A logger.
case initializing(
    pollInterval: Duration,
Doesn't need to be in the state machine since it is a fixed configuration.
/// Kill the poll loop.
case killPollLoop
Kill is a bit harsh. Can we call this `terminate` instead?
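To illustrate the pattern being discussed, here is a simplified sketch of a state machine that tells the poll loop what to do next, using the `terminatePollLoop` naming. The states, the `start()`/`finish()` transitions, and the rule that polling before `start()` terminates the loop are all assumptions for illustration; the PR's real state machine carries more associated data.

```swift
// Hypothetical, reduced state machine for a consumer's poll loop.
struct ConsumerStateMachine {
    enum State: Equatable {
        case initializing
        case consuming
        case finished
    }

    enum PollLoopAction: Equatable {
        /// Poll for the next message.
        case poll
        /// Terminate the poll loop.
        case terminatePollLoop
    }

    private(set) var state: State = .initializing

    init() {}

    // Only a freshly initialized consumer can start consuming.
    mutating func start() {
        if state == .initializing {
            state = .consuming
        }
    }

    // Finishing is terminal; later calls to `start()` have no effect.
    mutating func finish() {
        state = .finished
    }

    // The poll loop asks this before every iteration.
    func nextPollLoopAction() -> PollLoopAction {
        switch state {
        case .consuming:
            return .poll
        case .initializing, .finished:
            return .terminatePollLoop
        }
    }
}
```

Fixed configuration such as the poll interval, and non-state dependencies such as the logger, stay outside the enum, which keeps the transitions easy to reason about.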
Modifications:
* make `KafkaConsumer.shutdownGracefully()` `private`
* `KafkaConsumer`: create `librdkafka` wrapping methods in `KafkaClient` instead of using `KafkaClient.withKafkaHandlePointer`
* `KafkaConsumer`: rename `.killPollLoop` -> `.terminatePollLoop`
* `KafkaConsumer.StateMachine`: move `logger` out of `State`
* `KafkaConsumer`: move `pollInterval` out of `StateMachine`

Modifications:
* create new class `RDKafkaTopicPartitionList` wrapping a `rd_kafka_topic_partition_list_t`
    ShutdownOnTerminate
>
/// Time between two consecutive polls.
private var pollInterval: Duration
Why is this not part of the `KafkaConsumerConfiguration`?
Wanted to do that in a separate PR as this should also be done for `KafkaProducer`, but yeah, I can do it in this one already.
Separate PR is fine
Too late now 😅
import Crdkafka

/// Swift wrapper type for `rd_kafka_topic_partition_list_t`.
class RDKafkaTopicPartitionList {
- class RDKafkaTopicPartitionList {
+ final class RDKafkaTopicPartitionList {
Modifications:
* make `RDKafkaTopicPartitionList` a `final class`
* make `KafkaConsumer.messages` `AsyncSequence` return `KafkaConsumerMessage` instead of `Result<,>` type
* `KafkaConsumer` fails immediately when `rd_kafka_message_t` with error payload is received
* move `KafkaConsumer.pollInterval` to `KafkaConsumerConfiguration`
* move `KafkaProducer.pollInterval` to `KafkaProducerConfiguration`
* update `README`
Motivation

Our `KafkaConsumer` should expose a `run()` method that serves the consumer queue and polls `librdkafka` for new messages and any other queued callbacks. This aligns with the new implementation of `KafkaProducer`.

Additionally:
* `weak` reference to `self` in `KafkaConsumer`
* `DispatchQueue` in `KafkaConsumer`

Modifications

* add `KafkaConsumer.run()`
* write async wrapper method `KafkaClient.consumerPoll(timeout:)`
* replace `serialQueue` in `KafkaConsumer` with `StateMachine` that encapsulates all variables and can also be accessed from the `ConsumerMessagesAsyncSequence`
* close `KafkaConsumer` when `for await` loop of `AsyncSequence` is exited
* make `KafkaConsumer.shutdownGracefully` `public`
* `BackPressureStrategy` from `KafkaConsumerConfiguration`