Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 31 additions & 0 deletions integration-tests/consumer/Readme.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
# Kafka streams processing with Quarkus

EDA consumer microservice implementation using [Kafka Streams](https://kafka.apache.org/documentation/streams/)

## Introduction

This module showcases the implementation of a [Kafka Stream processor](https://kafka.apache.org/25/documentation/streams/developer-guide/processor-api.html#overview) with `Void` output types (`Processor<K, V, Void, Void>`), i.e. a pure consumer.

## Services showcase

This module showcases the stateless processing of `MyData` events and their storage in memory.
The application also exposes a REST API for testing purpose, to query the content of the in-memory storage.

The `io.quarkiverse.kafkastreamsprocessor.sample.consumer.ConsumerProcessor` is associated to a `KafkaStreams topology` that is built using a [CDI producer](https://docs.jboss.org/weld/reference/1.0.0/en-US/html/producermethods.html) backed by the CDI bean `io.quarkiverse.kafkastreamsprocessor.impl.TopologyProducer`

## Implementation note

### Quarkus

The bootstrap of this sample is [Quarkus](https://quarkus.io/)

### Topology driver

## Quarkus Dev mode

The sample is fully working with the Quarkus Dev mode that allows to modify the code and have a hot replacement when the file is saved.
It can be used also to launch the application.

```
$> mvn clean install quarkus:dev
```
152 changes: 152 additions & 0 deletions integration-tests/consumer/pom.xml
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>io.quarkiverse.kafkastreamsprocessor</groupId>
<artifactId>quarkus-kafka-streams-processor-integration-tests</artifactId>
<version>5.0.0-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>

<artifactId>quarkus-kafka-streams-processor-consumer-sample</artifactId>
<name>quarkus-kafka-streams-processor-consumer-sample</name>

<dependencyManagement>
<dependencies>
<dependency>
<groupId>io.quarkiverse.kafkastreamsprocessor</groupId>
<artifactId>quarkus-kafka-streams-processor-bom</artifactId>
<version>${project.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
<dependency>
<groupId>io.quarkiverse.kafkastreamsprocessor</groupId>
<artifactId>quarkus-kafka-streams-processor-test-bom</artifactId>
<version>${project.version}</version>
<type>pom</type>
<scope>import</scope>
</dependency>
</dependencies>
</dependencyManagement>

<dependencies>
<dependency>
<groupId>jakarta.inject</groupId>
<artifactId>jakarta.inject-api</artifactId>
</dependency>
<dependency>
<groupId>jakarta.enterprise</groupId>
<artifactId>jakarta.enterprise.cdi-api</artifactId>
</dependency>
<dependency>
<groupId>jakarta.ws.rs</groupId>
<artifactId>jakarta.ws.rs-api</artifactId>
</dependency>

<!-- Quarkus extensions -->
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-kafka-streams</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>io.quarkiverse.kafkastreamsprocessor</groupId>
<artifactId>quarkus-kafka-streams-processor</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>io.quarkiverse.kafkastreamsprocessor</groupId>
<artifactId>quarkus-kafka-streams-processor-impl</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-smallrye-health</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-micrometer-registry-prometheus</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-opentelemetry</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-rest</artifactId>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
</dependency>

<dependency>
<groupId>io.quarkiverse.kafkastreamsprocessor</groupId>
<artifactId>quarkus-kafka-streams-processor-api</artifactId>
</dependency>

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-api</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-junit5</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.quarkus</groupId>
<artifactId>quarkus-test-common</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.rest-assured</groupId>
<artifactId>rest-assured</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
<dependency>
<groupId>org.hamcrest</groupId>
<artifactId>hamcrest</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.quarkiverse.kafkastreamsprocessor</groupId>
<artifactId>quarkus-kafka-streams-processor-test-framework</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
</project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
/*-
* #%L
* Quarkus Kafka Streams Processor
* %%
* Copyright (C) 2024 Amadeus s.a.s.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
package io.quarkiverse.kafkastreamsprocessor.sample.consumer;

import jakarta.inject.Inject;
import jakarta.ws.rs.DELETE;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.Path;
import jakarta.ws.rs.Produces;
import jakarta.ws.rs.core.MediaType;

/**
* REST endpoint to list and delete events in the {@link InputEventCache}.
* <p>
* This endpoint is used for testing purposes only.
* For the main functionality of this code sample, refer to {@link ConsumerProcessor}.
*/
@Path("/cached-events")
public class CacheManagementEndpoint {
private final InputEventCache inputEventCache;

@Inject
public CacheManagementEndpoint(InputEventCache inputEventCache) {
this.inputEventCache = inputEventCache;
}

/**
* Lists all stored events in the input event store.
*
* @return A string representation of all stored events, or a message indicating no events are stored.
*/
@GET
@Produces(MediaType.TEXT_PLAIN)
public String getCachedEvents() {
return inputEventCache.describeCachedEvents();
}

/**
* Deletes all stored events in the input event store.
*
* @return A message indicating the number of events deleted.
*/
@DELETE
@Produces(MediaType.TEXT_PLAIN)
public String deleteCachedEvents() {
return String.format("Deleted %d events", inputEventCache.clearCachedEvents());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*-
* #%L
* Quarkus Kafka Streams Processor
* %%
* Copyright (C) 2024 Amadeus s.a.s.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
package io.quarkiverse.kafkastreamsprocessor.sample.consumer;

import jakarta.inject.Inject;

import org.apache.kafka.streams.processor.api.ContextualProcessor;
import org.apache.kafka.streams.processor.api.Record;

import io.quarkiverse.kafkastreamsprocessor.api.Processor;
import lombok.extern.slf4j.Slf4j;

/**
* Pure consumer processor that stores received events in memory.
*/
@Slf4j
@Processor
public class ConsumerProcessor extends ContextualProcessor<String, MyData, Void, Void> {

private final InputEventCache inputEventCache;

@Inject
public ConsumerProcessor(InputEventCache inputEventCache) {
this.inputEventCache = inputEventCache;
}

/**
* {@inheritDoc}
*/
@Override
public void process(Record<String, MyData> input) {
log.info("Processing record with key {} and value {}", input.key(), input.value());
inputEventCache.cacheEvent(input.key(), input.value());
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
/*-
* #%L
* Quarkus Kafka Streams Processor
* %%
* Copyright (C) 2024 Amadeus s.a.s.
* %%
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* #L%
*/
package io.quarkiverse.kafkastreamsprocessor.sample.consumer;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

import jakarta.enterprise.context.ApplicationScoped;

/**
* A simple in-memory event cache
* This class is used to store and retrieve events for testing purposes.
*/
@ApplicationScoped
public class InputEventCache {
Map<String, MyData> events = new ConcurrentHashMap<>();

public void cacheEvent(String key, MyData value) {
events.put(key, value);
}

/**
* Lists all cached events in a human-readable format.
*
* @return A string representation of all cached events, or a message indicating no events are stored.
*/
public String describeCachedEvents() {
if (events.isEmpty()) {
return "No events cached";
} else {
StringBuilder sb = new StringBuilder();
for (Map.Entry<String, MyData> entry : events.entrySet()) {
sb.append("Key: ").append(entry.getKey()).append(", Value: ").append(entry.getValue()).append("\n");
}
return sb.toString();
}
}

/**
* Clear the cache
*
* @return The number of events deleted by this operation
*/
public int clearCachedEvents() {
int eventCount = events.size();
events.clear();
return eventCount;
}
}
Loading