Description
In what version(s) of Spring for Apache Kafka are you seeing this issue?
Tested on v3.0.8; previous versions are affected as well.
Describe the bug
We are using JpaTransactionManager in ConcurrentKafkaListenerContainerFactory to start new DB transactions in consumers.
We've noticed that spring-kafka commits the offset even when an error occurs during the commit of the DB transaction.
This behavior is unexpected because an error occurred and the retries were performed as well.
We therefore expect the offset not to be committed, just as when the consumer throws an exception.
The retries themselves work fine.
To Reproduce
Create a container factory and use JpaTransactionManager there:
ConcurrentKafkaListenerContainerFactory<Object, Object> factory =
        new ConcurrentKafkaListenerContainerFactory<>();
factory.getContainerProperties().setTransactionManager(jpaTransactionManager);
and a consumer:
// Propagate the transaction started by spring-kafka so that it can be rolled back here
@Transactional
@KafkaListener(topics = ROLLING_BACK_TRANSACTIONAL_TOPIC)
public void rollingBackTransactionalReceiver(SampleMessage message) {
    // The data will not be saved to the DB because the transaction is marked rollback-only below
    persist(message);
    TransactionAspectSupport.currentTransactionStatus().setRollbackOnly();
}
Expected behavior
The offset is not committed to the consumer group when an error occurs during the commit of the DB transaction.
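
To make the expected ordering concrete, here is a minimal plain-Java sketch of the contract we have in mind. It is only a toy model and does not reflect how spring-kafka is implemented: `OffsetCommitModel`, `DbTransaction`, `process`, and `complete` are hypothetical names invented for this illustration. The point is that a rollback detected at commit time should be handled like a listener exception, so the offset stays uncommitted.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Toy model of the expected behavior; hypothetical, not spring-kafka code. */
final class OffsetCommitModel {

    /** Hypothetical stand-in for a DB transaction that can be marked rollback-only. */
    static final class DbTransaction {
        private boolean rollbackOnly;

        void setRollbackOnly() {
            rollbackOnly = true;
        }

        /** Returns true if the transaction committed, false if it was rolled back. */
        boolean complete() {
            return !rollbackOnly;
        }
    }

    /** Offsets that were committed to the (simulated) consumer group. */
    final List<Long> committedOffsets = new ArrayList<>();

    /** Processes one record; returns true only if its offset was committed. */
    boolean process(long offset, Consumer<DbTransaction> listener) {
        DbTransaction tx = new DbTransaction();
        try {
            listener.accept(tx);
        } catch (RuntimeException e) {
            // Listener failure: the offset stays uncommitted so the record can be redelivered.
            return false;
        }
        if (!tx.complete()) {
            // Rollback at commit time: expected to be treated like a listener failure.
            return false;
        }
        committedOffsets.add(offset);
        return true;
    }
}
```

In this model, `process(1L, OffsetCommitModel.DbTransaction::setRollbackOnly)` returns `false` and leaves `committedOffsets` unchanged, which is the outcome we expect from the container in the reproduction above.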