Skip to content

Commit a85975d

Browse files
Address review comments
Make EndpointCustomizerFactory ctor public
1 parent a6a2399 commit a85975d

File tree

4 files changed

+11
-19
lines changed

4 files changed

+11
-19
lines changed

spring-kafka-docs/src/main/asciidoc/retrytopic.adoc

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -148,9 +148,8 @@ public KafkaTemplate<String, Object> kafkaTemplate() {
148148
----
149149
====
150150

151-
IMPORTANT: You can use multiple `@KafkaListener` annotations for the same topic with or without manual partition assignment along with non-blocking retries, but mind that only one configuration will be used for a given topic.
152-
It's best to use a single `RetryTopicConfiguration` bean for configuration of such topics;
153-
if you're using multiple `@RetryableTopic` annotations for the same topic, make sure all of them have the same values, otherwise one of them will be applied to all of that topic's listeners and the other annotations' values will be ignored.
151+
IMPORTANT: Multiple `@KafkaListener` annotations can be used for the same topic with or without manual partition assignment along with non-blocking retries, but only one configuration will be used for a given topic.
152+
It's best to use a single `RetryTopicConfiguration` bean for configuration of such topics; if multiple `@RetryableTopic` annotations are being used for the same topic, all of them should have the same values, otherwise one of them will be applied to all of that topic's listeners and the other annotations' values will be ignored.
154153

155154
==== Features
156155

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/DestinationTopic.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -220,9 +220,10 @@ public long delay() {
220220
}
221221

222222
/**
223-
* Returns the number of partitions the
223+
* Return the number of partitions the
224224
* retry topics should be created with.
225225
* @return the number of partitions.
226+
* @since 2.7.13
226227
*/
227228
public int numPartitions() {
228229
return this.numPartitions;

spring-kafka/src/main/java/org/springframework/kafka/retrytopic/EndpointCustomizerFactory.java

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@ public class EndpointCustomizerFactory {
5353

5454
private final RetryTopicNamesProviderFactory retryTopicNamesProviderFactory;
5555

56-
EndpointCustomizerFactory(DestinationTopic.Properties destinationProperties, EndpointHandlerMethod beanMethod,
56+
public EndpointCustomizerFactory(DestinationTopic.Properties destinationProperties, EndpointHandlerMethod beanMethod,
5757
BeanFactory beanFactory, RetryTopicNamesProviderFactory retryTopicNamesProviderFactory) {
5858

5959
this.destinationProperties = destinationProperties;
@@ -71,7 +71,7 @@ protected EndpointCustomizer addSuffixesAndMethod(DestinationTopic.Properties pr
7171
RetryTopicNamesProviderFactory.RetryTopicNamesProvider namesProvider =
7272
this.retryTopicNamesProviderFactory.createRetryTopicNamesProvider(properties);
7373
return endpoint -> {
74-
Collection<EndpointCustomizer.TopicNamesHolder> topics = getTopicNames(namesProvider, endpoint);
74+
Collection<EndpointCustomizer.TopicNamesHolder> topics = customizeAndRegisterTopics(namesProvider, endpoint);
7575
endpoint.setId(namesProvider.getEndpointId(endpoint));
7676
endpoint.setGroupId(namesProvider.getGroupId(endpoint));
7777
if (endpoint.getTopics().isEmpty() && endpoint.getTopicPartitionsToAssign() != null) {
@@ -92,7 +92,7 @@ protected EndpointCustomizer addSuffixesAndMethod(DestinationTopic.Properties pr
9292
};
9393
}
9494

95-
private TopicPartitionOffset[] getTopicPartitions(DestinationTopic.Properties properties,
95+
private static TopicPartitionOffset[] getTopicPartitions(DestinationTopic.Properties properties,
9696
RetryTopicNamesProviderFactory.RetryTopicNamesProvider namesProvider,
9797
TopicPartitionOffset[] topicPartitionOffsets) {
9898
return Stream.of(topicPartitionOffsets)
@@ -102,7 +102,7 @@ private TopicPartitionOffset[] getTopicPartitions(DestinationTopic.Properties pr
102102
.toArray(TopicPartitionOffset[]::new);
103103
}
104104

105-
protected Collection<EndpointCustomizer.TopicNamesHolder> getTopicNames(
105+
protected Collection<EndpointCustomizer.TopicNamesHolder> customizeAndRegisterTopics(
106106
RetryTopicNamesProviderFactory.RetryTopicNamesProvider namesProvider,
107107
MethodKafkaListenerEndpoint<?, ?> endpoint) {
108108

spring-kafka/src/test/java/org/springframework/kafka/retrytopic/RetryTopicConfigurerTests.java

Lines changed: 3 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -46,15 +46,14 @@
4646
import org.springframework.beans.factory.NoSuchBeanDefinitionException;
4747
import org.springframework.beans.factory.support.DefaultListableBeanFactory;
4848
import org.springframework.beans.factory.support.RootBeanDefinition;
49-
import org.springframework.core.log.LogAccessor;
5049
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
5150
import org.springframework.kafka.config.KafkaListenerContainerFactory;
5251
import org.springframework.kafka.config.KafkaListenerEndpointRegistrar;
5352
import org.springframework.kafka.config.MethodKafkaListenerEndpoint;
5453
import org.springframework.kafka.config.MultiMethodKafkaListenerEndpoint;
5554
import org.springframework.kafka.support.EndpointHandlerMethod;
55+
import org.springframework.kafka.test.condition.LogLevels;
5656
import org.springframework.test.util.ReflectionTestUtils;
57-
import org.springframework.util.Assert;
5857

5958
/**
6059
* @author Tomaz Fernandes
@@ -358,21 +357,14 @@ void shouldInstantiateIfNotInContainer() {
358357

359358
}
360359

360+
@LogLevels(classes = RetryTopicConfigurer.class, level = "info")
361361
@Test
362362
@SuppressWarnings("deprecation")
363363
void shouldLogConsumerRecordMessage() {
364364
RetryTopicConfigurer.LoggingDltListenerHandlerMethod method =
365365
new RetryTopicConfigurer.LoggingDltListenerHandlerMethod();
366366
method.logMessage(consumerRecordMessage);
367-
LogAccessor logger = (LogAccessor) ReflectionTestUtils
368-
.getField(RetryTopicConfigurer.class, "LOGGER");
369-
Assert.notNull(logger, "No LOGGER found in class " + RetryTopicConfigurer.class.getSimpleName());
370-
if (logger.isInfoEnabled()) {
371-
then(consumerRecordMessage).should().topic();
372-
}
373-
else {
374-
then(consumerRecordMessage).shouldHaveNoInteractions();
375-
}
367+
then(consumerRecordMessage).should().topic();
376368
}
377369

378370
@Test

0 commit comments

Comments
 (0)