Add ability to start reading from a custom offset in KafkaItemReader #737
Hi @callard71 ,

Thank you for this PR. Two tests are failing with this change set. The code in this PR initializes the partition offsets to an empty map (to rely on the offset stored in Kafka), which is a good point, but it removes the default initialization that makes the reader start from the beginning. We need to keep the default behaviour (read from the beginning) while offering the possibility to start from a given offset (a custom one or the one stored in Kafka). We can do this by providing a setter for the partition offsets:

/**
 * Setter for partition offsets. This mapping tells the reader the offset to start
 * reading from in each partition. This is optional, defaults to starting from
 * offset 0 in each partition. Passing an empty map makes the reader start
 * from the offset stored in Kafka for the consumer group ID.
 *
 * @param partitionOffsets mapping of starting offset in each partition
 */
public void setPartitionOffsets(Map<TopicPartition, Long> partitionOffsets) {
    this.partitionOffsets = partitionOffsets;
}

This behaviour is consistent with all readers that inherit from [...]. For consistency, I suggest making the Kafka item reader configurable in the same way, with the same default. The initialization code could be updated to something like this:

--this.partitionOffsets = new HashMap<>();
--for (TopicPartition topicPartition : this.topicPartitions) {
--    this.partitionOffsets.put(topicPartition, 0L);
--}
++if (this.partitionOffsets == null) {
++    this.partitionOffsets = new HashMap<>();
++    for (TopicPartition topicPartition : this.topicPartitions) {
++        this.partitionOffsets.put(topicPartition, 0L);
++    }
++}

This change is backward compatible and allows the reader to start from the beginning (the default), from a custom offset, or from the offset stored in Kafka.

What do you think? If you agree, please update the PR accordingly and it should be good to merge. Otherwise, please let me know and I can apply the change.
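To make the three cases concrete, here is a minimal, self-contained sketch of how the null check in the snippet above would resolve the starting offsets. It is a simplified model, not the actual Spring Batch code: the `StartingOffsets` class and its `resolve` helper are hypothetical, and partitions are represented as plain strings, whereas the real reader keys the map by `TopicPartition`.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified model of the proposed initialization, NOT the actual Spring Batch code.
// Partitions are plain strings here; the real reader keys the map by TopicPartition.
class StartingOffsets {

    // Hypothetical helper mirroring the null check in the snippet above.
    static Map<String, Long> resolve(List<String> partitions, Map<String, Long> configured) {
        if (configured == null) {
            // Nothing configured: default to offset 0 in every partition.
            Map<String, Long> defaults = new HashMap<>();
            for (String partition : partitions) {
                defaults.put(partition, 0L);
            }
            return defaults;
        }
        // Empty map: no explicit offsets, so the offset stored for the consumer group applies.
        // Non-empty map: the caller's custom offsets are used as given.
        return configured;
    }

    public static void main(String[] args) {
        List<String> partitions = List.of("orders-0", "orders-1");
        System.out.println(resolve(partitions, null));                    // default case
        System.out.println(resolve(partitions, Map.of()));                // rely on stored offsets
        System.out.println(resolve(partitions, Map.of("orders-0", 42L))); // custom offset
    }
}
```

The point of distinguishing `null` from an empty map is that existing users, who never call the setter, keep the current read-from-the-beginning behaviour.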
Hi @callard71 , Did you get a chance to review my previous comment? If you agree on the changes, I can take care of updating the code accordingly. Looking forward to your feedback.
Hi, It's a good catch. Yes, I agree. Since I no longer have my environment set up, I will let you make the small changes. Thanks again!
Hi @callard71 , Thank you for your feedback. I applied the changes as discussed in 15a393b.
Hi,
The problem is that the reader overrides the fetch offsets even when we don't want to rely on the saved state from the execution context.
With the initialization of the partition offsets removed, the reader relies on the offsets stored in the broker.
If the topic has not been read yet, the "auto.offset.reset" configuration now determines the behavior.
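As an illustration of the last point, the sketch below builds a set of consumer properties in which `auto.offset.reset` decides where reading starts when the consumer group has no committed offset. The class name, broker address, and group ID are placeholders chosen for this example, not values from the PR.

```java
import java.util.Properties;

// Illustrative consumer configuration; names and values are assumptions for this example.
class ConsumerConfigSketch {

    static Properties consumerProperties() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker address
        props.setProperty("group.id", "my-batch-job");            // placeholder consumer group ID
        // Used only when the group has no committed offset for a partition:
        // "earliest" starts from the beginning, "latest" from the end.
        props.setProperty("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        System.out.println(consumerProperties());
    }
}
```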