Merged
31 changes: 11 additions & 20 deletions README.md
@@ -52,19 +52,18 @@ dependencies {

 <!--- INCLUDE
 import arrow.continuations.SuspendApp
-import io.github.nomisRev.kafka.Acks
 import io.github.nomisRev.kafka.Admin
 import io.github.nomisRev.kafka.AdminSettings
-import io.github.nomisRev.kafka.ProducerSettings
 import io.github.nomisRev.kafka.createTopic
 import io.github.nomisRev.kafka.imap
-import io.github.nomisRev.kafka.map
-import io.github.nomisRev.kafka.produce
+import io.github.nomisRev.kafka.publisher.Acks
+import io.github.nomisRev.kafka.publisher.KafkaPublisher
+import io.github.nomisRev.kafka.publisher.PublisherSettings
 import io.github.nomisRev.kafka.receiver.AutoOffsetReset
 import io.github.nomisRev.kafka.receiver.KafkaReceiver
 import io.github.nomisRev.kafka.receiver.ReceiverSettings
 import kotlinx.coroutines.Dispatchers
 import kotlinx.coroutines.flow.asFlow
 import kotlinx.coroutines.flow.map
 import kotlinx.coroutines.flow.take
 import kotlinx.coroutines.launch
@@ -94,17 +93,19 @@ fun main(): Unit = SuspendApp {
   }

   launch(Dispatchers.IO) { // Send 20 messages, and then close the producer
-    val settings: ProducerSettings<Key, Message> = ProducerSettings(
+    val settings: PublisherSettings<Key, Message> = PublisherSettings(
       kafka.bootstrapServers,
       IntegerSerializer().imap { key: Key -> key.index },
       StringSerializer().imap { msg: Message -> msg.content },
       Acks.All
     )
-    (1..msgCount)
-      .asFlow()
-      .map { index -> ProducerRecord(topicName, Key(index), Message("msg: $index")) }
-      .produce(settings)
-      .collect(::println)
+    KafkaPublisher(settings).use { publisher ->
+      publisher.publishScope {
+        (1..msgCount).forEach { index ->
+          offer(ProducerRecord(topicName, Key(index), Message("msg: $index")))
+        }
+      }
+    }
   }

   launch(Dispatchers.IO) { // Consume 20 messages as a stream, and then close the consumer
@@ -127,16 +128,6 @@ fun main(): Unit = SuspendApp {
 > You can get the full code [here](guide/example/example-readme-01.kt).

 ```text
-test-topic-0@0
-test-topic-0@1
-test-topic-0@2
-test-topic-0@3
-test-topic-0@4
-test-topic-0@5
-test-topic-0@6
-test-topic-0@7
-test-topic-0@8
-test-topic-0@9
 Key(index=1) -> Message(content=msg: 1)
 Key(index=2) -> Message(content=msg: 2)
 Key(index=3) -> Message(content=msg: 3)
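For orientation, here is a self-contained sketch of the new publisher path these hunks introduce. It is assembled from the visible diff only: the bootstrap address, topic name, and plain serializers are placeholders standing in for the README's test-container values and `imap`-mapped `Key`/`Message` types.

```kotlin
import io.github.nomisRev.kafka.publisher.Acks
import io.github.nomisRev.kafka.publisher.KafkaPublisher
import io.github.nomisRev.kafka.publisher.PublisherSettings
import kotlinx.coroutines.runBlocking
import org.apache.kafka.clients.producer.ProducerRecord
import org.apache.kafka.common.serialization.IntegerSerializer
import org.apache.kafka.common.serialization.StringSerializer

fun main(): Unit = runBlocking {
  // Placeholder bootstrap address; the README example takes this from a Kafka test container.
  val settings = PublisherSettings(
    "localhost:9092",
    IntegerSerializer(),
    StringSerializer(),
    Acks.All
  )
  // use { } closes the underlying producer once the block completes.
  KafkaPublisher(settings).use { publisher ->
    publisher.publishScope {
      (1..20).forEach { index ->
        // Hand each record to the publisher inside the scope.
        offer(ProducerRecord("example-topic", index, "msg: $index"))
      }
    }
  }
}
```

Compared to the removed Flow pipeline, the records are now published inside an explicit `publishScope`, and `use` closes the producer when that scope is done.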
21 changes: 11 additions & 10 deletions guide/example/example-readme-01.kt
@@ -1,19 +1,18 @@
 package example.exampleReadme01

 import arrow.continuations.SuspendApp
-import io.github.nomisRev.kafka.Acks
 import io.github.nomisRev.kafka.Admin
 import io.github.nomisRev.kafka.AdminSettings
-import io.github.nomisRev.kafka.ProducerSettings
 import io.github.nomisRev.kafka.createTopic
 import io.github.nomisRev.kafka.imap
-import io.github.nomisRev.kafka.map
-import io.github.nomisRev.kafka.produce
+import io.github.nomisRev.kafka.publisher.Acks
+import io.github.nomisRev.kafka.publisher.KafkaPublisher
+import io.github.nomisRev.kafka.publisher.PublisherSettings
 import io.github.nomisRev.kafka.receiver.AutoOffsetReset
 import io.github.nomisRev.kafka.receiver.KafkaReceiver
 import io.github.nomisRev.kafka.receiver.ReceiverSettings
 import kotlinx.coroutines.Dispatchers
 import kotlinx.coroutines.flow.asFlow
 import kotlinx.coroutines.flow.map
 import kotlinx.coroutines.flow.take
 import kotlinx.coroutines.launch
@@ -41,17 +40,19 @@ fun main(): Unit = SuspendApp {
   }

   launch(Dispatchers.IO) { // Send 20 messages, and then close the producer
-    val settings: ProducerSettings<Key, Message> = ProducerSettings(
+    val settings: PublisherSettings<Key, Message> = PublisherSettings(
       kafka.bootstrapServers,
       IntegerSerializer().imap { key: Key -> key.index },
       StringSerializer().imap { msg: Message -> msg.content },
       Acks.All
     )
-    (1..msgCount)
-      .asFlow()
-      .map { index -> ProducerRecord(topicName, Key(index), Message("msg: $index")) }
-      .produce(settings)
-      .collect(::println)
+    KafkaPublisher(settings).use { publisher ->
+      publisher.publishScope {
+        (1..msgCount).forEach { index ->
+          offer(ProducerRecord(topicName, Key(index), Message("msg: $index")))
+        }
+      }
+    }
   }

   launch(Dispatchers.IO) { // Consume 20 messages as a stream, and then close the consumer
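The consumer half of this example stays collapsed in the diff. Purely as a hypothetical companion to the receiver imports that survive the change (`KafkaReceiver`, `ReceiverSettings`, `AutoOffsetReset`, plus `flow.map`/`flow.take`), a consumer loop might look as follows; the `ReceiverSettings` parameter names (`groupId`, `autoOffsetReset`) and the plain deserializers are assumptions, not taken from this diff.

```kotlin
import io.github.nomisRev.kafka.receiver.AutoOffsetReset
import io.github.nomisRev.kafka.receiver.KafkaReceiver
import io.github.nomisRev.kafka.receiver.ReceiverSettings
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.map
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.runBlocking
import org.apache.kafka.common.serialization.IntegerDeserializer
import org.apache.kafka.common.serialization.StringDeserializer

// Hypothetical sketch only: the real consumer block is collapsed in the diff above.
fun main(): Unit = runBlocking {
  val settings = ReceiverSettings(
    "localhost:9092",                           // placeholder bootstrap server
    IntegerDeserializer(),
    StringDeserializer(),
    groupId = "example-group",                  // assumed parameter name and value
    autoOffsetReset = AutoOffsetReset.Earliest  // assumed parameter name
  )
  KafkaReceiver(settings)
    .receive("example-topic")
    .take(20)                                   // consume 20 records, then stop
    .map { record -> "${record.key()} -> ${record.value()}" }
    .collect(::println)
}
```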
73 changes: 0 additions & 73 deletions guide/src/main/kotlin/main.kt

This file was deleted.

11 changes: 6 additions & 5 deletions src/main/kotlin/io/github/nomisRev/kafka/publisher/Acks.kt
@@ -1,18 +1,19 @@
 package io.github.nomisRev.kafka.publisher

+import org.apache.kafka.clients.producer.ProducerConfig
+
 /**
  * The number of acknowledgments the producer requires the leader to have received before considering a request complete.
  * This controls the durability of records that are sent
  *
- * **Note:** that enabling idempotence requires this config value to be 'all'.
- * If conflicting configurations are set and idempotence is not explicitly enabled, idempotence is disabled.
+ * **NOTE:** Enabling idempotence requires this config value to be [All], otherwise [ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG] is ignored.
  */
 public enum class Acks(public val value: String) {
   /**
-   * If set to zero then the producer will not wait for any acknowledgment from the server at all.
+   * Using [Zero] the producer will not wait for any acknowledgment from the server at all.
    * The record will be immediately added to the socket buffer and considered sent.
    * No guarantee can be made that the server has received the record in this case,
-   * and the <code>retries</code> configuration will not take effect (as the client won't generally know of any failures).
+   * and [ProducerConfig.RETRIES_CONFIG] will not take effect (as the client won't generally know of any failures).
    * The offset given back for each record will always be set to `-1`
    */
   Zero("0"),
@@ -26,7 +27,7 @@ public enum class Acks(public val value: String) {
   /**
    * This means the leader will wait for the full set of in-sync replicas to acknowledge the record.
    * This guarantees that the record will not be lost as long as at least one in-sync replica remains alive.
-   * This is the strongest available guarantee. This is equivalent to the `acks=-1` setting.
+   * This is the strongest available guarantee. This is equivalent to the [MinusOne] setting.
    */
   All("all"),

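Since the reworked KDoc now cross-references `ProducerConfig`, a short sketch of how an `Acks` value feeds into `PublisherSettings` may help. It relies only on the four-argument constructor visible in the hunks above; the bootstrap address is a placeholder.

```kotlin
import io.github.nomisRev.kafka.publisher.Acks
import io.github.nomisRev.kafka.publisher.PublisherSettings
import org.apache.kafka.common.serialization.IntegerSerializer
import org.apache.kafka.common.serialization.StringSerializer

// Acks.All ("all"): the leader waits for the full in-sync replica set.
// Per the KDoc above, this is also what enabling idempotence requires.
val durable = PublisherSettings(
  "localhost:9092",  // placeholder bootstrap server
  IntegerSerializer(),
  StringSerializer(),
  Acks.All
)

// Acks.Zero ("0"): no broker acknowledgment at all; offsets come back
// as -1 and retries have no effect, as documented on [Zero].
val fireAndForget = PublisherSettings(
  "localhost:9092",
  IntegerSerializer(),
  StringSerializer(),
  Acks.Zero
)
```

Each enum constant carries the raw config string in its `value` property, which is what ultimately lands in the producer's `acks` setting.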