diff --git a/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/kafka/SpringwolfKafkaProducerConfiguration.java b/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/kafka/SpringwolfKafkaProducerConfiguration.java index 910f7b4e6..df9d2d366 100644 --- a/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/kafka/SpringwolfKafkaProducerConfiguration.java +++ b/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/kafka/SpringwolfKafkaProducerConfiguration.java @@ -5,7 +5,7 @@ import io.github.stavshamir.springwolf.asyncapi.controller.SpringwolfKafkaController; import io.github.stavshamir.springwolf.configuration.properties.SpringwolfKafkaConfigProperties; import io.github.stavshamir.springwolf.producer.SpringwolfKafkaProducer; -import io.github.stavshamir.springwolf.producer.SpringwolfKafkaTemplateFactory; +import io.github.stavshamir.springwolf.producer.SpringwolfKafkaTemplateFromProperties; import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean; import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; import org.springframework.context.annotation.Bean; @@ -30,14 +30,15 @@ public SpringwolfKafkaController springwolfKafkaController( @Bean @ConditionalOnMissingBean - public SpringwolfKafkaProducer springwolfKafkaProducer(SpringwolfKafkaTemplateFactory producerTemplateFactory) { - return new SpringwolfKafkaProducer(producerTemplateFactory.buildKafkaTemplate()); + public SpringwolfKafkaProducer springwolfKafkaProducer( + SpringwolfKafkaTemplateFromProperties springwolfKafkaTemplateFromProperties) { + return new SpringwolfKafkaProducer(springwolfKafkaTemplateFromProperties); } @Bean @ConditionalOnMissingBean - public SpringwolfKafkaTemplateFactory springwolfKafkaTemplateFactory( + public SpringwolfKafkaTemplateFromProperties springwolfKafkaTemplateFromProperties( SpringwolfKafkaConfigProperties springwolfKafkaConfigProperties) { - return new SpringwolfKafkaTemplateFactory(springwolfKafkaConfigProperties); + return new SpringwolfKafkaTemplateFromProperties(springwolfKafkaConfigProperties); } } diff --git a/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/producer/SpringwolfKafkaProducer.java b/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/producer/SpringwolfKafkaProducer.java index ac6299387..db7323505 100644 --- a/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/producer/SpringwolfKafkaProducer.java +++ b/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/producer/SpringwolfKafkaProducer.java @@ -20,13 +20,14 @@ @RequiredArgsConstructor public class SpringwolfKafkaProducer { - private final Optional> kafkaTemplate; + private final SpringwolfKafkaTemplateProvider kafkaTemplateProvider; public boolean isEnabled() { - return kafkaTemplate.isPresent(); + return kafkaTemplateProvider.isPresent(); } public void send(String topic, String key, Map headers, Object payload) { + Optional> kafkaTemplate = kafkaTemplateProvider.get(topic); if (kafkaTemplate.isPresent()) { kafkaTemplate .get() @@ -34,7 +35,7 @@ public void send(String topic, String key, Map headers, Object p .toCompletableFuture() .join(); } else { - log.warn("Kafka producer is not configured"); + log.warn("Kafka producer for topic %s is not configured".formatted(topic)); } } diff --git a/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/producer/SpringwolfKafkaTemplateFactory.java b/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/producer/SpringwolfKafkaTemplateFromProperties.java similarity index 72% rename from springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/producer/SpringwolfKafkaTemplateFactory.java rename to springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/producer/SpringwolfKafkaTemplateFromProperties.java index 98224d5af..e02e9a1fe 100644 --- a/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/producer/SpringwolfKafkaTemplateFactory.java +++ b/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/producer/SpringwolfKafkaTemplateFromProperties.java @@ -14,13 +14,11 @@ @RequiredArgsConstructor @ConditionalOnBean(value = SpringwolfKafkaProducerConfiguration.class) -public class SpringwolfKafkaTemplateFactory { +public class SpringwolfKafkaTemplateFromProperties implements SpringwolfKafkaTemplateProvider { - private final SpringwolfKafkaConfigProperties springWolfKafkaConfigProperties; - - public Optional> buildKafkaTemplate() { - Optional> kafkaTemplate = Optional.empty(); + private final Optional> kafkaTemplate; + public SpringwolfKafkaTemplateFromProperties(SpringwolfKafkaConfigProperties springWolfKafkaConfigProperties) { if (springWolfKafkaConfigProperties.getPublishing() != null && springWolfKafkaConfigProperties.getPublishing().getProducer() != null) { Map producerProperties = springWolfKafkaConfigProperties @@ -30,8 +28,18 @@ public Optional> buildKafkaTemplate() { DefaultKafkaProducerFactory producerFactory = new DefaultKafkaProducerFactory<>(producerProperties); kafkaTemplate = Optional.of(new KafkaTemplate<>(producerFactory)); + } else { + kafkaTemplate = Optional.empty(); } + } + + @Override + public boolean isPresent() { + return kafkaTemplate.isPresent(); + } + @Override + public Optional> get(String topic) { return kafkaTemplate; } } diff --git a/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/producer/SpringwolfKafkaTemplateProvider.java b/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/producer/SpringwolfKafkaTemplateProvider.java new file mode 100644 index 000000000..a6dbf624a --- /dev/null +++ b/springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/producer/SpringwolfKafkaTemplateProvider.java @@ -0,0 +1,20 @@ +// SPDX-License-Identifier: Apache-2.0 +package io.github.stavshamir.springwolf.producer; + +import org.springframework.kafka.core.KafkaTemplate; + +import java.util.Optional; + +public interface SpringwolfKafkaTemplateProvider { + /** + * Check if publishing in general is possible + * + * @return true if at least in one case a kafka template is present + */ + boolean isPresent(); + + /** + * Returns the matching kafka template for the topic + */ + Optional> get(String topic); +} diff --git a/springwolf-plugins/springwolf-kafka-plugin/src/test/java/io/github/stavshamir/springwolf/producer/SpringwolfKafkaProducerTest.java b/springwolf-plugins/springwolf-kafka-plugin/src/test/java/io/github/stavshamir/springwolf/producer/SpringwolfKafkaProducerTest.java index 80c7754d3..97f60f7c8 100644 --- a/springwolf-plugins/springwolf-kafka-plugin/src/test/java/io/github/stavshamir/springwolf/producer/SpringwolfKafkaProducerTest.java +++ b/springwolf-plugins/springwolf-kafka-plugin/src/test/java/io/github/stavshamir/springwolf/producer/SpringwolfKafkaProducerTest.java @@ -22,6 +22,7 @@ import java.util.concurrent.CompletableFuture; import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.ArgumentMatchers.any; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -32,19 +33,23 @@ class SpringwolfKafkaProducerTest { private SpringwolfKafkaProducer springwolfKafkaProducer; @Mock - KafkaTemplate kafkaTemplate; + private SpringwolfKafkaTemplateProvider kafkaTemplateProvider; + + @Mock + private KafkaTemplate kafkaTemplate; @Captor private ArgumentCaptor> recordArgumentCaptor; @BeforeEach void setUp() { - springwolfKafkaProducer = new SpringwolfKafkaProducer(Optional.of(kafkaTemplate)); + springwolfKafkaProducer = new SpringwolfKafkaProducer(kafkaTemplateProvider); } @Test void testSpringwolfKafkaProducerIsNotEnabledWhenThereIsNoKafkaTemplateConfigured() { - Optional> kafkaTemplateMock = Optional.empty(); + SpringwolfKafkaTemplateProvider kafkaTemplateMock = mock(); + when(kafkaTemplateMock.isPresent()).thenReturn(false); springwolfKafkaProducer = new SpringwolfKafkaProducer(kafkaTemplateMock); @@ -54,6 +59,9 @@ void testSpringwolfKafkaProducerIsNotEnabledWhenThereIsNoKafkaTemplateConfigured @Test @SuppressWarnings("unchecked") void testSendingKafkaMessageWithoutHeaders() { + // given + when(kafkaTemplateProvider.get(any())).thenReturn(Optional.of(kafkaTemplate)); + CompletableFuture> future = new CompletableFuture<>(); when(kafkaTemplate.send(ArgumentMatchers.>any())) .thenReturn(future); @@ -61,8 +69,10 @@ void testSendingKafkaMessageWithoutHeaders() { Map payload = Collections.singletonMap("some", "field"); + // when springwolfKafkaProducer.send("test-topic", null, null, payload); + // then verify(kafkaTemplate).send(recordArgumentCaptor.capture()); ProducerRecord capturedRecord = recordArgumentCaptor.getValue(); @@ -78,6 +88,9 @@ void testSendingKafkaMessageWithoutHeaders() { @Test @SuppressWarnings("unchecked") void testSendingKafkaMessageWithHeaders() { + // given + when(kafkaTemplateProvider.get(any())).thenReturn(Optional.of(kafkaTemplate)); + CompletableFuture> future = new CompletableFuture<>(); when(kafkaTemplate.send(ArgumentMatchers.>any())) .thenReturn(future); @@ -86,8 +99,10 @@ void testSendingKafkaMessageWithHeaders() { Map payload = Collections.singletonMap("some", "field"); Map headers = Collections.singletonMap("header-key", "header"); + // when springwolfKafkaProducer.send("test-topic", null, headers, payload); + // then verify(kafkaTemplate).send(recordArgumentCaptor.capture()); ProducerRecord capturedRecord = recordArgumentCaptor.getValue(); diff --git a/springwolf-plugins/springwolf-kafka-plugin/src/test/java/io/github/stavshamir/springwolf/producer/SpringwolfKafkaTemplateFactoryTest.java b/springwolf-plugins/springwolf-kafka-plugin/src/test/java/io/github/stavshamir/springwolf/producer/SpringwolfKafkaTemplateFromPropertiesTest.java similarity index 59% rename from springwolf-plugins/springwolf-kafka-plugin/src/test/java/io/github/stavshamir/springwolf/producer/SpringwolfKafkaTemplateFactoryTest.java rename to springwolf-plugins/springwolf-kafka-plugin/src/test/java/io/github/stavshamir/springwolf/producer/SpringwolfKafkaTemplateFromPropertiesTest.java index 374636681..47678c009 100644 --- a/springwolf-plugins/springwolf-kafka-plugin/src/test/java/io/github/stavshamir/springwolf/producer/SpringwolfKafkaTemplateFactoryTest.java +++ b/springwolf-plugins/springwolf-kafka-plugin/src/test/java/io/github/stavshamir/springwolf/producer/SpringwolfKafkaTemplateFromPropertiesTest.java @@ -5,40 +5,45 @@ import org.junit.jupiter.api.Test; import org.springframework.boot.autoconfigure.kafka.KafkaProperties; import org.springframework.boot.ssl.DefaultSslBundleRegistry; -import org.springframework.kafka.core.KafkaTemplate; import java.util.Map; -import java.util.Optional; import static org.assertj.core.api.Assertions.assertThat; -class SpringwolfKafkaTemplateFactoryTest { +class SpringwolfKafkaTemplateFromPropertiesTest { @Test void kafkaTemplateShouldNotBeCreatedForEmptyProperties() { + // given SpringwolfKafkaConfigProperties configProperties = new SpringwolfKafkaConfigProperties(); - SpringwolfKafkaTemplateFactory templateFactory = new SpringwolfKafkaTemplateFactory(configProperties); + // when + SpringwolfKafkaTemplateFromProperties kafkaTemplateProvider = + new SpringwolfKafkaTemplateFromProperties(configProperties); - Optional> kafkaTemplate = templateFactory.buildKafkaTemplate(); - - assertThat(kafkaTemplate).isNotPresent(); + // then + assertThat(kafkaTemplateProvider.isPresent()).isFalse(); + assertThat(kafkaTemplateProvider.get("topic")).isNotPresent(); } @Test void kafkaTemplateShouldNotBeCreatedForEmptyProducerConfiguration() { + // given SpringwolfKafkaConfigProperties configProperties = new SpringwolfKafkaConfigProperties(); configProperties.setPublishing(new SpringwolfKafkaConfigProperties.Publishing()); - SpringwolfKafkaTemplateFactory templateFactory = new SpringwolfKafkaTemplateFactory(configProperties); - - Optional> kafkaTemplate = templateFactory.buildKafkaTemplate(); + // when + SpringwolfKafkaTemplateFromProperties kafkaTemplateProvider = + new SpringwolfKafkaTemplateFromProperties(configProperties); - assertThat(kafkaTemplate).isNotPresent(); + // then + assertThat(kafkaTemplateProvider.isPresent()).isFalse(); + assertThat(kafkaTemplateProvider.get("topic")).isNotPresent(); } @Test void kafkaTemplateShouldBeCreatedWithProducerConfiguration() { + // given SpringwolfKafkaConfigProperties configProperties = new SpringwolfKafkaConfigProperties(); SpringwolfKafkaConfigProperties.Publishing publishing = new SpringwolfKafkaConfigProperties.Publishing(); publishing.setEnabled(true); @@ -46,14 +51,15 @@ void kafkaTemplateShouldBeCreatedWithProducerConfiguration() { configProperties.setPublishing(publishing); - SpringwolfKafkaTemplateFactory templateFactory = new SpringwolfKafkaTemplateFactory(configProperties); + // when + SpringwolfKafkaTemplateFromProperties kafkaTemplateProvider = + new SpringwolfKafkaTemplateFromProperties(configProperties); - Optional> kafkaTemplate = templateFactory.buildKafkaTemplate(); + // then + assertThat(kafkaTemplateProvider.isPresent()).isTrue(); - assertThat(kafkaTemplate).isPresent(); Map configurationProperties = - kafkaTemplate.get().getProducerFactory().getConfigurationProperties(); - + kafkaTemplateProvider.get("topic").get().getProducerFactory().getConfigurationProperties(); assertThat(configurationProperties) .isEqualTo(new KafkaProperties.Producer().buildProperties(new DefaultSslBundleRegistry())); }