From 304a1bf80ac321e5e74963fda1d651aca9e2e58a Mon Sep 17 00:00:00 2001 From: Wzy19930507 <1208931582@qq.com> Date: Fri, 8 Mar 2024 15:17:28 +0800 Subject: [PATCH 1/3] * `@RetryableTopic` support annotated on Class. * Support process `@RetryableTopic` from Class in `RetryTopicConfigurationProvider`. * Support process `@DltHandler` when `@RetryableTopic` annotated on the Class in `RetryableTopicAnnotationProcessor`. part of #3105 --- .../RetryTopicConfigurationProvider.java | 33 +++- .../kafka/annotation/RetryableTopic.java | 2 +- .../RetryableTopicAnnotationProcessor.java | 16 +- .../RetryTopicConfigurationProviderTests.java | 41 ++++- ...etryableTopicAnnotationProcessorTests.java | 142 +++++++++++++----- 5 files changed, 176 insertions(+), 58 deletions(-) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryTopicConfigurationProvider.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryTopicConfigurationProvider.java index 230c41c61d..63e603e4d9 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryTopicConfigurationProvider.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryTopicConfigurationProvider.java @@ -1,5 +1,5 @@ /* - * Copyright 2018-2023 the original author or authors. + * Copyright 2018-2024 the original author or authors. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,8 +16,10 @@ package org.springframework.kafka.annotation; +import java.lang.reflect.AnnotatedElement; import java.lang.reflect.Method; import java.util.Map; +import java.util.Objects; import org.apache.commons.logging.LogFactory; @@ -53,6 +55,8 @@ * * @author Tomaz Fernandes * @author Gary Russell + * @author Wang Zhiyang + * * @since 2.7 * @see org.springframework.kafka.retrytopic.RetryTopicConfigurer * @see RetryableTopic @@ -96,19 +100,34 @@ public RetryTopicConfigurationProvider(@Nullable BeanFactory beanFactory, @Nulla this.resolver = resolver; this.expressionContext = expressionContext; } + @Nullable public RetryTopicConfiguration findRetryConfigurationFor(String[] topics, Method method, Object bean) { - RetryableTopic annotation = MergedAnnotations.from(method, SearchStrategy.TYPE_HIERARCHY, - RepeatableContainers.none()) - .get(RetryableTopic.class) - .synthesize(MergedAnnotation::isPresent) - .orElse(null); + return findRetryConfigurationFor(topics, method, null, bean); + } + + @Nullable + public RetryTopicConfiguration findRetryConfigurationFor(String[] topics, @Nullable Method method, + @Nullable Class clazz, Object bean) { + + RetryableTopic annotation = getRetryableTopicAnnotationFromAnnotatedElement( + Objects.requireNonNullElse(method, clazz)); + Class declaringClass = method != null ? method.getDeclaringClass() : clazz; return annotation != null ? new RetryableTopicAnnotationProcessor(this.beanFactory, this.resolver, this.expressionContext) - .processAnnotation(topics, method, annotation, bean) + .processAnnotation(topics, declaringClass, annotation, bean) : maybeGetFromContext(topics); } + @Nullable + private RetryableTopic getRetryableTopicAnnotationFromAnnotatedElement(AnnotatedElement element) { + return MergedAnnotations.from(element, SearchStrategy.TYPE_HIERARCHY, + RepeatableContainers.none()) + .get(RetryableTopic.class) + .synthesize(MergedAnnotation::isPresent) + .orElse(null); + } + @Nullable private RetryTopicConfiguration maybeGetFromContext(String[] topics) { if (this.beanFactory == null || !ListableBeanFactory.class.isAssignableFrom(this.beanFactory.getClass())) { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopic.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopic.java index f66a61f6e5..b90c611acb 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopic.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopic.java @@ -47,7 +47,7 @@ * * @see org.springframework.kafka.retrytopic.RetryTopicConfigurer */ -@Target({ ElementType.METHOD, ElementType.ANNOTATION_TYPE }) +@Target({ ElementType.METHOD, ElementType.ANNOTATION_TYPE, ElementType.TYPE }) @Retention(RetentionPolicy.RUNTIME) @Documented public @interface RetryableTopic { diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopicAnnotationProcessor.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopicAnnotationProcessor.java index b19ed2962c..24b5888f39 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopicAnnotationProcessor.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopicAnnotationProcessor.java @@ -63,6 +63,8 @@ * @author Tomaz Fernandes * @author Gary Russell * @author Adrian Chlebosz + * @author Wang Zhiyang + * * @since 2.7 * */ @@ -115,6 +117,13 @@ public RetryableTopicAnnotationProcessor(@Nullable BeanFactory beanFactory, @Nul public RetryTopicConfiguration processAnnotation(String[] topics, Method method, RetryableTopic annotation, Object bean) { + Class clazz = method.getDeclaringClass(); + return processAnnotation(topics, clazz, annotation, bean); + } + + public RetryTopicConfiguration processAnnotation(String[] topics, Class clazz, RetryableTopic annotation, + Object bean) { + Long resolvedTimeout = resolveExpressionAsLong(annotation.timeout(), "timeout", false); long timeout = RetryTopicConstants.NOT_SET; if (resolvedTimeout != null) { @@ -140,7 +149,7 @@ public RetryTopicConfiguration processAnnotation(String[] topics, Method method, .customBackoff(createBackoffFromAnnotation(annotation.backoff(), this.beanFactory)) .retryTopicSuffix(resolveExpressionAsString(annotation.retryTopicSuffix(), "retryTopicSuffix")) .dltSuffix(resolveExpressionAsString(annotation.dltTopicSuffix(), "dltTopicSuffix")) - .dltHandlerMethod(getDltProcessor(method, bean)) + .dltHandlerMethod(getDltProcessor(clazz, bean)) .includeTopics(Arrays.asList(topics)) .listenerFactory(resolveExpressionAsString(annotation.listenerContainerFactory(), "listenerContainerFactory")) .autoCreateTopics(resolveExpressionAsBoolean(annotation.autoCreateTopics(), "autoCreateTopics"), @@ -218,9 +227,8 @@ private Map>> createDltRoutingSpecFromAnn .collect(Collectors.toMap(ExceptionBasedDltDestination::suffix, excBasedDestDlt -> Set.of(excBasedDestDlt.exceptions()))); } - private EndpointHandlerMethod getDltProcessor(Method listenerMethod, Object bean) { - Class declaringClass = listenerMethod.getDeclaringClass(); - return Arrays.stream(ReflectionUtils.getDeclaredMethods(declaringClass)) + private EndpointHandlerMethod getDltProcessor(Class clazz, Object bean) { + return Arrays.stream(ReflectionUtils.getDeclaredMethods(clazz)) .filter(method -> AnnotationUtils.findAnnotation(method, DltHandler.class) != null) .map(method -> RetryTopicConfigurer.createHandlerMethodWith(bean, method)) .findFirst() diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationProviderTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationProviderTests.java index 798946427f..d30e3859e2 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationProviderTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationProviderTests.java @@ -25,10 +25,8 @@ import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; -import java.lang.annotation.ElementType; import java.lang.annotation.Retention; import java.lang.annotation.RetentionPolicy; -import java.lang.annotation.Target; import java.lang.reflect.Method; import java.util.Collections; @@ -96,10 +94,13 @@ void shouldProvideFromAnnotation() { // given RetryTopicConfigurationProvider provider = new RetryTopicConfigurationProvider(beanFactory); RetryTopicConfiguration configuration = provider.findRetryConfigurationFor(topics, annotatedMethod, bean); + RetryTopicConfiguration configurationFromClass = provider + .findRetryConfigurationFor(topics, null, AnnotatedClass.class, bean); // then then(this.beanFactory).should(times(0)).getBeansOfType(RetryTopicConfiguration.class); - + assertThat(configuration).isNotNull(); + assertThat(configurationFromClass).isNotNull(); } @Test @@ -113,10 +114,13 @@ void shouldProvideFromBeanFactory() { // given RetryTopicConfigurationProvider provider = new RetryTopicConfigurationProvider(beanFactory); RetryTopicConfiguration configuration = provider.findRetryConfigurationFor(topics, nonAnnotatedMethod, bean); + RetryTopicConfiguration configurationFromClass = provider + .findRetryConfigurationFor(topics, null, NonAnnotatedClass.class, bean); // then - then(this.beanFactory).should(times(1)).getBeansOfType(RetryTopicConfiguration.class); + then(this.beanFactory).should(times(2)).getBeansOfType(RetryTopicConfiguration.class); assertThat(configuration).isEqualTo(retryTopicConfiguration); + assertThat(configurationFromClass).isEqualTo(retryTopicConfiguration); } @@ -131,10 +135,13 @@ void shouldFindNone() { // given RetryTopicConfigurationProvider provider = new RetryTopicConfigurationProvider(beanFactory); RetryTopicConfiguration configuration = provider.findRetryConfigurationFor(topics, nonAnnotatedMethod, bean); + RetryTopicConfiguration configurationFromClass = provider + .findRetryConfigurationFor(topics, null, NonAnnotatedClass.class, bean); // then - then(this.beanFactory).should(times(1)).getBeansOfType(RetryTopicConfiguration.class); + then(this.beanFactory).should(times(2)).getBeansOfType(RetryTopicConfiguration.class); assertThat(configuration).isNull(); + assertThat(configurationFromClass).isNull(); } @@ -147,10 +154,15 @@ void shouldProvideFromMetaAnnotation() { // given RetryTopicConfigurationProvider provider = new RetryTopicConfigurationProvider(beanFactory); RetryTopicConfiguration configuration = provider.findRetryConfigurationFor(topics, metaAnnotatedMethod, bean); + RetryTopicConfiguration configurationFromClass = provider + .findRetryConfigurationFor(topics, null, MetaAnnotatedClass.class, bean); // then then(this.beanFactory).should(times(0)).getBeansOfType(RetryTopicConfiguration.class); + assertThat(configuration).isNotNull(); assertThat(configuration.getConcurrency()).isEqualTo(3); + assertThat(configurationFromClass).isNotNull(); + assertThat(configurationFromClass.getConcurrency()).isEqualTo(3); } @@ -160,9 +172,12 @@ void shouldNotConfigureIfBeanFactoryNull() { // given RetryTopicConfigurationProvider provider = new RetryTopicConfigurationProvider(null); RetryTopicConfiguration configuration = provider.findRetryConfigurationFor(topics, nonAnnotatedMethod, bean); + RetryTopicConfiguration configurationFromClass + = provider.findRetryConfigurationFor(topics, null, NonAnnotatedClass.class, bean); // then assertThat(configuration).isNull(); + assertThat(configurationFromClass).isNull(); } @@ -175,7 +190,6 @@ public void nonAnnotatedMethod() { // NoOps } - @Target({ElementType.METHOD}) @Retention(RetentionPolicy.RUNTIME) @RetryableTopic @interface MetaAnnotatedRetryableTopic { @@ -187,4 +201,19 @@ public void nonAnnotatedMethod() { public void metaAnnotatedMethod() { // NoOps } + + @RetryableTopic + public static class AnnotatedClass { + // NoOps + } + + public static class NonAnnotatedClass { + // NoOps + } + + @MetaAnnotatedRetryableTopic + public static class MetaAnnotatedClass { + // NoOps + } + } diff --git a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryableTopicAnnotationProcessorTests.java b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryableTopicAnnotationProcessorTests.java index e4509713ed..6ab21b0ee2 100644 --- a/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryableTopicAnnotationProcessorTests.java +++ b/spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryableTopicAnnotationProcessorTests.java @@ -26,9 +26,13 @@ import java.lang.reflect.Method; import java.util.List; +import java.util.stream.Stream; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.MethodSource; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; @@ -56,7 +60,6 @@ * * @since 2.7 */ -@SuppressWarnings("deprecation") @ExtendWith(MockitoExtension.class) class RetryableTopicAnnotationProcessorTests { @@ -80,9 +83,7 @@ class RetryableTopicAnnotationProcessorTests { { this.beanFactory = mock(ConfigurableBeanFactory.class); - willAnswer(invoc -> { - return invoc.getArgument(0); - }).given(this.beanFactory).resolveEmbeddedValue(anyString()); + willAnswer(invoc -> invoc.getArgument(0)).given(this.beanFactory).resolveEmbeddedValue(anyString()); } // Retry with DLT @@ -120,9 +121,15 @@ private Object createBean() { } } + private static Stream paramsForRetryTopic() { + return Stream.of( + Arguments.of(true), + Arguments.of(false)); + } - @Test - void shouldGetDltHandlerMethod() { + @ParameterizedTest(name = "{index} shouldGetDltHandlerMethod is method {0}") + @MethodSource("paramsForRetryTopic") + void shouldGetDltHandlerMethod(boolean isMethod) { // setup given(beanFactory.getBean(RetryTopicBeanNames.DEFAULT_KAFKA_TEMPLATE_BEAN_NAME, KafkaOperations.class)) @@ -130,8 +137,8 @@ void shouldGetDltHandlerMethod() { RetryableTopicAnnotationProcessor processor = new RetryableTopicAnnotationProcessor(beanFactory); // given - RetryTopicConfiguration configuration = processor - .processAnnotation(topics, listenWithRetryAndDlt, annotationWithDlt, beanWithDlt); + RetryTopicConfiguration configuration = getRetryTopicConfiguration(processor, topics, listenWithRetryAndDlt, + RetryableTopicClassLevelAnnotationFactoryWithDlt.class, isMethod, annotationWithDlt, beanWithDlt); // then EndpointHandlerMethod dltHandlerMethod = configuration.getDltHandlerMethod(); @@ -142,15 +149,27 @@ void shouldGetDltHandlerMethod() { configuration.getDestinationTopicProperties().get(0)).isAlwaysRetryOnDltFailure()).isFalse(); } - @Test - void shouldGetLoggingDltHandlerMethod() { + private RetryTopicConfiguration getRetryTopicConfiguration(RetryableTopicAnnotationProcessor processor, + String[] topics, Method method, Class clazz, boolean isMethod, RetryableTopic annotation, Object bean) { + if (isMethod) { + return processor.processAnnotation(topics, method, annotation, bean); + } + else { + return processor.processAnnotation(topics, clazz, annotation, bean); + } + } + + @ParameterizedTest(name = "{index} shouldGetLoggingDltHandlerMethod is method {0}") + @MethodSource("paramsForRetryTopic") + void shouldGetLoggingDltHandlerMethod(boolean isMethod) { // setup given(beanFactory.getBean(kafkaTemplateName, KafkaOperations.class)).willReturn(kafkaOperationsFromTemplateName); RetryableTopicAnnotationProcessor processor = new RetryableTopicAnnotationProcessor(beanFactory); // given - RetryTopicConfiguration configuration = processor.processAnnotation(topics, listenWithRetry, annotation, bean); + RetryTopicConfiguration configuration = getRetryTopicConfiguration(processor, topics, listenWithRetry, + RetryableTopicClassLevelAnnotationFactory.class, isMethod, annotation, bean); // then EndpointHandlerMethod dltHandlerMethod = configuration.getDltHandlerMethod(); @@ -173,6 +192,9 @@ void shouldThrowIfProvidedKafkaTemplateNotFound() { // given - then assertThatExceptionOfType(BeanInitializationException.class) .isThrownBy(() -> processor.processAnnotation(topics, listenWithRetry, annotation, bean)); + assertThatExceptionOfType(BeanInitializationException.class) + .isThrownBy(() -> processor.processAnnotation(topics, RetryableTopicClassLevelAnnotationFactory.class, + annotation, bean)); } @Test @@ -193,10 +215,13 @@ void shouldThrowIfNoKafkaTemplateFound() { // given - then assertThatIllegalStateException().isThrownBy(() -> processor.processAnnotation(topics, listenWithRetryAndDlt, annotationWithDlt, beanWithDlt)); + assertThatIllegalStateException().isThrownBy(() -> processor.processAnnotation( + topics, RetryableTopicClassLevelAnnotationFactoryWithDlt.class, annotationWithDlt, beanWithDlt)); } - @Test - void shouldTrySpringBootDefaultKafkaTemplate() { + @ParameterizedTest(name = "{index} shouldTrySpringBootDefaultKafkaTemplate is method {0}") + @MethodSource("paramsForRetryTopic") + void shouldTrySpringBootDefaultKafkaTemplate(boolean isMethod) { // setup given(this.beanFactory.getBean(RetryTopicBeanNames.DEFAULT_KAFKA_TEMPLATE_BEAN_NAME, KafkaOperations.class)) @@ -209,15 +234,16 @@ void shouldTrySpringBootDefaultKafkaTemplate() { RetryableTopicAnnotationProcessor processor = new RetryableTopicAnnotationProcessor(beanFactory); // given - then - RetryTopicConfiguration configuration = processor.processAnnotation(topics, listenWithRetry, annotationWithDlt, - bean); + RetryTopicConfiguration configuration = getRetryTopicConfiguration(processor, topics, listenWithRetry, + RetryableTopicClassLevelAnnotationFactory.class, isMethod, annotationWithDlt, bean); DestinationTopic.Properties properties = configuration.getDestinationTopicProperties().get(0); DestinationTopic destinationTopic = new DestinationTopic("", properties); assertThat(destinationTopic.getKafkaOperations()).isEqualTo(kafkaOperationsFromDefaultName); } - @Test - void shouldGetKafkaTemplateFromBeanName() { + @ParameterizedTest(name = "{index} shouldGetKafkaTemplateFromBeanName is method {0}") + @MethodSource("paramsForRetryTopic") + void shouldGetKafkaTemplateFromBeanName(boolean isMethod) { // setup given(this.beanFactory.getBean(kafkaTemplateName, KafkaOperations.class)) @@ -225,15 +251,16 @@ void shouldGetKafkaTemplateFromBeanName() { RetryableTopicAnnotationProcessor processor = new RetryableTopicAnnotationProcessor(beanFactory); // given - then - RetryTopicConfiguration configuration = processor - .processAnnotation(topics, listenWithRetry, annotation, bean); + RetryTopicConfiguration configuration = getRetryTopicConfiguration(processor, topics, listenWithRetry, + RetryableTopicClassLevelAnnotationFactory.class, isMethod, annotation, bean); DestinationTopic.Properties properties = configuration.getDestinationTopicProperties().get(0); DestinationTopic destinationTopic = new DestinationTopic("", properties); assertThat(destinationTopic.getKafkaOperations()).isEqualTo(kafkaOperationsFromTemplateName); } - @Test - void shouldGetKafkaTemplateFromDefaultBeanName() { + @ParameterizedTest(name = "{index} shouldGetKafkaTemplateFromDefaultBeanName is method {0}") + @MethodSource("paramsForRetryTopic") + void shouldGetKafkaTemplateFromDefaultBeanName(boolean isMethod) { // setup given(this.beanFactory.getBean(RetryTopicBeanNames.DEFAULT_KAFKA_TEMPLATE_BEAN_NAME, KafkaOperations.class)) @@ -241,8 +268,8 @@ void shouldGetKafkaTemplateFromDefaultBeanName() { RetryableTopicAnnotationProcessor processor = new RetryableTopicAnnotationProcessor(beanFactory); // given - RetryTopicConfiguration configuration = processor - .processAnnotation(topics, listenWithRetryAndDlt, annotationWithDlt, beanWithDlt); + RetryTopicConfiguration configuration = getRetryTopicConfiguration(processor, topics, listenWithRetryAndDlt, + RetryableTopicClassLevelAnnotationFactoryWithDlt.class, isMethod, annotationWithDlt, beanWithDlt); // then DestinationTopic.Properties properties = configuration.getDestinationTopicProperties().get(0); @@ -250,8 +277,9 @@ void shouldGetKafkaTemplateFromDefaultBeanName() { assertThat(destinationTopic.getKafkaOperations()).isEqualTo(kafkaOperationsFromDefaultName); } - @Test - void shouldCreateExponentialBackoff() { + @ParameterizedTest(name = "{index} shouldCreateExponentialBackoff is method {0}") + @MethodSource("paramsForRetryTopic") + void shouldCreateExponentialBackoff(boolean isMethod) { // setup given(this.beanFactory.getBean(RetryTopicBeanNames.DEFAULT_KAFKA_TEMPLATE_BEAN_NAME, KafkaOperations.class)) @@ -259,8 +287,8 @@ void shouldCreateExponentialBackoff() { RetryableTopicAnnotationProcessor processor = new RetryableTopicAnnotationProcessor(beanFactory); // given - RetryTopicConfiguration configuration = processor - .processAnnotation(topics, listenWithRetryAndDlt, annotationWithDlt, beanWithDlt); + RetryTopicConfiguration configuration = getRetryTopicConfiguration(processor, topics, listenWithRetryAndDlt, + RetryableTopicClassLevelAnnotationFactoryWithDlt.class, isMethod, annotationWithDlt, beanWithDlt); // then List destinationTopicProperties = configuration.getDestinationTopicProperties(); @@ -277,8 +305,9 @@ void shouldCreateExponentialBackoff() { assertThat(destinationTopic.shouldRetryOn(1, new IllegalArgumentException())).isTrue(); } - @Test - void shouldSetAbort() { + @ParameterizedTest(name = "{index} shouldSetAbort is method {0}") + @MethodSource("paramsForRetryTopic") + void shouldSetAbort(boolean isMethod) { // setup given(this.beanFactory.getBean(RetryTopicBeanNames.DEFAULT_KAFKA_TEMPLATE_BEAN_NAME, KafkaOperations.class)) @@ -286,8 +315,8 @@ void shouldSetAbort() { RetryableTopicAnnotationProcessor processor = new RetryableTopicAnnotationProcessor(beanFactory); // given - RetryTopicConfiguration configuration = processor - .processAnnotation(topics, listenWithRetryAndDlt, annotationWithDlt, beanWithDlt); + RetryTopicConfiguration configuration = getRetryTopicConfiguration(processor, topics, listenWithRetryAndDlt, + RetryableTopicClassLevelAnnotationFactoryWithDlt.class, isMethod, annotationWithDlt, beanWithDlt); // then List destinationTopicProperties = configuration.getDestinationTopicProperties(); @@ -302,8 +331,9 @@ void shouldSetAbort() { } - @Test - void shouldCreateFixedBackoff() { + @ParameterizedTest(name = "{index} shouldCreateFixedBackoff is method {0}") + @MethodSource("paramsForRetryTopic") + void shouldCreateFixedBackoff(boolean isMethod) { // setup given(this.beanFactory.getBean(kafkaTemplateName, KafkaOperations.class)) @@ -311,8 +341,8 @@ void shouldCreateFixedBackoff() { RetryableTopicAnnotationProcessor processor = new RetryableTopicAnnotationProcessor(beanFactory); // given - RetryTopicConfiguration configuration = processor - .processAnnotation(topics, listenWithRetry, annotation, bean); + RetryTopicConfiguration configuration = getRetryTopicConfiguration(processor, topics, listenWithRetry, + RetryableTopicClassLevelAnnotationFactory.class, isMethod, annotation, bean); // then List destinationTopicProperties = configuration.getDestinationTopicProperties(); @@ -325,17 +355,18 @@ void shouldCreateFixedBackoff() { assertThat(destinationTopic3.getDestinationDelay()).isEqualTo(0); } - @Test - void shouldCreateExceptionBasedDltRoutingSpec() { + @ParameterizedTest(name = "{index} shouldCreateFixedBackoff is method {0}") + @MethodSource("paramsForRetryTopic") + void shouldCreateExceptionBasedDltRoutingSpec(boolean isMethod) { // setup given(this.beanFactory.getBean(RetryTopicBeanNames.DEFAULT_KAFKA_TEMPLATE_BEAN_NAME, KafkaOperations.class)) .willReturn(kafkaOperationsFromDefaultName); RetryableTopicAnnotationProcessor processor = new RetryableTopicAnnotationProcessor(beanFactory); // given - RetryTopicConfiguration configuration = processor - .processAnnotation( - topics, listenWithCustomDltRouting, annotationWithCustomDltRouting, beanWithCustomDltRouting); + RetryTopicConfiguration configuration = getRetryTopicConfiguration(processor, topics, + listenWithCustomDltRouting, RetryableTopicClassLevelAnnotationFactoryWithCustomDltRouting.class, + isMethod, annotationWithCustomDltRouting, beanWithCustomDltRouting); // then List destinationTopicProperties = configuration.getDestinationTopicProperties(); @@ -355,6 +386,12 @@ void listenWithRetry() { } } + @KafkaListener + @RetryableTopic(kafkaTemplate = RetryableTopicAnnotationProcessorTests.kafkaTemplateName) + static class RetryableTopicClassLevelAnnotationFactory { + + } + static class RetryableTopicAnnotationFactoryWithDlt { @KafkaListener @@ -370,6 +407,18 @@ void handleDlt() { } } + @KafkaListener + @RetryableTopic(attempts = "3", backoff = @Backoff(multiplier = 2, value = 1000), + dltStrategy = DltStrategy.FAIL_ON_ERROR, excludeNames = "java.lang.IllegalStateException") + static class RetryableTopicClassLevelAnnotationFactoryWithDlt { + + @DltHandler + void handleDlt() { + // NoOps + } + + } + static class RetryableTopicAnnotationFactoryWithCustomDltRouting { @KafkaListener @RetryableTopic( @@ -384,4 +433,17 @@ void listenWithRetry() { // NoOps } } + + @KafkaListener + @RetryableTopic( + attempts = "1", + exceptionBasedDltRouting = { + @ExceptionBasedDltDestination( + suffix = "-deserialization", exceptions = {DeserializationException.class} + ) + } + ) + static class RetryableTopicClassLevelAnnotationFactoryWithCustomDltRouting { + } + } From 5a462f758699432de420b03a78d6d2d5e16c77d1 Mon Sep 17 00:00:00 2001 From: Wzy19930507 <1208931582@qq.com> Date: Sat, 9 Mar 2024 02:00:43 +0800 Subject: [PATCH 2/3] add java doc --- .../kafka/annotation/RetryTopicConfigurationProvider.java | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryTopicConfigurationProvider.java b/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryTopicConfigurationProvider.java index 63e603e4d9..85cdc2f40a 100644 --- a/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryTopicConfigurationProvider.java +++ b/spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryTopicConfigurationProvider.java @@ -106,6 +106,14 @@ public RetryTopicConfiguration findRetryConfigurationFor(String[] topics, Method return findRetryConfigurationFor(topics, method, null, bean); } + /** + * Find retry topic configuration. + * @param topics the retryable topic list. + * @param method the method that gets @RetryableTopic annotation. + * @param clazz the class that gets @RetryableTopic annotation. + * @param bean the bean. + * @return the retry topic configuration. + */ @Nullable public RetryTopicConfiguration findRetryConfigurationFor(String[] topics, @Nullable Method method, @Nullable Class clazz, Object bean) { From 9c415bd39ab9be3029bd051135aa6f86a2c43ab4 Mon Sep 17 00:00:00 2001 From: Wzy19930507 <1208931582@qq.com> Date: Sat, 9 Mar 2024 02:37:36 +0800 Subject: [PATCH 3/3] * add adoc in what-new.adoc and retry-config.adoc --- .../ROOT/pages/retrytopic/retry-config.adoc | 25 ++++++++++++++++++- .../antora/modules/ROOT/pages/whats-new.adoc | 4 +++ 2 files changed, 28 insertions(+), 1 deletion(-) diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/retrytopic/retry-config.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/retrytopic/retry-config.adoc index 6394863aa5..a5e2b2efa7 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/retrytopic/retry-config.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/retrytopic/retry-config.adoc @@ -62,7 +62,7 @@ static @interface MetaAnnotatedRetryableTopic { You can also configure the non-blocking retry support by creating `RetryTopicConfiguration` beans in a `@Configuration` annotated class. -[source, java] +[source, java] ---- @Bean public RetryTopicConfiguration myRetryTopic(KafkaTemplate template) { @@ -197,3 +197,26 @@ You can return a specific partition number, or `null` to indicate that the `Kafk By default, all values of retry headers (number of attempts, timestamps) are retained when a record transitions through the retry topics. Starting with version 2.9.6, if you want to retain just the last value of these headers, use the `configureDeadLetterPublishingContainerFactory()` method shown above to set the factory's `retainAllRetryHeaderValues` property to `false`. +[[find-retry-topic-config]] +== Find RetryTopicConfiguration +Attempts to provide an instance of `RetryTopicConfiguration` by either creating one from a `@RetryableTopic` annotation, or from the bean container if no annotation is available. + +If beans are found in the container, there's a check to determine whether the provided topics should be handled by any of such instances. + +If `@RetryableTopic` annotation is provided, a `DltHandler` annotated method is looked up. + +since 3.2, provide new API to Create `RetryTopicConfiguration` when `@RetryableTopic` annotated on a class: + +[source, java] +---- +@Bean +public RetryTopicConfiguration myRetryTopic() { + RetryTopicConfigurationProvider provider = new RetryTopicConfigurationProvider(beanFactory); + return provider.findRetryConfigurationFor(topics, null, AnnotatedClass.class, bean); +} + +@RetryableTopic +public static class AnnotatedClass { + // NoOps +} +---- \ No newline at end of file diff --git a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc index 743b59c640..69c729a352 100644 --- a/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc +++ b/spring-kafka-docs/src/main/antora/modules/ROOT/pages/whats-new.adoc @@ -52,6 +52,10 @@ See xref:kafka/annotation-error-handling.adoc#after-rollback[After-rollback Proc Change `@RetryableTopic` property `SameIntervalTopicReuseStrategy` default value to `SINGLE_TOPIC`. See xref:retrytopic/topic-naming.adoc#single-topic-maxinterval-delay[Single Topic for maxInterval Exponential Delay]. +=== Support process @RetryableTopic on a class in RetryTopicConfigurationProvider. +Provides a new public API to find `RetryTopicConfiguration`. +See xref:retrytopic/retry-config.adoc#find-retry-topic-config[Find RetryTopicConfiguration] + [[x32-seek-offset-compute-fn]] === New API method to seek to an offset based on a user provided function `ConsumerCallback` provides a new API to seek to an offset based on a user-defined function, which takes the current offset in the consumer as an argument.