How to start the KafkaItemReader from the offset of the previous run? #3795

Closed
vishnu2497 opened this issue Oct 21, 2020 · 1 comment
Labels
status: duplicate Issues that are duplicates of other issues

Comments

@vishnu2497

Bug description
When KafkaItemReader reads data from Kafka, it always starts from offset 0. Even if the previously committed offset is 1000, the next run starts from 0 again.

Environment
Springboot-2.3.4.RELEASE
SpringBatch-2.3.4.RELEASE
Java: Amazon Corretto 11

Steps to reproduce
Configure the KafkaItemReader with the properties below and run the job twice:

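import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.batch.item.kafka.KafkaItemReader;
import org.springframework.batch.item.kafka.builder.KafkaItemReaderBuilder;
import org.springframework.kafka.support.serializer.JsonDeserializer;
import org.springframework.kafka.support.serializer.JsonSerializer;

KafkaItemReader<String, Member> memberKafkaItemReader() {
    // Consumer configuration: connection, group, and (de)serialization settings
    Properties props = new Properties();
    props.put(JsonSerializer.ADD_TYPE_INFO_HEADERS, false);
    props.putAll(this.memberProperties.buildConsumerProperties());
    props.put(ConsumerConfig.GROUP_ID_CONFIG, appProperties.MEMBER_GROUP_ID);
    props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, appProperties.BOOTSTRAP_SERVERS_CONFIG);
    props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
    props.put(JsonDeserializer.VALUE_DEFAULT_TYPE, "com.sample.consumer.test");
    props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");
    // Offset handling: fall back to the earliest offset and auto-commit every 500 ms
    props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
    props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "500");

    // Read partition 0 of the topic and save reader state in the execution context
    return new KafkaItemReaderBuilder<String, Member>()
            .partitions(0)
            .consumerProperties(props)
            .name("test-reader")
            .saveState(true)
            .topic(appProperties.MEMBER_CONSUME_TOPIC)
            .build();
}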

Expected behavior

On the first run, the reader should consume from offset 0.
On the second run, it should continue from the offset where the previous run ended.

Actual behavior
Every run starts from offset 0.

@vishnu2497 added the status: waiting-for-triage and type: bug labels on Oct 21, 2020
@fmbenhassine
Contributor

This was addressed in #737 and released in v4.3.0 (See KafkaItemReaderBuilder#partitionOffsets).
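
For readers landing here, a minimal sketch of how the reader above might be configured with KafkaItemReaderBuilder#partitionOffsets, assuming Spring Batch 4.3.0 or later. The class name MemberReaderConfig and the method parameters are hypothetical and only there to keep the fragment self-contained, and String is used as the value type instead of the reporter's Member class. As far as I read the 4.3 Javadoc, passing an empty map makes the reader start from the offset stored in Kafka for the consumer group, while leaving the option unset keeps the old default of offset 0; please verify this against the version you run.

```java
import java.util.HashMap;
import java.util.Properties;

import org.apache.kafka.common.TopicPartition;
import org.springframework.batch.item.kafka.KafkaItemReader;
import org.springframework.batch.item.kafka.builder.KafkaItemReaderBuilder;

// Hypothetical holder class; in the original report this would be a method
// in the application's batch configuration.
public class MemberReaderConfig {

    // props must already contain bootstrap.servers, group.id and the
    // key/value deserializers, as in the snippet from the report.
    public static KafkaItemReader<String, String> memberKafkaItemReader(Properties props, String topic) {
        return new KafkaItemReaderBuilder<String, String>()
                .name("test-reader")
                .topic(topic)
                .partitions(0)
                .consumerProperties(props)
                // Assumption (per the 4.3 Javadoc): an empty map tells the reader
                // not to seek to 0 and to resume from the offset committed for
                // the consumer group instead.
                .partitionOffsets(new HashMap<TopicPartition, Long>())
                .saveState(true)
                .build();
    }
}
```

To start from a specific position instead, the map can carry explicit entries, for example new TopicPartition(topic, 0) mapped to 1000L for the case described in this report.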

@fmbenhassine added the status: duplicate label and removed the status: waiting-for-triage and type: bug labels on Nov 5, 2020