Skip to content

Commit abdc2e1

Browse files
garyrussellsnicoll
authored andcommitted
Auto-configure Kafka listener container with rebalance listener
This commit associates a `ConsumerAwareRebalanceListener` to the auto-configured listener container factory if a single instance is found in the context. See gh-16755
1 parent 0635d86 commit abdc2e1

File tree

3 files changed

+44
-1
lines changed

3 files changed

+44
-1
lines changed

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/ConcurrentKafkaListenerContainerFactoryConfigurer.java

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import org.springframework.kafka.core.KafkaTemplate;
2626
import org.springframework.kafka.listener.AfterRollbackProcessor;
2727
import org.springframework.kafka.listener.BatchErrorHandler;
28+
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
2829
import org.springframework.kafka.listener.ContainerProperties;
2930
import org.springframework.kafka.listener.ErrorHandler;
3031
import org.springframework.kafka.support.converter.MessageConverter;
@@ -53,6 +54,8 @@ public class ConcurrentKafkaListenerContainerFactoryConfigurer {
5354

5455
private AfterRollbackProcessor<Object, Object> afterRollbackProcessor;
5556

57+
private ConsumerAwareRebalanceListener rebalanceListener;
58+
5659
/**
5760
* Set the {@link KafkaProperties} to use.
5861
* @param properties the properties
@@ -111,6 +114,15 @@ void setAfterRollbackProcessor(
111114
this.afterRollbackProcessor = afterRollbackProcessor;
112115
}
113116

117+
/**
118+
* Set the {@link ConsumerAwareRebalanceListener} to use.
119+
* @param rebalanceListener the rebalance listener.
120+
* @since 2.2
121+
*/
122+
void setRebalanceListener(ConsumerAwareRebalanceListener rebalanceListener) {
123+
this.rebalanceListener = rebalanceListener;
124+
}
125+
114126
/**
115127
* Configure the specified Kafka listener container factory. The factory can be
116128
* further tuned and default settings can be overridden.
@@ -160,6 +172,7 @@ private void configureContainer(ContainerProperties container) {
160172
map.from(properties::getLogContainerConfig).to(container::setLogContainerConfig);
161173
map.from(properties::isMissingTopicsFatal).to(container::setMissingTopicsFatal);
162174
map.from(this.transactionManager).to(container::setTransactionManager);
175+
map.from(this.rebalanceListener).to(container::setConsumerRebalanceListener);
163176
}
164177

165178
}

spring-boot-project/spring-boot-autoconfigure/src/main/java/org/springframework/boot/autoconfigure/kafka/KafkaAnnotationDrivenConfiguration.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@
2929
import org.springframework.kafka.core.KafkaTemplate;
3030
import org.springframework.kafka.listener.AfterRollbackProcessor;
3131
import org.springframework.kafka.listener.BatchErrorHandler;
32+
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
3233
import org.springframework.kafka.listener.ErrorHandler;
3334
import org.springframework.kafka.support.converter.BatchMessageConverter;
3435
import org.springframework.kafka.support.converter.BatchMessagingMessageConverter;
@@ -63,14 +64,17 @@ class KafkaAnnotationDrivenConfiguration {
6364

6465
private final AfterRollbackProcessor<Object, Object> afterRollbackProcessor;
6566

67+
private final ConsumerAwareRebalanceListener rebalanceListener;
68+
6669
KafkaAnnotationDrivenConfiguration(KafkaProperties properties,
6770
ObjectProvider<RecordMessageConverter> messageConverter,
6871
ObjectProvider<BatchMessageConverter> batchMessageConverter,
6972
ObjectProvider<KafkaTemplate<Object, Object>> kafkaTemplate,
7073
ObjectProvider<KafkaAwareTransactionManager<Object, Object>> kafkaTransactionManager,
7174
ObjectProvider<ErrorHandler> errorHandler,
7275
ObjectProvider<BatchErrorHandler> batchErrorHandler,
73-
ObjectProvider<AfterRollbackProcessor<Object, Object>> afterRollbackProcessor) {
76+
ObjectProvider<AfterRollbackProcessor<Object, Object>> afterRollbackProcessor,
77+
ObjectProvider<ConsumerAwareRebalanceListener> rebalanceListener) {
7478
this.properties = properties;
7579
this.messageConverter = messageConverter.getIfUnique();
7680
this.batchMessageConverter = batchMessageConverter.getIfUnique(
@@ -80,6 +84,7 @@ class KafkaAnnotationDrivenConfiguration {
8084
this.errorHandler = errorHandler.getIfUnique();
8185
this.batchErrorHandler = batchErrorHandler.getIfUnique();
8286
this.afterRollbackProcessor = afterRollbackProcessor.getIfUnique();
87+
this.rebalanceListener = rebalanceListener.getIfUnique();
8388
}
8489

8590
@Bean
@@ -95,6 +100,7 @@ public ConcurrentKafkaListenerContainerFactoryConfigurer kafkaListenerContainerF
95100
configurer.setErrorHandler(this.errorHandler);
96101
configurer.setBatchErrorHandler(this.batchErrorHandler);
97102
configurer.setAfterRollbackProcessor(this.afterRollbackProcessor);
103+
configurer.setRebalanceListener(this.rebalanceListener);
98104
return configurer;
99105
}
100106

spring-boot-project/spring-boot-autoconfigure/src/test/java/org/springframework/boot/autoconfigure/kafka/KafkaAutoConfigurationTests.java

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import org.springframework.kafka.core.KafkaAdmin;
5656
import org.springframework.kafka.core.KafkaTemplate;
5757
import org.springframework.kafka.listener.AfterRollbackProcessor;
58+
import org.springframework.kafka.listener.ConsumerAwareRebalanceListener;
5859
import org.springframework.kafka.listener.ContainerProperties;
5960
import org.springframework.kafka.listener.ContainerProperties.AckMode;
6061
import org.springframework.kafka.listener.SeekToCurrentBatchErrorHandler;
@@ -674,6 +675,18 @@ public void testConcurrentKafkaListenerContainerFactoryWithCustomAfterRollbackPr
674675
});
675676
}
676677

678+
@Test
679+
public void testConcurrentKafkaListenerContainerFactoryWithCustomRebalanceListener() {
680+
this.contextRunner.withUserConfiguration(RebalanceListenerConfiguration.class)
681+
.run((context) -> {
682+
ConcurrentKafkaListenerContainerFactory<?, ?> factory = context
683+
.getBean(ConcurrentKafkaListenerContainerFactory.class);
684+
assertThat(factory.getContainerProperties())
685+
.hasFieldOrPropertyWithValue("consumerRebalanceListener",
686+
context.getBean("rebalanceListener"));
687+
});
688+
}
689+
677690
@Test
678691
public void testConcurrentKafkaListenerContainerFactoryWithKafkaTemplate() {
679692
this.contextRunner.run((context) -> {
@@ -749,6 +762,17 @@ public AfterRollbackProcessor<Object, Object> afterRollbackProcessor() {
749762

750763
}
751764

765+
@Configuration(proxyBeanMethods = false)
766+
protected static class RebalanceListenerConfiguration {
767+
768+
@Bean
769+
public ConsumerAwareRebalanceListener rebalanceListener() {
770+
return new ConsumerAwareRebalanceListener() {
771+
};
772+
}
773+
774+
}
775+
752776
@Configuration(proxyBeanMethods = false)
753777
@EnableKafkaStreams
754778
protected static class EnableKafkaStreamsConfiguration {

0 commit comments

Comments
 (0)