diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java index ca889f70d4..30471a0015 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/KafkaMessageListenerContainer.java @@ -2447,7 +2447,7 @@ private void handleNack(final ConsumerRecords records, final ConsumerRecor Iterator> iterator2 = records.iterator(); while (iterator2.hasNext()) { ConsumerRecord next = iterator2.next(); - if (next.equals(record) || list.size() > 0) { + if (list.size() > 0 || recordsEqual(record, next)) { list.add(next); } } @@ -2455,6 +2455,12 @@ private void handleNack(final ConsumerRecords records, final ConsumerRecor pauseForNackSleep(); } + private boolean recordsEqual(ConsumerRecord rec1, ConsumerRecord rec2) { + return rec1.topic().equals(rec2.topic()) + && rec1.partition() == rec2.partition() + && rec1.offset() == rec2.offset(); + } + private void pauseForNackSleep() { if (this.nackSleep > 0) { this.nackWake = System.currentTimeMillis() + this.nackSleep; diff --git a/spring-kafka/src/main/java/org/springframework/kafka/listener/RecordInterceptor.java b/spring-kafka/src/main/java/org/springframework/kafka/listener/RecordInterceptor.java index 1fa2b91bc5..6e9a210543 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/listener/RecordInterceptor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/listener/RecordInterceptor.java @@ -37,7 +37,9 @@ public interface RecordInterceptor extends ThreadStateProcessor { /** * Perform some action on the record or return a different one. If null is returned - * the record will be skipped. Invoked before the listener. + * the record will be skipped. Invoked before the listener. IMPORTANT; if this method + * returns a different record, the topic, partition and offset must not be changed + * to avoid undesirable side-effects. * @param record the record. * @param consumer the consumer. * @return the record or null. diff --git a/spring-kafka/src/test/java/org/springframework/kafka/listener/ManualNackRecordTests.java b/spring-kafka/src/test/java/org/springframework/kafka/listener/ManualNackRecordTests.java index 688c8504da..9c23d2321d 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/listener/ManualNackRecordTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/listener/ManualNackRecordTests.java @@ -61,6 +61,7 @@ import org.springframework.kafka.listener.ContainerProperties.AckMode; import org.springframework.kafka.support.Acknowledgment; import org.springframework.kafka.test.utils.KafkaTestUtils; +import org.springframework.lang.Nullable; import org.springframework.test.annotation.DirtiesContext; import org.springframework.test.context.junit.jupiter.SpringJUnitConfig; @@ -243,6 +244,17 @@ public ConcurrentKafkaListenerContainerFactory kafkaListenerContainerFactory() { factory.setConsumerFactory(consumerFactory()); factory.getContainerProperties().setAckMode(AckMode.MANUAL); factory.getContainerProperties().setMissingTopicsFatal(false); + factory.setRecordInterceptor(new RecordInterceptor() { + + @Override + @Nullable + public ConsumerRecord intercept(ConsumerRecord record, Consumer consumer) { + return new ConsumerRecord(record.topic(), record.partition(), record.offset(), 0L, + TimestampType.NO_TIMESTAMP_TYPE, 0, 0, record.key(), record.value(), record.headers(), + Optional.empty()); + } + + }); return factory; }