From 46285e35fd96abafb4ea1286b8f3083e1d2c30ec Mon Sep 17 00:00:00 2001 From: Francois LAZARUS Date: Tue, 29 Jul 2025 10:33:37 +0200 Subject: [PATCH] feat: add a pure consumer code sample Add an integration test example for a consumer-only processor Fixes: #186 --- integration-tests/consumer/Readme.md | 31 ++++ integration-tests/consumer/pom.xml | 152 ++++++++++++++++++ .../consumer/CacheManagementEndpoint.java | 65 ++++++++ .../sample/consumer/ConsumerProcessor.java | 52 ++++++ .../sample/consumer/InputEventCache.java | 66 ++++++++ .../sample/consumer/MyData.java | 29 ++++ .../src/main/resources/application.properties | 10 ++ .../sample/consumer/ConsumerProcessorIT.java | 26 +++ .../ConsumerProcessorQuarkusTest.java | 84 ++++++++++ .../consumer/ConsumerProcessorTest.java | 44 +++++ integration-tests/pom.xml | 1 + 11 files changed, 560 insertions(+) create mode 100644 integration-tests/consumer/Readme.md create mode 100644 integration-tests/consumer/pom.xml create mode 100644 integration-tests/consumer/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/consumer/CacheManagementEndpoint.java create mode 100644 integration-tests/consumer/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/consumer/ConsumerProcessor.java create mode 100644 integration-tests/consumer/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/consumer/InputEventCache.java create mode 100644 integration-tests/consumer/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/consumer/MyData.java create mode 100644 integration-tests/consumer/src/main/resources/application.properties create mode 100644 integration-tests/consumer/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/consumer/ConsumerProcessorIT.java create mode 100644 integration-tests/consumer/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/consumer/ConsumerProcessorQuarkusTest.java create mode 100644 integration-tests/consumer/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/consumer/ConsumerProcessorTest.java diff --git a/integration-tests/consumer/Readme.md b/integration-tests/consumer/Readme.md new file mode 100644 index 00000000..76bf878a --- /dev/null +++ b/integration-tests/consumer/Readme.md @@ -0,0 +1,31 @@ +# Kafka streams processing with Quarkus + +EDA consumer microservice implementation using [Kafka Streams](https://kafka.apache.org/documentation/streams/) + +## Introduction + +This module showcases the implementation of a [Kafka Stream processor](https://kafka.apache.org/25/documentation/streams/developer-guide/processor-api.html#overview) with `Void` output types (`Processor`), i.e. a pure consumer. + +## Services showcase + +This module showcases the stateless processing of `MyData` events and their storage in memory. +The application also exposes a REST API for testing purpose, to query the content of the in-memory storage. + +The `io.quarkiverse.kafkastreamsprocessor.sample.consumer.ConsumerProcessor` is associated to a `KafkaStreams topology` that is built using a [CDI producer](https://docs.jboss.org/weld/reference/1.0.0/en-US/html/producermethods.html) backed by the CDI bean `io.quarkiverse.kafkastreamsprocessor.impl.TopologyProducer` + +## Implementation note + +### Quarkus + +The bootstrap of this sample is [Quarkus](https://quarkus.io/) + +### Topology driver + +## Quarkus Dev mode + +The sample is fully working with the Quarkus Dev mode that allows to modify the code and have a hot replacement when the file is saved. +It can be used also to launch the application. + +``` +$> mvn clean install quarkus:dev +``` diff --git a/integration-tests/consumer/pom.xml b/integration-tests/consumer/pom.xml new file mode 100644 index 00000000..908a3fb5 --- /dev/null +++ b/integration-tests/consumer/pom.xml @@ -0,0 +1,152 @@ + + + + io.quarkiverse.kafkastreamsprocessor + quarkus-kafka-streams-processor-integration-tests + 5.0.0-SNAPSHOT + + 4.0.0 + + quarkus-kafka-streams-processor-consumer-sample + quarkus-kafka-streams-processor-consumer-sample + + + + + io.quarkiverse.kafkastreamsprocessor + quarkus-kafka-streams-processor-bom + ${project.version} + pom + import + + + io.quarkiverse.kafkastreamsprocessor + quarkus-kafka-streams-processor-test-bom + ${project.version} + pom + import + + + + + + + jakarta.inject + jakarta.inject-api + + + jakarta.enterprise + jakarta.enterprise.cdi-api + + + jakarta.ws.rs + jakarta.ws.rs-api + + + + + io.quarkus + quarkus-kafka-streams + runtime + + + io.quarkiverse.kafkastreamsprocessor + quarkus-kafka-streams-processor + runtime + + + io.quarkiverse.kafkastreamsprocessor + quarkus-kafka-streams-processor-impl + runtime + + + io.quarkus + quarkus-smallrye-health + runtime + + + io.quarkus + quarkus-micrometer-registry-prometheus + runtime + + + io.quarkus + quarkus-opentelemetry + runtime + + + io.quarkus + quarkus-rest + + + + org.apache.kafka + kafka-streams + + + + io.quarkiverse.kafkastreamsprocessor + quarkus-kafka-streams-processor-api + + + + org.apache.kafka + kafka-clients + test + + + org.junit.jupiter + junit-jupiter-api + test + + + io.quarkus + quarkus-junit5 + test + + + io.quarkus + quarkus-test-common + test + + + org.springframework.kafka + spring-kafka-test + test + + + org.mockito + mockito-core + test + + + org.mockito + mockito-junit-jupiter + test + + + io.rest-assured + rest-assured + test + + + org.projectlombok + lombok + provided + + + org.slf4j + slf4j-api + + + org.hamcrest + hamcrest + test + + + io.quarkiverse.kafkastreamsprocessor + quarkus-kafka-streams-processor-test-framework + test + + + diff --git a/integration-tests/consumer/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/consumer/CacheManagementEndpoint.java b/integration-tests/consumer/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/consumer/CacheManagementEndpoint.java new file mode 100644 index 00000000..077e4589 --- /dev/null +++ b/integration-tests/consumer/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/consumer/CacheManagementEndpoint.java @@ -0,0 +1,65 @@ +/*- + * #%L + * Quarkus Kafka Streams Processor + * %% + * Copyright (C) 2024 Amadeus s.a.s. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ +package io.quarkiverse.kafkastreamsprocessor.sample.consumer; + +import jakarta.inject.Inject; +import jakarta.ws.rs.DELETE; +import jakarta.ws.rs.GET; +import jakarta.ws.rs.Path; +import jakarta.ws.rs.Produces; +import jakarta.ws.rs.core.MediaType; + +/** + * REST endpoint to list and delete events in the {@link InputEventCache}. + *

+ * This endpoint is used for testing purposes only. + * For the main functionality of this code sample, refer to {@link ConsumerProcessor}. + */ +@Path("/cached-events") +public class CacheManagementEndpoint { + private final InputEventCache inputEventCache; + + @Inject + public CacheManagementEndpoint(InputEventCache inputEventCache) { + this.inputEventCache = inputEventCache; + } + + /** + * Lists all stored events in the input event store. + * + * @return A string representation of all stored events, or a message indicating no events are stored. + */ + @GET + @Produces(MediaType.TEXT_PLAIN) + public String getCachedEvents() { + return inputEventCache.describeCachedEvents(); + } + + /** + * Deletes all stored events in the input event store. + * + * @return A message indicating the number of events deleted. + */ + @DELETE + @Produces(MediaType.TEXT_PLAIN) + public String deleteCachedEvents() { + return String.format("Deleted %d events", inputEventCache.clearCachedEvents()); + } +} diff --git a/integration-tests/consumer/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/consumer/ConsumerProcessor.java b/integration-tests/consumer/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/consumer/ConsumerProcessor.java new file mode 100644 index 00000000..df90a0e7 --- /dev/null +++ b/integration-tests/consumer/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/consumer/ConsumerProcessor.java @@ -0,0 +1,52 @@ +/*- + * #%L + * Quarkus Kafka Streams Processor + * %% + * Copyright (C) 2024 Amadeus s.a.s. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ +package io.quarkiverse.kafkastreamsprocessor.sample.consumer; + +import jakarta.inject.Inject; + +import org.apache.kafka.streams.processor.api.ContextualProcessor; +import org.apache.kafka.streams.processor.api.Record; + +import io.quarkiverse.kafkastreamsprocessor.api.Processor; +import lombok.extern.slf4j.Slf4j; + +/** + * Pure consumer processor that stores received events in memory. + */ +@Slf4j +@Processor +public class ConsumerProcessor extends ContextualProcessor { + + private final InputEventCache inputEventCache; + + @Inject + public ConsumerProcessor(InputEventCache inputEventCache) { + this.inputEventCache = inputEventCache; + } + + /** + * {@inheritDoc} + */ + @Override + public void process(Record input) { + log.info("Processing record with key {} and value {}", input.key(), input.value()); + inputEventCache.cacheEvent(input.key(), input.value()); + } +} diff --git a/integration-tests/consumer/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/consumer/InputEventCache.java b/integration-tests/consumer/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/consumer/InputEventCache.java new file mode 100644 index 00000000..cbd0150a --- /dev/null +++ b/integration-tests/consumer/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/consumer/InputEventCache.java @@ -0,0 +1,66 @@ +/*- + * #%L + * Quarkus Kafka Streams Processor + * %% + * Copyright (C) 2024 Amadeus s.a.s. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ +package io.quarkiverse.kafkastreamsprocessor.sample.consumer; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import jakarta.enterprise.context.ApplicationScoped; + +/** + * A simple in-memory event cache + * This class is used to store and retrieve events for testing purposes. + */ +@ApplicationScoped +public class InputEventCache { + Map events = new ConcurrentHashMap<>(); + + public void cacheEvent(String key, MyData value) { + events.put(key, value); + } + + /** + * Lists all cached events in a human-readable format. + * + * @return A string representation of all cached events, or a message indicating no events are stored. + */ + public String describeCachedEvents() { + if (events.isEmpty()) { + return "No events cached"; + } else { + StringBuilder sb = new StringBuilder(); + for (Map.Entry entry : events.entrySet()) { + sb.append("Key: ").append(entry.getKey()).append(", Value: ").append(entry.getValue()).append("\n"); + } + return sb.toString(); + } + } + + /** + * Clear the cache + * + * @return The number of events deleted by this operation + */ + public int clearCachedEvents() { + int eventCount = events.size(); + events.clear(); + return eventCount; + } +} diff --git a/integration-tests/consumer/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/consumer/MyData.java b/integration-tests/consumer/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/consumer/MyData.java new file mode 100644 index 00000000..3d0b8707 --- /dev/null +++ b/integration-tests/consumer/src/main/java/io/quarkiverse/kafkastreamsprocessor/sample/consumer/MyData.java @@ -0,0 +1,29 @@ +/*- + * #%L + * Quarkus Kafka Streams Processor + * %% + * Copyright (C) 2024 Amadeus s.a.s. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ +package io.quarkiverse.kafkastreamsprocessor.sample.consumer; + +import io.quarkus.runtime.annotations.RegisterForReflection; + +/** + * Simple event data object used by the {@link ConsumerProcessor} + */ +@RegisterForReflection +public record MyData(String value) { +} diff --git a/integration-tests/consumer/src/main/resources/application.properties b/integration-tests/consumer/src/main/resources/application.properties new file mode 100644 index 00000000..b44f05e4 --- /dev/null +++ b/integration-tests/consumer/src/main/resources/application.properties @@ -0,0 +1,10 @@ +kafkastreamsprocessor.input.topic=input-topic +quarkus.kafka-streams.topics=input-topic +kafka-streams.producer.linger.ms=0 + +# Runs native test with test profile https://github.com/quarkusio/quarkus/issues/4371 +quarkus.test.integration-test-profile=test +%test.quarkus.kafka-streams.application-id=consumer-sample +quarkus.kafka.devservices.provider=kafka-native +quarkus.kafka.devservices.topic-partitions.input-topic=2 +quarkus.http.test-port=0 diff --git a/integration-tests/consumer/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/consumer/ConsumerProcessorIT.java b/integration-tests/consumer/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/consumer/ConsumerProcessorIT.java new file mode 100644 index 00000000..2c78adf6 --- /dev/null +++ b/integration-tests/consumer/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/consumer/ConsumerProcessorIT.java @@ -0,0 +1,26 @@ +/*- + * #%L + * Quarkus Kafka Streams Processor + * %% + * Copyright (C) 2024 Amadeus s.a.s. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ +package io.quarkiverse.kafkastreamsprocessor.sample.consumer; + +import io.quarkus.test.junit.QuarkusIntegrationTest; + +@QuarkusIntegrationTest +public class ConsumerProcessorIT extends ConsumerProcessorQuarkusTest { +} diff --git a/integration-tests/consumer/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/consumer/ConsumerProcessorQuarkusTest.java b/integration-tests/consumer/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/consumer/ConsumerProcessorQuarkusTest.java new file mode 100644 index 00000000..d360f2e8 --- /dev/null +++ b/integration-tests/consumer/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/consumer/ConsumerProcessorQuarkusTest.java @@ -0,0 +1,84 @@ +/*- + * #%L + * Quarkus Kafka Streams Processor + * %% + * Copyright (C) 2024 Amadeus s.a.s. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ +package io.quarkiverse.kafkastreamsprocessor.sample.consumer; + +import static io.restassured.RestAssured.given; +import static org.hamcrest.Matchers.is; + +import java.util.Map; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringSerializer; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.springframework.kafka.test.utils.KafkaTestUtils; + +import io.quarkiverse.kafkastreamsprocessor.testframework.KafkaBootstrapServers; +import io.quarkiverse.kafkastreamsprocessor.testframework.QuarkusIntegrationCompatibleKafkaDevServicesResource; +import io.quarkus.test.common.QuarkusTestResource; +import io.quarkus.test.junit.QuarkusTest; + +/** + * Blackbox test that can run both in JVM and native modes (@Inject and @ConfigProperty not allowed) + */ +@QuarkusTest +@QuarkusTestResource(QuarkusIntegrationCompatibleKafkaDevServicesResource.class) +public class ConsumerProcessorQuarkusTest { + @KafkaBootstrapServers + String kafkaBootstrapServers; + + String senderTopic = "input-topic"; + + KafkaProducer producer; + + @BeforeEach + public void setup() { + Map producerProps = KafkaTestUtils.producerProps(kafkaBootstrapServers); + producer = new KafkaProducer<>(producerProps, new StringSerializer(), new StringSerializer()); + } + + @AfterEach + public void tearDown() { + producer.close(); + } + + @Test + public void shouldStoreConsumedEvents() throws Exception { + // Ensure the storage is empty before the test + given().when().delete("/cached-events") + .then().statusCode(200); + given().when().get("/cached-events") + .then().statusCode(200) + .body(is("No events cached")); + + producer.send(new ProducerRecord<>(senderTopic, "MyEventKey", "{ \"value\": \"Hello, World!\" }")); + producer.flush(); + + // Wait for the event to be processed + Thread.sleep(1000); + + given() + .when().get("/cached-events") + .then().statusCode(200) + .body(is("Key: MyEventKey, Value: MyData[value=Hello, World!]\n")); + } +} diff --git a/integration-tests/consumer/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/consumer/ConsumerProcessorTest.java b/integration-tests/consumer/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/consumer/ConsumerProcessorTest.java new file mode 100644 index 00000000..8b77a8c0 --- /dev/null +++ b/integration-tests/consumer/src/test/java/io/quarkiverse/kafkastreamsprocessor/sample/consumer/ConsumerProcessorTest.java @@ -0,0 +1,44 @@ +/*- + * #%L + * Quarkus Kafka Streams Processor + * %% + * Copyright (C) 2024 Amadeus s.a.s. + * %% + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * #L% + */ +package io.quarkiverse.kafkastreamsprocessor.sample.consumer; + +import static org.mockito.Mockito.verify; + +import org.apache.kafka.streams.processor.api.Record; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +@ExtendWith(MockitoExtension.class) +public class ConsumerProcessorTest { + @Mock + InputEventCache inputEventCache; + + @Test + void shouldStoreConsumedEvents() { + ConsumerProcessor processor = new ConsumerProcessor(inputEventCache); + + processor.process(new Record<>("key", new MyData("value"), 0L)); + + verify(inputEventCache).cacheEvent("key", new MyData("value")); + } + +} diff --git a/integration-tests/pom.xml b/integration-tests/pom.xml index d9248469..95d00037 100644 --- a/integration-tests/pom.xml +++ b/integration-tests/pom.xml @@ -18,6 +18,7 @@ stateful stateful-global custom-serde + consumer