113 changes: 60 additions & 53 deletions README.md
@@ -2,7 +2,6 @@ Module kotlin-kafka

[![Maven Central](https://img.shields.io/maven-central/v/io.github.nomisrev/kotlin-kafka?color=4caf50&label=latest%20release)](https://maven-badges.herokuapp.com/maven-central/io.github.nomisrev/kotlin-kafka)

<!--- TEST_NAME ReadmeTest -->
<!--- TOC -->

* [Rationale](#rationale)
@@ -12,22 +11,31 @@ Module kotlin-kafka

<!--- END -->

This project is still under development, and started as a playground where I was playing around with Kafka in Kotlin
and the Kafka SDK whilst reading the Kafka book, The Definitive Guide, from Confluent:
https://www.confluent.io/resources/kafka-the-definitive-guide-v2/

## Rationale

At the time of starting this repository I didn't find any bindings between Kafka SDK and Kotlin suspension. These
operators should be implemented low-level, so they can guarantee correct cancellation support, and high optimised
runtimes.
At the time of starting this repository I didn't find any bindings between the Kafka SDK and Kotlin suspension, or KotlinX Coroutines Flow.
These operators should be implemented at a low level, so they can guarantee correct cancellation support and highly optimised runtimes.
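
As a minimal sketch of what such a binding can look like (a hypothetical helper, not this library's actual API), the callback-based `Producer.send` can be wrapped in `suspendCancellableCoroutine`, which is what gives callers both a suspending call and correct cancellation:

```kotlin
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlinx.coroutines.suspendCancellableCoroutine
import org.apache.kafka.clients.producer.Producer
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.clients.producer.RecordMetadata

// Hypothetical helper: suspend until the broker acknowledges the record,
// resuming with the metadata on success or the failure as an exception.
suspend fun <K, V> Producer<K, V>.sendAwait(record: ProducerRecord<K, V>): RecordMetadata =
  suspendCancellableCoroutine { cont ->
    send(record) { metadata, exception ->
      if (exception != null) cont.resumeWithException(exception)
      else cont.resume(metadata)
    }
  }
```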

Some important aspects of Kafka are tricky to implement with the "low-level" Kafka API,
especially properly streaming records from Kafka and correctly committing them.
Additional complexity is involved in this process; more details [here](https://tuleism.github.io/blog/2021/parallel-backpressured-kafka-consumer/).
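
For illustration, here is a rough sketch of the hand-rolled loop this paragraph alludes to (`process` is a stand-in for real work); even this naive version has to deal with poll timeouts, per-partition offsets, and the "commit the next offset" convention:

```kotlin
import java.time.Duration
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.consumer.OffsetAndMetadata
import org.apache.kafka.common.TopicPartition

fun process(record: ConsumerRecord<String, String>) { /* stand-in for real work */ }

fun consumeLoop(consumer: KafkaConsumer<String, String>) {
  while (true) {
    // Processing records too slowly risks exceeding max.poll.interval.ms
    // and triggering a rebalance.
    val records = consumer.poll(Duration.ofMillis(500))
    for (record in records) {
      process(record)
      // Commit the offset of the *next* record to read, per partition.
      consumer.commitSync(
        mapOf(
          TopicPartition(record.topic(), record.partition()) to
            OffsetAndMetadata(record.offset() + 1)
        )
      )
    }
  }
}
```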

To solve these problems a couple of projects in the JVM already exist:
- [Alpakka Kafka](https://github.com/akka/alpakka-kafka)
- [reactor-kafka](https://github.com/reactor/reactor-kafka)

There was no implementation for KotlinX Coroutines Flow;
you can, however, quite easily use reactor-kafka with the [KotlinX Coroutines Reactor bindings](https://github.com/Kotlin/kotlinx.coroutines/blob/master/reactive/kotlinx-coroutines-reactor/README.md).
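
As a sketch of that route (assuming `reactor-kafka` and the coroutines reactive bindings are on the classpath; the topic and group id below are illustrative), reactor-kafka's `Flux` of records can be collected as a KotlinX `Flow`:

```kotlin
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.reactive.asFlow
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import reactor.kafka.receiver.KafkaReceiver
import reactor.kafka.receiver.ReceiverOptions

suspend fun consumeAsFlow(bootstrapServers: String) {
  val options = ReceiverOptions.create<String, String>(
    mapOf(
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers,
      ConsumerConfig.GROUP_ID_CONFIG to "example-group",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java,
    )
  ).subscription(listOf("example-topic"))

  KafkaReceiver.create(options)
    .receive()  // Flux<ReceiverRecord<String, String>>
    .asFlow()   // bridge Reactor -> KotlinX Coroutines Flow
    .collect { record ->
      println("${record.key()} -> ${record.value()}")
      record.receiverOffset().acknowledge()
    }
}
```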

This project implements the same strategies as [reactor-kafka](https://github.com/reactor/reactor-kafka) directly on top of KotlinX Coroutines to take full advantage of **all** that coroutines offer,
and to open the door to potentially becoming a Kotlin MPP library in the future.

## Goals

- Lean Core library built on top of Kotlin Std & KotlinX Coroutines (possible extensions with Arrow in additional
module)
- Lean core library built on top of Kotlin Std & KotlinX Coroutines.
- Extensions to easily operate over the Kafka SDK with KotlinX Coroutines and `suspend`.
- Flow-based operators, so you can easily compose KotlinX Flow based Kafka programs.
- Strong guarantees about committing record offsets, and performance optimisations with regard to re-balancing/partitioning.
- An example of testing Kafka with Testcontainers in Kotlin.

## Adding Dependency
@@ -43,20 +51,24 @@ dependencies {
## Example

<!--- INCLUDE
import java.util.UUID
import kotlinx.coroutines.Dispatchers.Default
import io.github.nomisRev.kafka.receiver.KafkaReceiver
import io.github.nomisRev.kafka.receiver.ReceiverSettings
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.apache.kafka.clients.admin.NewTopic
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.IntegerDeserializer
import org.apache.kafka.common.serialization.IntegerSerializer
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import java.util.UUID
-->

```kotlin
@@ -66,49 +78,46 @@ value class Key(val index: Int)
@JvmInline
value class Message(val content: String)

fun main(): Unit =
runBlocking(Default) {
val topicName = "test-topic"
val msgCount = 10
val kafka = Kafka.container
fun main(): Unit = runBlocking(Dispatchers.Default) {
val topicName = "test-topic"
val msgCount = 10
val kafka = Kafka.container

Admin(AdminSettings(kafka.bootstrapServers)).use { client ->
client.createTopic(NewTopic(topicName, 1, 1))
}

Admin(AdminSettings(kafka.bootstrapServers)).use { client ->
client.createTopic(NewTopic(topicName, 1, 1))
coroutineScope { // Run producer and consumer in a single scope
launch(Dispatchers.IO) { // Send 10 messages, and then close the producer
val settings: ProducerSettings<Key, Message> = ProducerSettings(
kafka.bootstrapServers,
IntegerSerializer().imap { key: Key -> key.index },
StringSerializer().imap { msg: Message -> msg.content },
Acks.All
)
(1..msgCount)
.asFlow()
.map { index -> ProducerRecord(topicName, Key(index), Message("msg: $index")) }
.produce(settings)
.collect(::println)
}

coroutineScope { // Run produces and consumer in a single scope
launch { // Send 20 messages, and then close the producer
val settings: ProducerSettings<Key, Message> =
ProducerSettings(
kafka.bootstrapServers,
IntegerSerializer().imap { key: Key -> key.index },
StringSerializer().imap { msg: Message -> msg.content },
Acks.All
)
(1..msgCount)
.map { index -> ProducerRecord(topicName, Key(index), Message("msg: $index")) }
.asFlow()
.produce(settings)
.collect(::println)
}

launch { // Consume 20 messages as a stream, and then close the consumer
val settings: ConsumerSettings<Key, Message> =
ConsumerSettings(
kafka.bootstrapServers,
IntegerDeserializer().map(::Key),
StringDeserializer().map(::Message),
groupId = UUID.randomUUID().toString(),
autoOffsetReset = AutoOffsetReset.Earliest
)
kafkaConsumer(settings)
.subscribeTo(topicName)
.take(msgCount)
.map { "${it.key()} -> ${it.value()}" }
.collect(::println)
}
launch(Dispatchers.IO) { // Consume 10 messages as a stream, and then close the consumer
val settings: ReceiverSettings<Key, Message> = ReceiverSettings(
kafka.bootstrapServers,
IntegerDeserializer().map(::Key),
StringDeserializer().map(::Message),
groupId = UUID.randomUUID().toString(),
autoOffsetReset = AutoOffsetReset.Earliest
)
KafkaReceiver(settings)
.receive(topicName)
.take(msgCount)
.map { "${it.key()} -> ${it.value()}" }
.collect(::println)
}
}
}
```

> You can get the full code [here](guide/example/example-readme-01.kt).
@@ -135,5 +144,3 @@ Key(index=8) -> Message(content=msg: 8)
Key(index=9) -> Message(content=msg: 9)
Key(index=10) -> Message(content=msg: 10)
```

<!--- TEST -->
7 changes: 5 additions & 2 deletions build.gradle.kts
@@ -34,8 +34,11 @@ dependencies {
api(libs.kotlinx.coroutines.core)
api(libs.kotlinx.coroutines.jdk8)
api(libs.kafka.clients)

implementation(libs.slf4j.api)

testImplementation(libs.bundles.kotest)
testImplementation(libs.testcontainers.kafka)
testImplementation(libs.slf4j.simple)
}

configure<KnitPluginExtension> {
@@ -72,7 +75,7 @@ tasks {
kotlinOptions.jvmTarget = "1.8"
}

val cleanDocs = register<Delete>("cleanDocs") {
register<Delete>("cleanDocs") {
val folder = file("docs").also { it.mkdir() }
val docsContent = folder.listFiles().filter { it != folder }
delete(docsContent)
83 changes: 42 additions & 41 deletions guide/example/example-readme-01.kt
@@ -4,67 +4,68 @@ import io.github.nomisRev.kafka.*
import java.util.Properties
import kotlinx.coroutines.runBlocking

import java.util.UUID
import kotlinx.coroutines.Dispatchers.Default
import io.github.nomisRev.kafka.receiver.KafkaReceiver
import io.github.nomisRev.kafka.receiver.ReceiverSettings
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.apache.kafka.clients.admin.NewTopic
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.IntegerDeserializer
import org.apache.kafka.common.serialization.IntegerSerializer
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import java.util.UUID

@JvmInline
value class Key(val index: Int)

@JvmInline
value class Message(val content: String)

fun main(): Unit =
runBlocking(Default) {
val topicName = "test-topic"
val msgCount = 10
val kafka = Kafka.container
fun main(): Unit = runBlocking(Dispatchers.Default) {
val topicName = "test-topic"
val msgCount = 10
val kafka = Kafka.container

Admin(AdminSettings(kafka.bootstrapServers)).use { client ->
client.createTopic(NewTopic(topicName, 1, 1))
}
Admin(AdminSettings(kafka.bootstrapServers)).use { client ->
client.createTopic(NewTopic(topicName, 1, 1))
}

coroutineScope { // Run produces and consumer in a single scope
launch { // Send 20 messages, and then close the producer
val settings: ProducerSettings<Key, Message> =
ProducerSettings(
kafka.bootstrapServers,
IntegerSerializer().imap { key: Key -> key.index },
StringSerializer().imap { msg: Message -> msg.content },
Acks.All
)
(1..msgCount)
.map { index -> ProducerRecord(topicName, Key(index), Message("msg: $index")) }
.asFlow()
.produce(settings)
.collect(::println)
}
coroutineScope { // Run producer and consumer in a single scope
launch(Dispatchers.IO) { // Send 10 messages, and then close the producer
val settings: ProducerSettings<Key, Message> = ProducerSettings(
kafka.bootstrapServers,
IntegerSerializer().imap { key: Key -> key.index },
StringSerializer().imap { msg: Message -> msg.content },
Acks.All
)
(1..msgCount)
.asFlow()
.map { index -> ProducerRecord(topicName, Key(index), Message("msg: $index")) }
.produce(settings)
.collect(::println)
}

launch { // Consume 20 messages as a stream, and then close the consumer
val settings: ConsumerSettings<Key, Message> =
ConsumerSettings(
kafka.bootstrapServers,
IntegerDeserializer().map(::Key),
StringDeserializer().map(::Message),
groupId = UUID.randomUUID().toString(),
autoOffsetReset = AutoOffsetReset.Earliest
)
kafkaConsumer(settings)
.subscribeTo(topicName)
.take(msgCount)
.map { "${it.key()} -> ${it.value()}" }
.collect(::println)
}
launch(Dispatchers.IO) { // Consume 10 messages as a stream, and then close the consumer
val settings: ReceiverSettings<Key, Message> = ReceiverSettings(
kafka.bootstrapServers,
IntegerDeserializer().map(::Key),
StringDeserializer().map(::Message),
groupId = UUID.randomUUID().toString(),
autoOffsetReset = AutoOffsetReset.Earliest
)
KafkaReceiver(settings)
.receive(topicName)
.take(msgCount)
.map { "${it.key()} -> ${it.value()}" }
.collect(::println)
}
}
}
35 changes: 18 additions & 17 deletions guide/src/main/kotlin/main.kt
@@ -1,10 +1,12 @@
package io.github.nomisRev.kafka

import io.github.nomisRev.kafka.receiver.KafkaReceiver
import io.github.nomisRev.kafka.receiver.ReceiverSettings
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.launch
@@ -16,7 +18,6 @@ import org.apache.kafka.common.serialization.IntegerSerializer
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.kafka.common.serialization.StringSerializer
import java.util.UUID
import kotlin.time.Duration.Companion.milliseconds

@JvmInline
value class Key(val index: Int)
@@ -26,7 +27,7 @@ value class Message(val content: String)

fun main(): Unit = runBlocking(Dispatchers.Default) {
val topicName = "test-topic"
val msgCount = 10
val msgCount = 25
val kafka = Kafka.container

Admin(AdminSettings(kafka.bootstrapServers)).use { client ->
@@ -42,31 +43,31 @@ fun main(): Unit = runBlocking(Dispatchers.Default) {
Acks.All
)
(1..msgCount)
.map { index -> ProducerRecord(topicName, Key(index), Message("msg: $index")) }
.asFlow()
.map { index ->
delay(index * 50L)
ProducerRecord(topicName, Key(index), Message("msg: $index"))
}
.produce(settings)
.collect(::println)
}

launch(Dispatchers.IO) { // Consume 25 messages as a stream, and then close the consumer
val settings: ConsumerSettings<Key, Message> = ConsumerSettings(
val settings: ReceiverSettings<Key, Message> = ReceiverSettings(
kafka.bootstrapServers,
IntegerDeserializer().map(::Key),
StringDeserializer().map(::Message),
groupId = UUID.randomUUID().toString(),
autoOffsetReset = AutoOffsetReset.Earliest,
enableAutoCommit = false
autoOffsetReset = AutoOffsetReset.Earliest
)

KafkaConsumer(settings).asFlow()
.subscribeTo(topicName)
.tap { (key, value) -> println("$key -> $value") }
.commitBatchWithin(settings, 3, 10.milliseconds)
.take(4)
.collect()
KafkaReceiver(settings)
.receive(topicName)
.take(msgCount)
.collect {
delay(75)
println("${Thread.currentThread().name} => ${it.key()} -> ${it.value()}")
it.offset.acknowledge()
}
}
}
}

fun <A> Flow<A>.tap(also: suspend (A) -> Unit): Flow<A> =
map { it.also { also(it) } }