diff --git a/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc b/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc index 67f2a48bc2..fe51ad526f 100644 --- a/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc +++ b/spring-kafka-docs/src/main/asciidoc/retrytopic.adoc @@ -148,6 +148,9 @@ public KafkaTemplate kafkaTemplate() { ---- ==== +IMPORTANT: Multiple `@KafkaListener` annotations can be used for the same topic with or without manual partition assignment along with non-blocking retries, but only one configuration will be used for a given topic. +It's best to use a single `RetryTopicConfiguration` bean for configuration of such topics; if multiple `@RetryableTopic` annotations are being used for the same topic, all of them should have the same values, otherwise one of them will be applied to all of that topic's listeners and the other annotations' values will be ignored. + ==== Features Most of the features are available both for the `@RetryableTopic` annotation and the `RetryTopicConfiguration` beans. diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopic.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopic.java index 5c5e943af3..4f7cceecdf 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopic.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopic.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2021 the original author or authors. + * Copyright 2018-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -219,6 +219,16 @@ public long delay() { return this.delayMs; } + /** + * Return the number of partitions the + * retry topics should be created with. + * @return the number of partitions. + * @since 2.7.13 + */ + public int numPartitions() { + return this.numPartitions; + } + @Nullable public Boolean autoStartDltHandler() { return this.autoStartDltHandler; diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/EndpointCustomizerFactory.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/EndpointCustomizerFactory.java index 1f39fc8cc4..4481ef5ca7 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/EndpointCustomizerFactory.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/EndpointCustomizerFactory.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2021 the original author or authors. + * Copyright 2018-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -20,6 +20,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.stream.Collectors; +import java.util.stream.Stream; import org.springframework.beans.factory.BeanFactory; import org.springframework.kafka.config.MethodKafkaListenerEndpoint; @@ -42,6 +43,8 @@ */ public class EndpointCustomizerFactory { + private static final int DEFAULT_PARTITION_FOR_MANUAL_ASSIGNMENT = 0; + private final DestinationTopic.Properties destinationProperties; private final EndpointHandlerMethod beanMethod; @@ -50,7 +53,7 @@ public class EndpointCustomizerFactory { private final RetryTopicNamesProviderFactory retryTopicNamesProviderFactory; - EndpointCustomizerFactory(DestinationTopic.Properties destinationProperties, EndpointHandlerMethod beanMethod, + public EndpointCustomizerFactory(DestinationTopic.Properties destinationProperties, EndpointHandlerMethod beanMethod, BeanFactory beanFactory, RetryTopicNamesProviderFactory retryTopicNamesProviderFactory) { this.destinationProperties = destinationProperties; @@ -71,7 +74,14 @@ protected EndpointCustomizer addSuffixesAndMethod(DestinationTopic.Properties pr Collection topics = customizeAndRegisterTopics(namesProvider, endpoint); endpoint.setId(namesProvider.getEndpointId(endpoint)); endpoint.setGroupId(namesProvider.getGroupId(endpoint)); - endpoint.setTopics(topics.stream().map(EndpointCustomizer.TopicNamesHolder::getCustomizedTopic).toArray(String[]::new)); + if (endpoint.getTopics().isEmpty() && endpoint.getTopicPartitionsToAssign() != null) { + endpoint.setTopicPartitions(getTopicPartitions(properties, namesProvider, + endpoint.getTopicPartitionsToAssign())); + } + else { + endpoint.setTopics(endpoint.getTopics().stream() + .map(namesProvider::getTopicName).toArray(String[]::new)); + } endpoint.setClientIdPrefix(namesProvider.getClientIdPrefix(endpoint)); endpoint.setGroup(namesProvider.getGroup(endpoint)); endpoint.setBean(bean); @@ -84,6 +94,29 @@ protected EndpointCustomizer addSuffixesAndMethod(DestinationTopic.Properties pr }; } + private static TopicPartitionOffset[] getTopicPartitions(DestinationTopic.Properties properties, + RetryTopicNamesProviderFactory.RetryTopicNamesProvider namesProvider, + TopicPartitionOffset[] topicPartitionOffsets) { + return Stream.of(topicPartitionOffsets) + .map(tpo -> properties.isMainEndpoint() + ? getTPOForMainTopic(namesProvider, tpo) + : getTPOForRetryTopics(properties, namesProvider, tpo)) + .toArray(TopicPartitionOffset[]::new); + } + + private static TopicPartitionOffset getTPOForRetryTopics(DestinationTopic.Properties properties, RetryTopicNamesProviderFactory.RetryTopicNamesProvider namesProvider, TopicPartitionOffset tpo) { + return new TopicPartitionOffset(namesProvider.getTopicName(tpo.getTopic()), + tpo.getPartition() <= properties.numPartitions() ? tpo.getPartition() : DEFAULT_PARTITION_FOR_MANUAL_ASSIGNMENT, + (Long) null); + } + + private static TopicPartitionOffset getTPOForMainTopic(RetryTopicNamesProviderFactory.RetryTopicNamesProvider namesProvider, TopicPartitionOffset tpo) { + TopicPartitionOffset newTpo = new TopicPartitionOffset(namesProvider.getTopicName(tpo.getTopic()), + tpo.getPartition(), tpo.getOffset(), tpo.getPosition()); + newTpo.setRelativeToCurrent(tpo.isRelativeToCurrent()); + return newTpo; + } + protected Collection customizeAndRegisterTopics( RetryTopicNamesProviderFactory.RetryTopicNamesProvider namesProvider, MethodKafkaListenerEndpoint endpoint) { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurer.java b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurer.java index fd147a9a00..2ed64acecd 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurer.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/retrytopic/RetryTopicConfigurer.java @@ -341,10 +341,14 @@ private Consumer> getTopicCreationFunction(RetryTopicConfigur } protected void createNewTopicBeans(Collection topics, RetryTopicConfiguration.TopicCreation config) { - topics.forEach(topic -> - ((DefaultListableBeanFactory) this.beanFactory) - .registerSingleton(topic + "-topicRegistrationBean", - new NewTopic(topic, config.getNumPartitions(), config.getReplicationFactor())) + topics.forEach(topic -> { + DefaultListableBeanFactory bf = ((DefaultListableBeanFactory) this.beanFactory); + String beanName = topic + "-topicRegistrationBean"; + if (!bf.containsBean(beanName)) { + bf.registerSingleton(beanName, + new NewTopic(topic, config.getNumPartitions(), config.getReplicationFactor())); + } + } ); } diff --git a/spring-kafka/src/main/java/org/springframework/kafka/support/TopicPartitionOffset.java b/spring-kafka/src/main/java/org/springframework/kafka/support/TopicPartitionOffset.java index 70885e44c6..478b83f195 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/support/TopicPartitionOffset.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/support/TopicPartitionOffset.java @@ -1,5 +1,5 @@ /* - * Copyright 2019-2020 the original author or authors. + * Copyright 2019-2022 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/EndpointCustomizerFactoryTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/EndpointCustomizerFactoryTests.java new file mode 100644 index 0000000000..7a8c93039d --- /dev/null +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/EndpointCustomizerFactoryTests.java @@ -0,0 +1,223 @@ +/* + * Copyright 2022 the original author or authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.springframework.kafka.retrytopic; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.BDDMockito.given; + +import java.lang.reflect.Method; +import java.util.Arrays; +import java.util.List; +import java.util.function.Predicate; + +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import org.springframework.beans.factory.BeanFactory; +import org.springframework.kafka.config.MethodKafkaListenerEndpoint; +import org.springframework.kafka.support.EndpointHandlerMethod; +import org.springframework.kafka.support.TopicPartitionOffset; + +/** + * @author Tomaz Fernandes + * @since 2.8.5 + */ +@ExtendWith(MockitoExtension.class) +class EndpointCustomizerFactoryTests { + + @Mock + private DestinationTopic.Properties properties; + + @Mock + private EndpointHandlerMethod beanMethod; + + @Mock + private BeanFactory beanFactory; + + @Mock + private RetryTopicNamesProviderFactory retryTopicNamesProviderFactory; + + @Mock + private MethodKafkaListenerEndpoint endpoint; + + private final String[] topics = {"myTopic1", "myTopic2"}; + + private final Method method = EndpointCustomizerFactory.class.getDeclaredMethods()[0]; + + @Test + void shouldNotCustomizeEndpointForMainTopicWithTopics() { + + given(beanMethod.resolveBean(this.beanFactory)).willReturn(method); + given(endpoint.getTopics()).willReturn(Arrays.asList(topics)); + given(properties.suffix()).willReturn(""); + RetryTopicNamesProviderFactory.RetryTopicNamesProvider provider = + new SuffixingRetryTopicNamesProviderFactory().createRetryTopicNamesProvider(properties); + given(retryTopicNamesProviderFactory.createRetryTopicNamesProvider(properties)).willReturn(provider); + + EndpointCustomizer endpointCustomizer = new EndpointCustomizerFactory(properties, beanMethod, + beanFactory, retryTopicNamesProviderFactory).createEndpointCustomizer(); + + List holders = + (List) endpointCustomizer.customizeEndpointAndCollectTopics(endpoint); + + assertThat(holders).hasSize(2).element(0) + .matches(assertMainTopic(0)); + assertThat(holders).element(1) + .matches(assertMainTopic(1)); + + } + + @Test + void shouldNotCustomizeEndpointForMainTopicWithTPO() { + + given(beanMethod.resolveBean(this.beanFactory)).willReturn(method); + given(properties.isMainEndpoint()).willReturn(true); + given(properties.suffix()).willReturn(""); + RetryTopicNamesProviderFactory.RetryTopicNamesProvider provider = + new SuffixingRetryTopicNamesProviderFactory().createRetryTopicNamesProvider(properties); + given(retryTopicNamesProviderFactory.createRetryTopicNamesProvider(properties)).willReturn(provider); + + String testString = "testString"; + MethodKafkaListenerEndpoint endpointTPO = new MethodKafkaListenerEndpoint<>(); + endpointTPO.setTopicPartitions(new TopicPartitionOffset(topics[0], 0, 0L), + new TopicPartitionOffset(topics[1], 1, 1L)); + endpointTPO.setMethod(this.method); + endpointTPO.setId(testString); + endpointTPO.setClientIdPrefix(testString); + endpointTPO.setGroup(testString); + + EndpointCustomizer endpointCustomizer = new EndpointCustomizerFactory(properties, beanMethod, + beanFactory, retryTopicNamesProviderFactory).createEndpointCustomizer(); + + List holders = + (List) endpointCustomizer.customizeEndpointAndCollectTopics(endpointTPO); + + assertThat(holders).hasSize(2).element(0) + .matches(assertMainTopic(0)); + assertThat(holders).element(1) + .matches(assertMainTopic(1)); + + assertThat(endpointTPO.getTopics()) + .isEmpty(); + + TopicPartitionOffset[] topicPartitionsToAssign = endpointTPO.getTopicPartitionsToAssign(); + assertThat(topicPartitionsToAssign).hasSize(2); + assertThat(equalsTopicPartitionOffset(topicPartitionsToAssign[0], + new TopicPartitionOffset(topics[0], 0, 0L))).isTrue(); + assertThat(equalsTopicPartitionOffset(topicPartitionsToAssign[1], + new TopicPartitionOffset(topics[1], 1, 1L))).isTrue(); + + } + + private Predicate assertMainTopic(int index) { + return holder -> holder.getCustomizedTopic().equals(topics[index]) + && holder.getMainTopic().equals(topics[index]); + } + + @Test + void shouldCustomizeEndpointForRetryTopic() { + + MethodKafkaListenerEndpoint endpoint = new MethodKafkaListenerEndpoint<>(); + String testString = "testString"; + endpoint.setTopics(this.topics); + endpoint.setMethod(this.method); + endpoint.setId(testString); + endpoint.setClientIdPrefix(testString); + endpoint.setGroup(testString); + + MethodKafkaListenerEndpoint endpointTPO = new MethodKafkaListenerEndpoint<>(); + endpointTPO.setTopicPartitions(new TopicPartitionOffset(topics[0], 0, 0L), + new TopicPartitionOffset(topics[1], 1, 1L)); + endpointTPO.setMethod(this.method); + endpointTPO.setId(testString); + endpointTPO.setClientIdPrefix(testString); + endpointTPO.setGroup(testString); + + String suffix = "-retry"; + given(beanMethod.resolveBean(this.beanFactory)).willReturn(method); + given(properties.isMainEndpoint()).willReturn(false); + given(properties.suffix()).willReturn(suffix); + given(properties.numPartitions()).willReturn(2); + + RetryTopicNamesProviderFactory.RetryTopicNamesProvider provider = + new SuffixingRetryTopicNamesProviderFactory().createRetryTopicNamesProvider(properties); + given(retryTopicNamesProviderFactory.createRetryTopicNamesProvider(properties)).willReturn(provider); + + EndpointCustomizer endpointCustomizer = new EndpointCustomizerFactory(properties, beanMethod, + beanFactory, retryTopicNamesProviderFactory).createEndpointCustomizer(); + + List holders = + (List) endpointCustomizer.customizeEndpointAndCollectTopics(endpoint); + + String topic1WithSuffix = topics[0] + suffix; + String topic2WithSuffix = topics[1] + suffix; + assertThat(holders).hasSize(2).element(0) + .matches(holder -> holder.getMainTopic().equals(topics[0]) + && holder.getCustomizedTopic().equals(topic1WithSuffix)); + assertThat(holders).hasSize(2).element(1) + .matches(holder -> holder.getMainTopic().equals(topics[1]) + && holder.getCustomizedTopic().equals(topic2WithSuffix)); + + String testStringSuffix = testString + suffix; + + assertThat(endpoint.getTopics()) + .contains(topic1WithSuffix, topic2WithSuffix); + assertThat(endpoint.getId()) + .isEqualTo(testStringSuffix); + assertThat(endpoint.getClientIdPrefix()) + .isEqualTo(testStringSuffix); + assertThat(endpoint.getGroup()) + .isEqualTo(testStringSuffix); + assertThat(endpoint.getTopicPartitionsToAssign()).isEmpty(); + + List holdersTPO = + (List) endpointCustomizer.customizeEndpointAndCollectTopics(endpointTPO); + + assertThat(holdersTPO).hasSize(2).element(0) + .matches(holder -> holder.getMainTopic().equals(topics[0]) + && holder.getCustomizedTopic().equals(topic1WithSuffix)); + assertThat(holdersTPO).hasSize(2).element(1) + .matches(holder -> holder.getMainTopic().equals(topics[1]) + && holder.getCustomizedTopic().equals(topic2WithSuffix)); + + assertThat(endpointTPO.getTopics()) + .isEmpty(); + + TopicPartitionOffset[] topicPartitionsToAssign = endpointTPO.getTopicPartitionsToAssign(); + assertThat(topicPartitionsToAssign).hasSize(2); + assertThat(equalsTopicPartitionOffset(topicPartitionsToAssign[0], + new TopicPartitionOffset(topic1WithSuffix, 0, (Long) null))).isTrue(); + assertThat(equalsTopicPartitionOffset(topicPartitionsToAssign[1], + new TopicPartitionOffset(topic2WithSuffix, 1, (Long) null))).isTrue(); + + assertThat(endpointTPO.getId()) + .isEqualTo(testStringSuffix); + assertThat(endpointTPO.getClientIdPrefix()) + .isEqualTo(testStringSuffix); + assertThat(endpointTPO.getGroup()) + .isEqualTo(testStringSuffix); + } + + private boolean equalsTopicPartitionOffset(TopicPartitionOffset tpo1, TopicPartitionOffset tpo2) { + return tpo1.getTopicPartition().equals(tpo2.getTopicPartition()) && + ((tpo1.getOffset() == null && tpo2.getOffset() == null) || + (tpo1.getOffset() != null && tpo1.getOffset().equals(tpo2.getOffset()))); + + } +} diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurerTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurerTests.java index b0b45db4fe..39872a5a3e 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurerTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurerTests.java @@ -24,7 +24,6 @@ import static org.mockito.BDDMockito.then; import static org.mockito.BDDMockito.willReturn; import static org.mockito.BDDMockito.willThrow; -import static org.mockito.Mockito.never; import static org.mockito.Mockito.times; import java.lang.reflect.Method; @@ -53,6 +52,7 @@ import org.springframework.kafka.config.MethodKafkaListenerEndpoint; import org.springframework.kafka.config.MultiMethodKafkaListenerEndpoint; import org.springframework.kafka.support.EndpointHandlerMethod; +import org.springframework.kafka.test.condition.LogLevels; import org.springframework.test.util.ReflectionTestUtils; /** @@ -119,7 +119,8 @@ class RetryTopicConfigurerTests { @Mock private ListenerContainerFactoryConfigurer.Configuration lcfcConfiguration; - private static final Object objectMessage = new Object(); + @Mock + private Object objectMessage; private static final List topics = Arrays.asList("topic1", "topic2"); @@ -356,13 +357,14 @@ void shouldInstantiateIfNotInContainer() { } + @LogLevels(classes = RetryTopicConfigurer.class, level = "info") @Test @SuppressWarnings("deprecation") void shouldLogConsumerRecordMessage() { RetryTopicConfigurer.LoggingDltListenerHandlerMethod method = new RetryTopicConfigurer.LoggingDltListenerHandlerMethod(); method.logMessage(consumerRecordMessage); - then(consumerRecordMessage).should(never()).topic(); + then(consumerRecordMessage).should().topic(); } @Test @@ -370,6 +372,7 @@ void shouldNotLogObjectMessage() { RetryTopicConfigurer.LoggingDltListenerHandlerMethod method = new RetryTopicConfigurer.LoggingDltListenerHandlerMethod(); method.logMessage(objectMessage); + then(objectMessage).shouldHaveNoInteractions(); } static class NoOpsClass { diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicIntegrationTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicIntegrationTests.java index 535ea85ae8..9931fd60a0 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicIntegrationTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicIntegrationTests.java @@ -45,7 +45,9 @@ import org.springframework.kafka.annotation.DltHandler; import org.springframework.kafka.annotation.EnableKafka; import org.springframework.kafka.annotation.KafkaListener; +import org.springframework.kafka.annotation.PartitionOffset; import org.springframework.kafka.annotation.RetryableTopic; +import org.springframework.kafka.annotation.TopicPartition; import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory; import org.springframework.kafka.core.ConsumerFactory; import org.springframework.kafka.core.DefaultKafkaConsumerFactory; @@ -81,7 +83,8 @@ @EmbeddedKafka(topics = { RetryTopicIntegrationTests.FIRST_TOPIC, RetryTopicIntegrationTests.SECOND_TOPIC, RetryTopicIntegrationTests.THIRD_TOPIC, - RetryTopicIntegrationTests.FOURTH_TOPIC }, partitions = 1) + RetryTopicIntegrationTests.FOURTH_TOPIC, + RetryTopicIntegrationTests.FIFTH_TOPIC }) @TestPropertySource(properties = "five.attempts=5") public class RetryTopicIntegrationTests { @@ -95,6 +98,8 @@ public class RetryTopicIntegrationTests { public final static String FOURTH_TOPIC = "myRetryTopic4"; + public final static String FIFTH_TOPIC = "myRetryTopic5"; + public final static String NOT_RETRYABLE_EXCEPTION_TOPIC = "noRetryTopic"; private final static String MAIN_TOPIC_CONTAINER_FACTORY = "kafkaListenerContainerFactory"; @@ -138,6 +143,17 @@ void shouldRetryFourthTopicWithNoDlt() { assertThat(awaitLatch(latchContainer.countDownLatch4)).isTrue(); } + @Test + void shouldRetryFifthTopicWithTwoListenersAndManualAssignment() { + logger.debug("Sending two messages to topic " + FIFTH_TOPIC); + kafkaTemplate.send(FIFTH_TOPIC, 0, "0", "Testing topic 5 - 0"); + kafkaTemplate.send(FIFTH_TOPIC, 1, "0", "Testing topic 5 - 1"); + assertThat(awaitLatch(latchContainer.countDownLatch51)).isTrue(); + assertThat(awaitLatch(latchContainer.countDownLatch52)).isTrue(); + assertThat(awaitLatch(latchContainer.countDownLatchDltThree)).isTrue(); + assertThat(awaitLatch(latchContainer.countDownLatchDltFour)).isTrue(); + } + @Test public void shouldGoStraightToDlt() { logger.debug("Sending message to topic " + NOT_RETRYABLE_EXCEPTION_TOPIC); @@ -166,7 +182,7 @@ static class FirstTopicListener { errorHandler = "myCustomErrorHandler", contentTypeConverter = "myCustomMessageConverter") public void listen(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) { logger.debug("Message {} received in topic {}", message, receivedTopic); - container.countDownLatch1.countDown(); + container.countDownLatch1.countDown(); throw new RuntimeException("Woooops... in topic " + receivedTopic); } } @@ -231,6 +247,57 @@ public void shouldNotGetHere() { } } + static class FifthTopicListener1 { + + @Autowired + CountDownLatchContainer container; + + @RetryableTopic(attempts = "4", + backoff = @Backoff(250), + numPartitions = "2", + kafkaTemplate = "kafkaTemplate") + @KafkaListener(id = "fifthTopicId1", topicPartitions = {@TopicPartition(topic = FIFTH_TOPIC, + partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0"))}, + containerFactory = MAIN_TOPIC_CONTAINER_FACTORY) + public void listenWithAnnotation(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) { + container.countDownIfNotKnown(receivedTopic, container.countDownLatch51); + logger.debug("Message {} received in annotated topic {} ", message, receivedTopic); + throw new RuntimeException("Annotated woooops... " + receivedTopic); + } + + @DltHandler + public void annotatedDltMethod(Object message) { + logger.debug("Received message in annotated Dlt method"); + container.countDownLatchDltThree.countDown(); + } + } + + static class FifthTopicListener2 { + + @Autowired + CountDownLatchContainer container; + + @RetryableTopic(attempts = "4", + backoff = @Backoff(250), + numPartitions = "2", + kafkaTemplate = "kafkaTemplate") + @KafkaListener(id = "fifthTopicId2", topicPartitions = {@TopicPartition(topic = FIFTH_TOPIC, + partitionOffsets = @PartitionOffset(partition = "1", initialOffset = "0"))}, + containerFactory = MAIN_TOPIC_CONTAINER_FACTORY) + public void listenWithAnnotation2(String message, @Header(KafkaHeaders.RECEIVED_TOPIC) String receivedTopic) { + container.countDownLatch52.countDown(); + logger.debug("Message {} received in annotated topic {} ", message, receivedTopic); + throw new RuntimeException("Annotated woooops... " + receivedTopic); + } + + @DltHandler + public void annotatedDltMethod(Object message) { + logger.debug("Received message in annotated Dlt method"); + container.countDownLatchDltFour.countDown(); + } + + } + @Component static class NoRetryTopicListener { @@ -261,19 +328,25 @@ static class CountDownLatchContainer { CountDownLatch countDownLatch2 = new CountDownLatch(3); CountDownLatch countDownLatch3 = new CountDownLatch(3); CountDownLatch countDownLatch4 = new CountDownLatch(4); + CountDownLatch countDownLatch51 = new CountDownLatch(4); + CountDownLatch countDownLatch52 = new CountDownLatch(3); CountDownLatch countDownLatchNoRetry = new CountDownLatch(1); CountDownLatch countDownLatchDltOne = new CountDownLatch(1); CountDownLatch countDownLatchDltTwo = new CountDownLatch(1); + CountDownLatch countDownLatchDltThree = new CountDownLatch(1); + CountDownLatch countDownLatchDltFour = new CountDownLatch(1); CountDownLatch customDltCountdownLatch = new CountDownLatch(1); CountDownLatch customErrorHandlerCountdownLatch = new CountDownLatch(6); CountDownLatch customMessageConverterCountdownLatch = new CountDownLatch(6); - List knownTopics = new ArrayList<>(); + final List knownTopics = new ArrayList<>(); private void countDownIfNotKnown(String receivedTopic, CountDownLatch countDownLatch) { - if (!knownTopics.contains(receivedTopic)) { - knownTopics.add(receivedTopic); - countDownLatch.countDown(); + synchronized (knownTopics) { + if (!knownTopics.contains(receivedTopic)) { + knownTopics.add(receivedTopic); + countDownLatch.countDown(); + } } } } @@ -380,6 +453,16 @@ public FourthTopicListener fourthTopicListener() { return new FourthTopicListener(); } + @Bean + public FifthTopicListener1 fifthTopicListener1() { + return new FifthTopicListener1(); + } + + @Bean + public FifthTopicListener2 fifthTopicListener2() { + return new FifthTopicListener2(); + } + @Bean public NoRetryTopicListener noRetryTopicListener() { return new NoRetryTopicListener();