Skip to content

Commit 0c9a175

Browse files
committed
feat(kafka): prepare to use different kafka templates per topic during publishing
1 parent 95d63a9 commit 0c9a175

File tree

6 files changed

+83
-32
lines changed

6 files changed

+83
-32
lines changed

springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/asyncapi/kafka/SpringwolfKafkaProducerConfiguration.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@
55
import io.github.stavshamir.springwolf.asyncapi.controller.SpringwolfKafkaController;
66
import io.github.stavshamir.springwolf.configuration.properties.SpringwolfKafkaConfigProperties;
77
import io.github.stavshamir.springwolf.producer.SpringwolfKafkaProducer;
8-
import io.github.stavshamir.springwolf.producer.SpringwolfKafkaTemplateFactory;
8+
import io.github.stavshamir.springwolf.producer.SpringwolfKafkaTemplateFromProperties;
99
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
1010
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
1111
import org.springframework.context.annotation.Bean;
@@ -30,14 +30,15 @@ public SpringwolfKafkaController springwolfKafkaController(
3030

3131
@Bean
3232
@ConditionalOnMissingBean
33-
public SpringwolfKafkaProducer springwolfKafkaProducer(SpringwolfKafkaTemplateFactory producerTemplateFactory) {
34-
return new SpringwolfKafkaProducer(producerTemplateFactory.buildKafkaTemplate());
33+
public SpringwolfKafkaProducer springwolfKafkaProducer(
34+
SpringwolfKafkaTemplateFromProperties springwolfKafkaTemplateFromProperties) {
35+
return new SpringwolfKafkaProducer(springwolfKafkaTemplateFromProperties);
3536
}
3637

3738
@Bean
3839
@ConditionalOnMissingBean
39-
public SpringwolfKafkaTemplateFactory springwolfKafkaTemplateFactory(
40+
public SpringwolfKafkaTemplateFromProperties springwolfKafkaTemplateFromProperties(
4041
SpringwolfKafkaConfigProperties springwolfKafkaConfigProperties) {
41-
return new SpringwolfKafkaTemplateFactory(springwolfKafkaConfigProperties);
42+
return new SpringwolfKafkaTemplateFromProperties(springwolfKafkaConfigProperties);
4243
}
4344
}

springwolf-plugins/springwolf-kafka-plugin/src/main/java/io/github/stavshamir/springwolf/producer/SpringwolfKafkaProducer.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -20,21 +20,22 @@
2020
@RequiredArgsConstructor
2121
public class SpringwolfKafkaProducer {
2222

23-
private final Optional<KafkaTemplate<Object, Object>> kafkaTemplate;
23+
private final SpringwolfKafkaTemplateProvider kafkaTemplateProvider;
2424

2525
public boolean isEnabled() {
26-
return kafkaTemplate.isPresent();
26+
return kafkaTemplateProvider.isPresent();
2727
}
2828

2929
public void send(String topic, String key, Map<String, String> headers, Object payload) {
30+
Optional<KafkaTemplate<Object, Object>> kafkaTemplate = kafkaTemplateProvider.get(topic);
3031
if (kafkaTemplate.isPresent()) {
3132
kafkaTemplate
3233
.get()
3334
.send(buildProducerRecord(topic, key, headers, payload))
3435
.toCompletableFuture()
3536
.join();
3637
} else {
37-
log.warn("Kafka producer is not configured");
38+
log.warn("Kafka producer for topic %s is not configured".formatted(topic));
3839
}
3940
}
4041

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,11 @@
1414

1515
@RequiredArgsConstructor
1616
@ConditionalOnBean(value = SpringwolfKafkaProducerConfiguration.class)
17-
public class SpringwolfKafkaTemplateFactory {
17+
public class SpringwolfKafkaTemplateFromProperties implements SpringwolfKafkaTemplateProvider {
1818

19-
private final SpringwolfKafkaConfigProperties springWolfKafkaConfigProperties;
20-
21-
public Optional<KafkaTemplate<Object, Object>> buildKafkaTemplate() {
22-
Optional<KafkaTemplate<Object, Object>> kafkaTemplate = Optional.empty();
19+
private final Optional<KafkaTemplate<Object, Object>> kafkaTemplate;
2320

21+
public SpringwolfKafkaTemplateFromProperties(SpringwolfKafkaConfigProperties springWolfKafkaConfigProperties) {
2422
if (springWolfKafkaConfigProperties.getPublishing() != null
2523
&& springWolfKafkaConfigProperties.getPublishing().getProducer() != null) {
2624
Map<String, Object> producerProperties = springWolfKafkaConfigProperties
@@ -30,8 +28,18 @@ public Optional<KafkaTemplate<Object, Object>> buildKafkaTemplate() {
3028
DefaultKafkaProducerFactory<Object, Object> producerFactory =
3129
new DefaultKafkaProducerFactory<>(producerProperties);
3230
kafkaTemplate = Optional.of(new KafkaTemplate<>(producerFactory));
31+
} else {
32+
kafkaTemplate = Optional.empty();
3333
}
34+
}
35+
36+
@Override
37+
public boolean isPresent() {
38+
return kafkaTemplate.isPresent();
39+
}
3440

41+
@Override
42+
public Optional<KafkaTemplate<Object, Object>> get(String topic) {
3543
return kafkaTemplate;
3644
}
3745
}
Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,20 @@
1+
// SPDX-License-Identifier: Apache-2.0
2+
package io.github.stavshamir.springwolf.producer;
3+
4+
import org.springframework.kafka.core.KafkaTemplate;
5+
6+
import java.util.Optional;
7+
8+
public interface SpringwolfKafkaTemplateProvider {
9+
/**
10+
* Check if publishing in general is possible
11+
*
12+
* @return true if at least in one case a kafka template is present
13+
*/
14+
boolean isPresent();
15+
16+
/**
17+
* Returns the matching kafka template for the topic
18+
*/
19+
Optional<KafkaTemplate<Object, Object>> get(String topic);
20+
}

springwolf-plugins/springwolf-kafka-plugin/src/test/java/io/github/stavshamir/springwolf/producer/SpringwolfKafkaProducerTest.java

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import java.util.concurrent.CompletableFuture;
2323

2424
import static org.assertj.core.api.Assertions.assertThat;
25+
import static org.mockito.ArgumentMatchers.any;
2526
import static org.mockito.Mockito.mock;
2627
import static org.mockito.Mockito.verify;
2728
import static org.mockito.Mockito.when;
@@ -32,19 +33,23 @@ class SpringwolfKafkaProducerTest {
3233
private SpringwolfKafkaProducer springwolfKafkaProducer;
3334

3435
@Mock
35-
KafkaTemplate<Object, Object> kafkaTemplate;
36+
private SpringwolfKafkaTemplateProvider kafkaTemplateProvider;
37+
38+
@Mock
39+
private KafkaTemplate<Object, Object> kafkaTemplate;
3640

3741
@Captor
3842
private ArgumentCaptor<ProducerRecord<Object, Object>> recordArgumentCaptor;
3943

4044
@BeforeEach
4145
void setUp() {
42-
springwolfKafkaProducer = new SpringwolfKafkaProducer(Optional.of(kafkaTemplate));
46+
springwolfKafkaProducer = new SpringwolfKafkaProducer(kafkaTemplateProvider);
4347
}
4448

4549
@Test
4650
void testSpringwolfKafkaProducerIsNotEnabledWhenThereIsNoKafkaTemplateConfigured() {
47-
Optional<KafkaTemplate<Object, Object>> kafkaTemplateMock = Optional.empty();
51+
SpringwolfKafkaTemplateProvider kafkaTemplateMock = mock();
52+
when(kafkaTemplateMock.isPresent()).thenReturn(false);
4853

4954
springwolfKafkaProducer = new SpringwolfKafkaProducer(kafkaTemplateMock);
5055

@@ -54,15 +59,20 @@ void testSpringwolfKafkaProducerIsNotEnabledWhenThereIsNoKafkaTemplateConfigured
5459
@Test
5560
@SuppressWarnings("unchecked")
5661
void testSendingKafkaMessageWithoutHeaders() {
62+
// given
63+
when(kafkaTemplateProvider.get(any())).thenReturn(Optional.of(kafkaTemplate));
64+
5765
CompletableFuture<SendResult<Object, Object>> future = new CompletableFuture<>();
5866
when(kafkaTemplate.send(ArgumentMatchers.<ProducerRecord<Object, Object>>any()))
5967
.thenReturn(future);
6068
future.complete(mock(SendResult.class));
6169

6270
Map<String, Object> payload = Collections.singletonMap("some", "field");
6371

72+
// when
6473
springwolfKafkaProducer.send("test-topic", null, null, payload);
6574

75+
// then
6676
verify(kafkaTemplate).send(recordArgumentCaptor.capture());
6777

6878
ProducerRecord<Object, Object> capturedRecord = recordArgumentCaptor.getValue();
@@ -78,6 +88,9 @@ void testSendingKafkaMessageWithoutHeaders() {
7888
@Test
7989
@SuppressWarnings("unchecked")
8090
void testSendingKafkaMessageWithHeaders() {
91+
// given
92+
when(kafkaTemplateProvider.get(any())).thenReturn(Optional.of(kafkaTemplate));
93+
8194
CompletableFuture<SendResult<Object, Object>> future = new CompletableFuture<>();
8295
when(kafkaTemplate.send(ArgumentMatchers.<ProducerRecord<Object, Object>>any()))
8396
.thenReturn(future);
@@ -86,8 +99,10 @@ void testSendingKafkaMessageWithHeaders() {
8699
Map<String, Object> payload = Collections.singletonMap("some", "field");
87100
Map<String, String> headers = Collections.singletonMap("header-key", "header");
88101

102+
// when
89103
springwolfKafkaProducer.send("test-topic", null, headers, payload);
90104

105+
// then
91106
verify(kafkaTemplate).send(recordArgumentCaptor.capture());
92107

93108
ProducerRecord<Object, Object> capturedRecord = recordArgumentCaptor.getValue();
Lines changed: 22 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -5,55 +5,61 @@
55
import org.junit.jupiter.api.Test;
66
import org.springframework.boot.autoconfigure.kafka.KafkaProperties;
77
import org.springframework.boot.ssl.DefaultSslBundleRegistry;
8-
import org.springframework.kafka.core.KafkaTemplate;
98

109
import java.util.Map;
11-
import java.util.Optional;
1210

1311
import static org.assertj.core.api.Assertions.assertThat;
1412

15-
class SpringwolfKafkaTemplateFactoryTest {
13+
class SpringwolfKafkaTemplateFromPropertiesTest {
1614

1715
@Test
1816
void kafkaTemplateShouldNotBeCreatedForEmptyProperties() {
17+
// given
1918
SpringwolfKafkaConfigProperties configProperties = new SpringwolfKafkaConfigProperties();
2019

21-
SpringwolfKafkaTemplateFactory templateFactory = new SpringwolfKafkaTemplateFactory(configProperties);
20+
// when
21+
SpringwolfKafkaTemplateFromProperties kafkaTemplateProvider =
22+
new SpringwolfKafkaTemplateFromProperties(configProperties);
2223

23-
Optional<KafkaTemplate<Object, Object>> kafkaTemplate = templateFactory.buildKafkaTemplate();
24-
25-
assertThat(kafkaTemplate).isNotPresent();
24+
// then
25+
assertThat(kafkaTemplateProvider.isPresent()).isFalse();
26+
assertThat(kafkaTemplateProvider.get("topic")).isNotPresent();
2627
}
2728

2829
@Test
2930
void kafkaTemplateShouldNotBeCreatedForEmptyProducerConfiguration() {
31+
// given
3032
SpringwolfKafkaConfigProperties configProperties = new SpringwolfKafkaConfigProperties();
3133
configProperties.setPublishing(new SpringwolfKafkaConfigProperties.Publishing());
3234

33-
SpringwolfKafkaTemplateFactory templateFactory = new SpringwolfKafkaTemplateFactory(configProperties);
34-
35-
Optional<KafkaTemplate<Object, Object>> kafkaTemplate = templateFactory.buildKafkaTemplate();
35+
// when
36+
SpringwolfKafkaTemplateFromProperties kafkaTemplateProvider =
37+
new SpringwolfKafkaTemplateFromProperties(configProperties);
3638

37-
assertThat(kafkaTemplate).isNotPresent();
39+
// then
40+
assertThat(kafkaTemplateProvider.isPresent()).isFalse();
41+
assertThat(kafkaTemplateProvider.get("topic")).isNotPresent();
3842
}
3943

4044
@Test
4145
void kafkaTemplateShouldBeCreatedWithProducerConfiguration() {
46+
// given
4247
SpringwolfKafkaConfigProperties configProperties = new SpringwolfKafkaConfigProperties();
4348
SpringwolfKafkaConfigProperties.Publishing publishing = new SpringwolfKafkaConfigProperties.Publishing();
4449
publishing.setEnabled(true);
4550
publishing.setProducer(new KafkaProperties.Producer());
4651

4752
configProperties.setPublishing(publishing);
4853

49-
SpringwolfKafkaTemplateFactory templateFactory = new SpringwolfKafkaTemplateFactory(configProperties);
54+
// when
55+
SpringwolfKafkaTemplateFromProperties kafkaTemplateProvider =
56+
new SpringwolfKafkaTemplateFromProperties(configProperties);
5057

51-
Optional<KafkaTemplate<Object, Object>> kafkaTemplate = templateFactory.buildKafkaTemplate();
58+
// then
59+
assertThat(kafkaTemplateProvider.isPresent()).isTrue();
5260

53-
assertThat(kafkaTemplate).isPresent();
5461
Map<String, Object> configurationProperties =
55-
kafkaTemplate.get().getProducerFactory().getConfigurationProperties();
56-
62+
kafkaTemplateProvider.get("topic").get().getProducerFactory().getConfigurationProperties();
5763
assertThat(configurationProperties)
5864
.isEqualTo(new KafkaProperties.Producer().buildProperties(new DefaultSslBundleRegistry()));
5965
}

0 commit comments

Comments
 (0)