Skip to content

Commit bc90d48

Browse files
committed
Merge pull request #16755 from garyrussell
* pr/16755: Polish "Auto-configure Kafka listener container with rebalance listener" Auto-configure Kafka listener container with rebalance listener
2 parents 0635d86 + 74208bb commit bc90d48

File tree

4 files changed

+45
-2
lines changed

4 files changed

+45
-2
lines changed

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.springframework.kafka.core.KafkaTemplate;
2626
import org.springframework.kafka.listener.AfterRollbackProcessor;
2727
import org.springframework.kafka.listener.BatchErrorHandler;
28+
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
2829
import org.springframework.kafka.listener.ContainerProperties;
2930
import org.springframework.kafka.listener.ErrorHandler;
3031
import org.springframework.kafka.support.converter.MessageConverter;
@@ -47,6 +48,8 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
4748

4849
private KafkaAwareTransactionManager<Object, Object> transactionManager;
4950

51+
private ConsumerAwareRebalanceListener rebalanceListener;
52+
5053
private ErrorHandler errorHandler;
5154

5255
private BatchErrorHandler batchErrorHandler;
@@ -86,6 +89,15 @@ void setTransactionManager(
8689
this.transactionManager = transactionManager;
8790
}
8891

92+
/**
93+
* Set the {@link ConsumerAwareRebalanceListener} to use.
94+
* @param rebalanceListener the rebalance listener.
95+
* @since 2.2
96+
*/
97+
void setRebalanceListener(ConsumerAwareRebalanceListener rebalanceListener) {
98+
this.rebalanceListener = rebalanceListener;
99+
}
100+
89101
/**
90102
* Set the {@link ErrorHandler} to use.
91103
* @param errorHandler the error handler
@@ -160,6 +172,7 @@ private void configureContainer(ContainerProperties container) {
160172
map.from(properties::getLogContainerConfig).to(container::setLogContainerConfig);
161173
map.from(properties::isMissingTopicsFatal).to(container::setMissingTopicsFatal);
162174
map.from(this.transactionManager).to(container::setTransactionManager);
175+
map.from(this.rebalanceListener).to(container::setConsumerRebalanceListener);
163176
}
164177

165178
}

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.springframework.kafka.core.KafkaTemplate;
3030
import org.springframework.kafka.listener.AfterRollbackProcessor;
3131
import org.springframework.kafka.listener.BatchErrorHandler;
32+
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
3233
import org.springframework.kafka.listener.ErrorHandler;
3334
import org.springframework.kafka.support.converter.BatchMessageConverter;
3435
import org.springframework.kafka.support.converter.BatchMessagingMessageConverter;
@@ -57,6 +58,8 @@ class KafkaAnnotationDrivenConfiguration {
5758

5859
private final KafkaAwareTransactionManager<Object, Object> transactionManager;
5960

61+
private final ConsumerAwareRebalanceListener rebalanceListener;
62+
6063
private final ErrorHandler errorHandler;
6164

6265
private final BatchErrorHandler batchErrorHandler;
@@ -68,6 +71,7 @@ class KafkaAnnotationDrivenConfiguration {
6871
ObjectProvider<BatchMessageConverter> batchMessageConverter,
6972
ObjectProvider<KafkaTemplate<Object, Object>> kafkaTemplate,
7073
ObjectProvider<KafkaAwareTransactionManager<Object, Object>> kafkaTransactionManager,
74+
ObjectProvider<ConsumerAwareRebalanceListener> rebalanceListener,
7175
ObjectProvider<ErrorHandler> errorHandler,
7276
ObjectProvider<BatchErrorHandler> batchErrorHandler,
7377
ObjectProvider<AfterRollbackProcessor<Object, Object>> afterRollbackProcessor) {
@@ -77,6 +81,7 @@ class KafkaAnnotationDrivenConfiguration {
7781
() -> new BatchMessagingMessageConverter(this.messageConverter));
7882
this.kafkaTemplate = kafkaTemplate.getIfUnique();
7983
this.transactionManager = kafkaTransactionManager.getIfUnique();
84+
this.rebalanceListener = rebalanceListener.getIfUnique();
8085
this.errorHandler = errorHandler.getIfUnique();
8186
this.batchErrorHandler = batchErrorHandler.getIfUnique();
8287
this.afterRollbackProcessor = afterRollbackProcessor.getIfUnique();
@@ -92,6 +97,7 @@ public ConcurrentKafkaListenerContainerFactoryConfigurer kafkaListenerContainerF
9297
configurer.setMessageConverter(messageConverterToUse);
9398
configurer.setReplyTemplate(this.kafkaTemplate);
9499
configurer.setTransactionManager(this.transactionManager);
100+
configurer.setRebalanceListener(this.rebalanceListener);
95101
configurer.setErrorHandler(this.errorHandler);
96102
configurer.setBatchErrorHandler(this.batchErrorHandler);
97103
configurer.setAfterRollbackProcessor(this.afterRollbackProcessor);

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import org.springframework.kafka.core.KafkaAdmin;
5656
import org.springframework.kafka.core.KafkaTemplate;
5757
import org.springframework.kafka.listener.AfterRollbackProcessor;
58+
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
5859
import org.springframework.kafka.listener.ContainerProperties;
5960
import org.springframework.kafka.listener.ContainerProperties.AckMode;
6061
import org.springframework.kafka.listener.SeekToCurrentBatchErrorHandler;
@@ -674,6 +675,18 @@ public void testConcurrentKafkaListenerContainerFactoryWithCustomAfterRollbackPr
674675
});
675676
}
676677

678+
@Test
679+
public void testConcurrentKafkaListenerContainerFactoryWithCustomRebalanceListener() {
680+
this.contextRunner.withUserConfiguration(RebalanceListenerConfiguration.class)
681+
.run((context) -> {
682+
ConcurrentKafkaListenerContainerFactory<?, ?> factory = context
683+
.getBean(ConcurrentKafkaListenerContainerFactory.class);
684+
assertThat(factory.getContainerProperties())
685+
.hasFieldOrPropertyWithValue("consumerRebalanceListener",
686+
context.getBean("rebalanceListener"));
687+
});
688+
}
689+
677690
@Test
678691
public void testConcurrentKafkaListenerContainerFactoryWithKafkaTemplate() {
679692
this.contextRunner.run((context) -> {
@@ -749,6 +762,16 @@ public AfterRollbackProcessor<Object, Object> afterRollbackProcessor() {
749762

750763
}
751764

765+
@Configuration(proxyBeanMethods = false)
766+
protected static class RebalanceListenerConfiguration {
767+
768+
@Bean
769+
public ConsumerAwareRebalanceListener rebalanceListener() {
770+
return mock(ConsumerAwareRebalanceListener.class);
771+
}
772+
773+
}
774+
752775
@Configuration(proxyBeanMethods = false)
753776
@EnableKafkaStreams
754777
protected static class EnableKafkaStreamsConfiguration {

spring-boot-project/spring-boot-docs/src/main/asciidoc/spring-boot-features.adoc

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6158,8 +6158,9 @@ The following component creates a listener endpoint on the `someTopic` topic:
61586158
----
61596159

61606160
If a `KafkaTransactionManager` bean is defined, it is automatically associated to the
6161-
container factory. Similarly, if a `ErrorHandler` or `AfterRollbackProcessor` bean is
6162-
defined, it is automatically associated to the default factory.
6161+
container factory. Similarly, if a `ErrorHandler`, `AfterRollbackProcessor` or
6162+
`ConsumerAwareRebalanceListener` bean is defined, it is automatically associated to the
6163+
default factory.
61636164

61646165
Depending on the listener type, a `RecordMessageConverter` or `BatchMessageConverter` bean
61656166
is associated to the default factory. If only a `RecordMessageConverter` bean is present

0 commit comments

Comments
 (0)