Skip to content

@RetryableTopic support @KafkaListener annotated on class part 1 #3107

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
Mar 8, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ static @interface MetaAnnotatedRetryableTopic {

You can also configure the non-blocking retry support by creating `RetryTopicConfiguration` beans in a `@Configuration` annotated class.

[source, java]
[source, java]
----
@Bean
public RetryTopicConfiguration myRetryTopic(KafkaTemplate<String, Object> template) {
Expand Down Expand Up @@ -197,3 +197,26 @@ You can return a specific partition number, or `null` to indicate that the `Kafk
By default, all values of retry headers (number of attempts, timestamps) are retained when a record transitions through the retry topics.
Starting with version 2.9.6, if you want to retain just the last value of these headers, use the `configureDeadLetterPublishingContainerFactory()` method shown above to set the factory's `retainAllRetryHeaderValues` property to `false`.

[[find-retry-topic-config]]
== Find RetryTopicConfiguration
Attempts to provide an instance of `RetryTopicConfiguration` by either creating one from a `@RetryableTopic` annotation, or from the bean container if no annotation is available.

If beans are found in the container, there's a check to determine whether the provided topics should be handled by any of such instances.

If `@RetryableTopic` annotation is provided, a `DltHandler` annotated method is looked up.

since 3.2, provide new API to Create `RetryTopicConfiguration` when `@RetryableTopic` annotated on a class:

[source, java]
----
@Bean
public RetryTopicConfiguration myRetryTopic() {
RetryTopicConfigurationProvider provider = new RetryTopicConfigurationProvider(beanFactory);
return provider.findRetryConfigurationFor(topics, null, AnnotatedClass.class, bean);
}

@RetryableTopic
public static class AnnotatedClass {
// NoOps
}
----
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ See xref:kafka/annotation-error-handling.adoc#after-rollback[After-rollback Proc
Change `@RetryableTopic` property `SameIntervalTopicReuseStrategy` default value to `SINGLE_TOPIC`.
See xref:retrytopic/topic-naming.adoc#single-topic-maxinterval-delay[Single Topic for maxInterval Exponential Delay].

=== Support process @RetryableTopic on a class in RetryTopicConfigurationProvider.
Provides a new public API to find `RetryTopicConfiguration`.
See xref:retrytopic/retry-config.adoc#find-retry-topic-config[Find RetryTopicConfiguration]

[[x32-seek-offset-compute-fn]]
=== New API method to seek to an offset based on a user provided function
`ConsumerCallback` provides a new API to seek to an offset based on a user-defined function, which takes the current offset in the consumer as an argument.
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2018-2023 the original author or authors.
* Copyright 2018-2024 the original author or authors.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -16,8 +16,10 @@

package org.springframework.kafka.annotation;

import java.lang.reflect.AnnotatedElement;
import java.lang.reflect.Method;
import java.util.Map;
import java.util.Objects;

import org.apache.commons.logging.LogFactory;

Expand Down Expand Up @@ -53,6 +55,8 @@
*
* @author Tomaz Fernandes
* @author Gary Russell
* @author Wang Zhiyang
*
* @since 2.7
* @see org.springframework.kafka.retrytopic.RetryTopicConfigurer
* @see RetryableTopic
Expand Down Expand Up @@ -96,19 +100,42 @@ public RetryTopicConfigurationProvider(@Nullable BeanFactory beanFactory, @Nulla
this.resolver = resolver;
this.expressionContext = expressionContext;
}

@Nullable
public RetryTopicConfiguration findRetryConfigurationFor(String[] topics, Method method, Object bean) {
RetryableTopic annotation = MergedAnnotations.from(method, SearchStrategy.TYPE_HIERARCHY,
RepeatableContainers.none())
.get(RetryableTopic.class)
.synthesize(MergedAnnotation::isPresent)
.orElse(null);
return findRetryConfigurationFor(topics, method, null, bean);
}

/**
* Find retry topic configuration.
* @param topics the retryable topic list.
* @param method the method that gets @RetryableTopic annotation.
* @param clazz the class that gets @RetryableTopic annotation.
* @param bean the bean.
* @return the retry topic configuration.
*/
@Nullable
public RetryTopicConfiguration findRetryConfigurationFor(String[] topics, @Nullable Method method,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why does it need to be public? If it's valid, add the since tag and a note in whats-new as this is a new public API.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fix it.

@Nullable Class<?> clazz, Object bean) {

RetryableTopic annotation = getRetryableTopicAnnotationFromAnnotatedElement(
Objects.requireNonNullElse(method, clazz));
Class<?> declaringClass = method != null ? method.getDeclaringClass() : clazz;
return annotation != null
? new RetryableTopicAnnotationProcessor(this.beanFactory, this.resolver, this.expressionContext)
.processAnnotation(topics, method, annotation, bean)
.processAnnotation(topics, declaringClass, annotation, bean)
: maybeGetFromContext(topics);
}

@Nullable
private RetryableTopic getRetryableTopicAnnotationFromAnnotatedElement(AnnotatedElement element) {
return MergedAnnotations.from(element, SearchStrategy.TYPE_HIERARCHY,
RepeatableContainers.none())
.get(RetryableTopic.class)
.synthesize(MergedAnnotation::isPresent)
.orElse(null);
}

@Nullable
private RetryTopicConfiguration maybeGetFromContext(String[] topics) {
if (this.beanFactory == null || !ListableBeanFactory.class.isAssignableFrom(this.beanFactory.getClass())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@
*
* @see org.springframework.kafka.retrytopic.RetryTopicConfigurer
*/
@Target({ ElementType.METHOD, ElementType.ANNOTATION_TYPE })
@Target({ ElementType.METHOD, ElementType.ANNOTATION_TYPE, ElementType.TYPE })
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RetryableTopic {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,8 @@
* @author Tomaz Fernandes
* @author Gary Russell
* @author Adrian Chlebosz
* @author Wang Zhiyang
*
* @since 2.7
*
*/
Expand Down Expand Up @@ -115,6 +117,13 @@ public RetryableTopicAnnotationProcessor(@Nullable BeanFactory beanFactory, @Nul
public RetryTopicConfiguration processAnnotation(String[] topics, Method method, RetryableTopic annotation,
Object bean) {

Class<?> clazz = method.getDeclaringClass();
return processAnnotation(topics, clazz, annotation, bean);
}

public RetryTopicConfiguration processAnnotation(String[] topics, Class<?> clazz, RetryableTopic annotation,
Object bean) {

Long resolvedTimeout = resolveExpressionAsLong(annotation.timeout(), "timeout", false);
long timeout = RetryTopicConstants.NOT_SET;
if (resolvedTimeout != null) {
Expand All @@ -140,7 +149,7 @@ public RetryTopicConfiguration processAnnotation(String[] topics, Method method,
.customBackoff(createBackoffFromAnnotation(annotation.backoff(), this.beanFactory))
.retryTopicSuffix(resolveExpressionAsString(annotation.retryTopicSuffix(), "retryTopicSuffix"))
.dltSuffix(resolveExpressionAsString(annotation.dltTopicSuffix(), "dltTopicSuffix"))
.dltHandlerMethod(getDltProcessor(method, bean))
.dltHandlerMethod(getDltProcessor(clazz, bean))
.includeTopics(Arrays.asList(topics))
.listenerFactory(resolveExpressionAsString(annotation.listenerContainerFactory(), "listenerContainerFactory"))
.autoCreateTopics(resolveExpressionAsBoolean(annotation.autoCreateTopics(), "autoCreateTopics"),
Expand Down Expand Up @@ -218,9 +227,8 @@ private Map<String, Set<Class<? extends Throwable>>> createDltRoutingSpecFromAnn
.collect(Collectors.toMap(ExceptionBasedDltDestination::suffix, excBasedDestDlt -> Set.of(excBasedDestDlt.exceptions())));
}

private EndpointHandlerMethod getDltProcessor(Method listenerMethod, Object bean) {
Class<?> declaringClass = listenerMethod.getDeclaringClass();
return Arrays.stream(ReflectionUtils.getDeclaredMethods(declaringClass))
private EndpointHandlerMethod getDltProcessor(Class<?> clazz, Object bean) {
return Arrays.stream(ReflectionUtils.getDeclaredMethods(clazz))
.filter(method -> AnnotationUtils.findAnnotation(method, DltHandler.class) != null)
.map(method -> RetryTopicConfigurer.createHandlerMethodWith(bean, method))
.findFirst()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,8 @@
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;

import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.Collections;

Expand Down Expand Up @@ -96,10 +94,13 @@ void shouldProvideFromAnnotation() {
// given
RetryTopicConfigurationProvider provider = new RetryTopicConfigurationProvider(beanFactory);
RetryTopicConfiguration configuration = provider.findRetryConfigurationFor(topics, annotatedMethod, bean);
RetryTopicConfiguration configurationFromClass = provider
.findRetryConfigurationFor(topics, null, AnnotatedClass.class, bean);

// then
then(this.beanFactory).should(times(0)).getBeansOfType(RetryTopicConfiguration.class);

assertThat(configuration).isNotNull();
assertThat(configurationFromClass).isNotNull();
}

@Test
Expand All @@ -113,10 +114,13 @@ void shouldProvideFromBeanFactory() {
// given
RetryTopicConfigurationProvider provider = new RetryTopicConfigurationProvider(beanFactory);
RetryTopicConfiguration configuration = provider.findRetryConfigurationFor(topics, nonAnnotatedMethod, bean);
RetryTopicConfiguration configurationFromClass = provider
.findRetryConfigurationFor(topics, null, NonAnnotatedClass.class, bean);

// then
then(this.beanFactory).should(times(1)).getBeansOfType(RetryTopicConfiguration.class);
then(this.beanFactory).should(times(2)).getBeansOfType(RetryTopicConfiguration.class);
assertThat(configuration).isEqualTo(retryTopicConfiguration);
assertThat(configurationFromClass).isEqualTo(retryTopicConfiguration);

}

Expand All @@ -131,10 +135,13 @@ void shouldFindNone() {
// given
RetryTopicConfigurationProvider provider = new RetryTopicConfigurationProvider(beanFactory);
RetryTopicConfiguration configuration = provider.findRetryConfigurationFor(topics, nonAnnotatedMethod, bean);
RetryTopicConfiguration configurationFromClass = provider
.findRetryConfigurationFor(topics, null, NonAnnotatedClass.class, bean);

// then
then(this.beanFactory).should(times(1)).getBeansOfType(RetryTopicConfiguration.class);
then(this.beanFactory).should(times(2)).getBeansOfType(RetryTopicConfiguration.class);
assertThat(configuration).isNull();
assertThat(configurationFromClass).isNull();

}

Expand All @@ -147,10 +154,15 @@ void shouldProvideFromMetaAnnotation() {
// given
RetryTopicConfigurationProvider provider = new RetryTopicConfigurationProvider(beanFactory);
RetryTopicConfiguration configuration = provider.findRetryConfigurationFor(topics, metaAnnotatedMethod, bean);
RetryTopicConfiguration configurationFromClass = provider
.findRetryConfigurationFor(topics, null, MetaAnnotatedClass.class, bean);

// then
then(this.beanFactory).should(times(0)).getBeansOfType(RetryTopicConfiguration.class);
assertThat(configuration).isNotNull();
assertThat(configuration.getConcurrency()).isEqualTo(3);
assertThat(configurationFromClass).isNotNull();
assertThat(configurationFromClass.getConcurrency()).isEqualTo(3);

}

Expand All @@ -160,9 +172,12 @@ void shouldNotConfigureIfBeanFactoryNull() {
// given
RetryTopicConfigurationProvider provider = new RetryTopicConfigurationProvider(null);
RetryTopicConfiguration configuration = provider.findRetryConfigurationFor(topics, nonAnnotatedMethod, bean);
RetryTopicConfiguration configurationFromClass
= provider.findRetryConfigurationFor(topics, null, NonAnnotatedClass.class, bean);

// then
assertThat(configuration).isNull();
assertThat(configurationFromClass).isNull();

}

Expand All @@ -175,7 +190,6 @@ public void nonAnnotatedMethod() {
// NoOps
}

@Target({ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@RetryableTopic
@interface MetaAnnotatedRetryableTopic {
Expand All @@ -187,4 +201,19 @@ public void nonAnnotatedMethod() {
public void metaAnnotatedMethod() {
// NoOps
}

@RetryableTopic
public static class AnnotatedClass {
// NoOps
}

public static class NonAnnotatedClass {
// NoOps
}

@MetaAnnotatedRetryableTopic
public static class MetaAnnotatedClass {
// NoOps
}

}
Loading