Skip to content

Commit f65b65c

Browse files
committed
* @RetryableTopic support annotated on Class.
* Support process `@RetryableTopic` from Class in `RetryTopicConfigurationProvider`. * Support process `@DltHandler` when `@RetryableTopic` annotated on the Class in `RetryableTopicAnnotationProcessor`. part of #3105
1 parent e3abc0e commit f65b65c

File tree

5 files changed

+180
-68
lines changed

5 files changed

+180
-68
lines changed

spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryTopicConfigurationProvider.java

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2023 the original author or authors.
2+
* Copyright 2018-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -16,8 +16,10 @@
1616

1717
package org.springframework.kafka.annotation;
1818

19+
import java.lang.reflect.AnnotatedElement;
1920
import java.lang.reflect.Method;
2021
import java.util.Map;
22+
import java.util.Objects;
2123

2224
import org.apache.commons.logging.LogFactory;
2325

@@ -53,6 +55,8 @@
5355
*
5456
* @author Tomaz Fernandes
5557
* @author Gary Russell
58+
* @author Wang Zhiyang
59+
*
5660
* @since 2.7
5761
* @see org.springframework.kafka.retrytopic.RetryTopicConfigurer
5862
* @see RetryableTopic
@@ -96,19 +100,34 @@ public RetryTopicConfigurationProvider(@Nullable BeanFactory beanFactory, @Nulla
96100
this.resolver = resolver;
97101
this.expressionContext = expressionContext;
98102
}
103+
99104
@Nullable
100105
public RetryTopicConfiguration findRetryConfigurationFor(String[] topics, Method method, Object bean) {
101-
RetryableTopic annotation = MergedAnnotations.from(method, SearchStrategy.TYPE_HIERARCHY,
102-
RepeatableContainers.none())
103-
.get(RetryableTopic.class)
104-
.synthesize(MergedAnnotation::isPresent)
105-
.orElse(null);
106+
return findRetryConfigurationFor(topics, method, null, bean);
107+
}
108+
109+
@Nullable
110+
public RetryTopicConfiguration findRetryConfigurationFor(String[] topics, @Nullable Method method,
111+
@Nullable Class<?> clazz, Object bean) {
112+
113+
RetryableTopic annotation = getRetryableTopicAnnotationFromAnnotatedElement(
114+
Objects.requireNonNullElse(method, clazz));
115+
Class<?> declaringClass = method != null ? method.getDeclaringClass() : clazz;
106116
return annotation != null
107117
? new RetryableTopicAnnotationProcessor(this.beanFactory, this.resolver, this.expressionContext)
108-
.processAnnotation(topics, method, annotation, bean)
118+
.processAnnotation(topics, declaringClass, annotation, bean)
109119
: maybeGetFromContext(topics);
110120
}
111121

122+
@Nullable
123+
private RetryableTopic getRetryableTopicAnnotationFromAnnotatedElement(AnnotatedElement element) {
124+
return MergedAnnotations.from(element, SearchStrategy.TYPE_HIERARCHY,
125+
RepeatableContainers.none())
126+
.get(RetryableTopic.class)
127+
.synthesize(MergedAnnotation::isPresent)
128+
.orElse(null);
129+
}
130+
112131
@Nullable
113132
private RetryTopicConfiguration maybeGetFromContext(String[] topics) {
114133
if (this.beanFactory == null || !ListableBeanFactory.class.isAssignableFrom(this.beanFactory.getClass())) {

spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopic.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,7 +47,7 @@
4747
*
4848
* @see org.springframework.kafka.retrytopic.RetryTopicConfigurer
4949
*/
50-
@Target({ ElementType.METHOD, ElementType.ANNOTATION_TYPE })
50+
@Target({ ElementType.METHOD, ElementType.ANNOTATION_TYPE, ElementType.TYPE })
5151
@Retention(RetentionPolicy.RUNTIME)
5252
@Documented
5353
public @interface RetryableTopic {

spring-kafka/src/main/java/org/springframework/kafka/annotation/RetryableTopicAnnotationProcessor.java

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,8 @@
6363
* @author Tomaz Fernandes
6464
* @author Gary Russell
6565
* @author Adrian Chlebosz
66+
* @author Wang Zhiyang
67+
*
6668
* @since 2.7
6769
*
6870
*/
@@ -115,6 +117,13 @@ public RetryableTopicAnnotationProcessor(@Nullable BeanFactory beanFactory, @Nul
115117
public RetryTopicConfiguration processAnnotation(String[] topics, Method method, RetryableTopic annotation,
116118
Object bean) {
117119

120+
Class<?> clazz = method.getDeclaringClass();
121+
return processAnnotation(topics, clazz, annotation, bean);
122+
}
123+
124+
public RetryTopicConfiguration processAnnotation(String[] topics, Class<?> clazz, RetryableTopic annotation,
125+
Object bean) {
126+
118127
Long resolvedTimeout = resolveExpressionAsLong(annotation.timeout(), "timeout", false);
119128
long timeout = RetryTopicConstants.NOT_SET;
120129
if (resolvedTimeout != null) {
@@ -140,7 +149,7 @@ public RetryTopicConfiguration processAnnotation(String[] topics, Method method,
140149
.customBackoff(createBackoffFromAnnotation(annotation.backoff(), this.beanFactory))
141150
.retryTopicSuffix(resolveExpressionAsString(annotation.retryTopicSuffix(), "retryTopicSuffix"))
142151
.dltSuffix(resolveExpressionAsString(annotation.dltTopicSuffix(), "dltTopicSuffix"))
143-
.dltHandlerMethod(getDltProcessor(method, bean))
152+
.dltHandlerMethod(getDltProcessor(clazz, bean))
144153
.includeTopics(Arrays.asList(topics))
145154
.listenerFactory(resolveExpressionAsString(annotation.listenerContainerFactory(), "listenerContainerFactory"))
146155
.autoCreateTopics(resolveExpressionAsBoolean(annotation.autoCreateTopics(), "autoCreateTopics"),
@@ -218,9 +227,8 @@ private Map<String, Set<Class<? extends Throwable>>> createDltRoutingSpecFromAnn
218227
.collect(Collectors.toMap(ExceptionBasedDltDestination::suffix, excBasedDestDlt -> Set.of(excBasedDestDlt.exceptions())));
219228
}
220229

221-
private EndpointHandlerMethod getDltProcessor(Method listenerMethod, Object bean) {
222-
Class<?> declaringClass = listenerMethod.getDeclaringClass();
223-
return Arrays.stream(ReflectionUtils.getDeclaredMethods(declaringClass))
230+
private EndpointHandlerMethod getDltProcessor(Class<?> clazz, Object bean) {
231+
return Arrays.stream(ReflectionUtils.getDeclaredMethods(clazz))
224232
.filter(method -> AnnotationUtils.findAnnotation(method, DltHandler.class) != null)
225233
.map(method -> RetryTopicConfigurer.createHandlerMethodWith(bean, method))
226234
.findFirst()

spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurationProviderTests.java

Lines changed: 39 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
/*
2-
* Copyright 2018-2023 the original author or authors.
2+
* Copyright 2018-2024 the original author or authors.
33
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
@@ -25,10 +25,8 @@
2525
import static org.mockito.Mockito.mock;
2626
import static org.mockito.Mockito.times;
2727

28-
import java.lang.annotation.ElementType;
2928
import java.lang.annotation.Retention;
3029
import java.lang.annotation.RetentionPolicy;
31-
import java.lang.annotation.Target;
3230
import java.lang.reflect.Method;
3331
import java.util.Collections;
3432

@@ -47,6 +45,8 @@
4745
* @author Tomaz Fernandes
4846
* @author Gary Russell
4947
* @author Fabio da Silva Jr.
48+
* @author Wang Zhiyang
49+
*
5050
* @since 2.7
5151
*/
5252
@ExtendWith(MockitoExtension.class)
@@ -56,9 +56,7 @@ class RetryTopicConfigurationProviderTests {
5656

5757
{
5858
this.beanFactory = mock(ConfigurableListableBeanFactory.class);
59-
willAnswer(invoc -> {
60-
return invoc.getArgument(0);
61-
}).given(this.beanFactory).resolveEmbeddedValue(anyString());
59+
willAnswer(invoc -> invoc.getArgument(0)).given(this.beanFactory).resolveEmbeddedValue(anyString());
6260
}
6361

6462
private final String[] topics = {"topic1", "topic2"};
@@ -81,18 +79,12 @@ private Method getAnnotatedMethod(String methodName) {
8179
@Mock
8280
Object bean;
8381

84-
@Mock
85-
RetryableTopic annotation;
86-
8782
@Mock
8883
KafkaOperations<?, ?> kafkaOperations;
8984

9085
@Mock
9186
RetryTopicConfiguration retryTopicConfiguration;
9287

93-
@Mock
94-
RetryTopicConfiguration retryTopicConfiguration2;
95-
9688
@Test
9789
void shouldProvideFromAnnotation() {
9890

@@ -102,10 +94,13 @@ void shouldProvideFromAnnotation() {
10294
// given
10395
RetryTopicConfigurationProvider provider = new RetryTopicConfigurationProvider(beanFactory);
10496
RetryTopicConfiguration configuration = provider.findRetryConfigurationFor(topics, annotatedMethod, bean);
97+
RetryTopicConfiguration configurationFromClass = provider
98+
.findRetryConfigurationFor(topics, null, AnnotatedClass.class, bean);
10599

106100
// then
107101
then(this.beanFactory).should(times(0)).getBeansOfType(RetryTopicConfiguration.class);
108-
102+
assertThat(configuration).isNotNull();
103+
assertThat(configurationFromClass).isNotNull();
109104
}
110105

111106
@Test
@@ -119,10 +114,13 @@ void shouldProvideFromBeanFactory() {
119114
// given
120115
RetryTopicConfigurationProvider provider = new RetryTopicConfigurationProvider(beanFactory);
121116
RetryTopicConfiguration configuration = provider.findRetryConfigurationFor(topics, nonAnnotatedMethod, bean);
117+
RetryTopicConfiguration configurationFromClass = provider
118+
.findRetryConfigurationFor(topics, null, NonAnnotatedClass.class, bean);
122119

123120
// then
124-
then(this.beanFactory).should(times(1)).getBeansOfType(RetryTopicConfiguration.class);
121+
then(this.beanFactory).should(times(2)).getBeansOfType(RetryTopicConfiguration.class);
125122
assertThat(configuration).isEqualTo(retryTopicConfiguration);
123+
assertThat(configurationFromClass).isEqualTo(retryTopicConfiguration);
126124

127125
}
128126

@@ -137,10 +135,13 @@ void shouldFindNone() {
137135
// given
138136
RetryTopicConfigurationProvider provider = new RetryTopicConfigurationProvider(beanFactory);
139137
RetryTopicConfiguration configuration = provider.findRetryConfigurationFor(topics, nonAnnotatedMethod, bean);
138+
RetryTopicConfiguration configurationFromClass = provider
139+
.findRetryConfigurationFor(topics, null, NonAnnotatedClass.class, bean);
140140

141141
// then
142-
then(this.beanFactory).should(times(1)).getBeansOfType(RetryTopicConfiguration.class);
142+
then(this.beanFactory).should(times(2)).getBeansOfType(RetryTopicConfiguration.class);
143143
assertThat(configuration).isNull();
144+
assertThat(configurationFromClass).isNull();
144145

145146
}
146147

@@ -153,10 +154,15 @@ void shouldProvideFromMetaAnnotation() {
153154
// given
154155
RetryTopicConfigurationProvider provider = new RetryTopicConfigurationProvider(beanFactory);
155156
RetryTopicConfiguration configuration = provider.findRetryConfigurationFor(topics, metaAnnotatedMethod, bean);
157+
RetryTopicConfiguration configurationFromClass = provider
158+
.findRetryConfigurationFor(topics, null, MetaAnnotatedClass.class, bean);
156159

157160
// then
158161
then(this.beanFactory).should(times(0)).getBeansOfType(RetryTopicConfiguration.class);
162+
assertThat(configuration).isNotNull();
159163
assertThat(configuration.getConcurrency()).isEqualTo(3);
164+
assertThat(configurationFromClass).isNotNull();
165+
assertThat(configurationFromClass.getConcurrency()).isEqualTo(3);
160166

161167
}
162168

@@ -166,9 +172,12 @@ void shouldNotConfigureIfBeanFactoryNull() {
166172
// given
167173
RetryTopicConfigurationProvider provider = new RetryTopicConfigurationProvider(null);
168174
RetryTopicConfiguration configuration = provider.findRetryConfigurationFor(topics, nonAnnotatedMethod, bean);
175+
RetryTopicConfiguration configurationFromClass
176+
= provider.findRetryConfigurationFor(topics, null, NonAnnotatedClass.class, bean);
169177

170178
// then
171179
assertThat(configuration).isNull();
180+
assertThat(configurationFromClass).isNull();
172181

173182
}
174183

@@ -181,7 +190,6 @@ public void nonAnnotatedMethod() {
181190
// NoOps
182191
}
183192

184-
@Target({ElementType.METHOD})
185193
@Retention(RetentionPolicy.RUNTIME)
186194
@RetryableTopic
187195
@interface MetaAnnotatedRetryableTopic {
@@ -193,4 +201,19 @@ public void nonAnnotatedMethod() {
193201
public void metaAnnotatedMethod() {
194202
// NoOps
195203
}
204+
205+
@RetryableTopic
206+
public static class AnnotatedClass {
207+
// NoOps
208+
}
209+
210+
public static class NonAnnotatedClass {
211+
// NoOps
212+
}
213+
214+
@MetaAnnotatedRetryableTopic
215+
public static class MetaAnnotatedClass {
216+
// NoOps
217+
}
218+
196219
}

0 commit comments

Comments
 (0)