Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 6 additions & 7 deletions src/main/kotlin/io/github/nomisRev/kafka/Producer.kt
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flatMapConcat
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.map
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.clients.producer.KafkaProducer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.clients.producer.ProducerRecord
Expand Down Expand Up @@ -134,12 +133,12 @@ public fun <K, V> kafkaProducer(
}
}

public enum class Acks(public val value: String) {
All("all"),
MinusOne("-1"),
Zero("0"),
One("1")
}
@Deprecated(
"Use io.github.nomisRev.kafka.publisher.Acks instead",
ReplaceWith("this", "io.github.nomisRev.kafka.publisher.Acks")
)
typealias Acks =
io.github.nomisRev.kafka.publisher.Acks

/**
* A type-safe constructor for [KafkaProducer] settings.
Expand Down
38 changes: 38 additions & 0 deletions src/main/kotlin/io/github/nomisRev/kafka/publisher/Acks.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
package io.github.nomisRev.kafka.publisher

/**
* The number of acknowledgments the producer requires the leader to have received before considering a request complete.
* This controls the durability of records that are sent
*
* **Note:** that enabling idempotence requires this config value to be 'all'.
* If conflicting configurations are set and idempotence is not explicitly enabled, idempotence is disabled.
*/
public enum class Acks(public val value: String) {
/**
* If set to zero then the producer will not wait for any acknowledgment from the server at all.
* The record will be immediately added to the socket buffer and considered sent.
* No guarantee can be made that the server has received the record in this case,
* and the <code>retries</code> configuration will not take effect (as the client won't generally know of any failures).
* The offset given back for each record will always be set to `-1`
*/
Zero("0"),

/**
* This will mean the leader will write the record to its local log but will respond without awaiting full acknowledgement from all followers.
* In this case should the leader fail immediately after acknowledging the record but before the followers have replicated it then the record will be lost.
*/
One("1"),

/**
* This means the leader will wait for the full set of in-sync replicas to acknowledge the record.
* This guarantees that the record will not be lost as long as at least one in-sync replica remains alive.
* This is the strongest available guarantee. This is equivalent to the `acks=-1` setting.
*/
All("all"),

/**
* Alias to all
* @see All
*/
MinusOne("-1"),
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
package io.github.nomisRev.kafka.publisher

import io.github.nomisRev.kafka.NothingSerializer
import io.github.nomisRev.kafka.receiver.isPosNonZero
import io.github.nomisRev.kafka.publisher.PublisherOptions.ProducerListener
import kotlinx.coroutines.channels.Channel
import org.apache.kafka.clients.producer.Producer
import org.apache.kafka.clients.producer.ProducerConfig
import org.apache.kafka.common.serialization.Serializer
import java.util.Properties
import kotlin.time.Duration

/**
* @param bootstrapServers list of host/port pairs to use for establishing the initial connection to the Kafka cluster. Should be comma separated.
* @param keySerializer the [Serializer] to use to serialize the [Key] when sending messages to kafka.
* @param valueSerializer the [Serializer] to use to serialize the [Value] when sending messages to kafka.
* @param acknowledgments configuration to use.
* @param closeTimeout the timeout when closing the created underlying [Producer], default [Duration.INFINITE].
* @param maxInFlight the maximum number of in-flight records that are fetched from the outbound record publisher while acknowledgements are pending. Default [Channel.BUFFERED].
* @param stopOnError if a send operation should be terminated when an error is encountered. If set to false, send is attempted for all records in a sequence even if send of one of the records fails with a non-fatal exception.
* @param producerListener listener that is called whenever a [Producer] is added, and removed.
*/
data class PublisherOptions<Key, Value>(
val bootstrapServers: String,
val keySerializer: Serializer<Key>,
val valueSerializer: Serializer<Value>,
val acknowledgments: Acks = Acks.One,
val closeTimeout: Duration = Duration.INFINITE,
val maxInFlight: Int = Channel.BUFFERED,
val stopOnError: Boolean = true,
val producerListener: ProducerListener = NoOpProducerListener,
val properties: Properties = Properties(),
) {

init {
require(closeTimeout.isPosNonZero()) { "Close timeout must be >= 0 but found $closeTimeout" }
}

internal fun properties(): Properties = Properties().apply {
put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, keySerializer::class.qualifiedName)
put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, valueSerializer::class.qualifiedName)
put(ProducerConfig.ACKS_CONFIG, acknowledgments.value)
putAll(properties)
}

/** Called whenever a [Producer] is added or removed. */
interface ProducerListener {
/**
* A new producer was created.
* @param id the producer id (factory name and client.id separated by a period).
* @param producer the producer.
*/
fun producerAdded(id: String, producer: Producer<*, *>) {}

/**
* An existing producer was removed.
* @param id the producer id (factory bean name and client.id separated by a period).
* @param producer the producer.
*/
fun producerRemoved(id: String, producer: Producer<*, *>) {}
}
}

/** Alternative constructor for [PublisherOptions] without a key */
public fun <Value> PublisherOptions(
bootstrapServers: String,
valueSerializer: Serializer<Value>,
acknowledgments: Acks = Acks.One,
closeTimeout: Duration = Duration.INFINITE,
maxInFlight: Int = Channel.BUFFERED,
stopOnError: Boolean = true,
producerListener: ProducerListener = NoOpProducerListener,
properties: Properties = Properties(),
): PublisherOptions<Nothing, Value> =
PublisherOptions(
bootstrapServers,
NothingSerializer,
valueSerializer,
acknowledgments,
closeTimeout,
maxInFlight,
stopOnError,
producerListener,
properties
)

private object NoOpProducerListener : ProducerListener
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.github.nomisRev.kafka.receiver

import io.github.nomisRev.kafka.NothingDeserializer
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.Deserializer
import java.util.Properties
Expand Down Expand Up @@ -86,9 +87,3 @@ public fun <V> ReceiverSettings(
closeTimeout,
properties
)

private object NothingDeserializer : Deserializer<Nothing> {
override fun close(): Unit = Unit
override fun configure(configs: MutableMap<String, *>?, isKey: Boolean): Unit = Unit
override fun deserialize(topic: String?, data: ByteArray?): Nothing = TODO("Impossible")
}
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,8 @@ internal class CommittableOffset<K, V>(

override suspend fun commit(): Unit =
if (maybeUpdateOffset() > 0) suspendCoroutine { cont ->
loop.commitBatch.addContinuation(cont)
loop.scheduleCommitIfRequired()
loop.commitBatch.addContinuation(cont)
loop.scheduleCommitIfRequired()
} else Unit

override suspend fun acknowledge() {
Expand Down Expand Up @@ -255,11 +255,11 @@ internal class EventLoop<K, V>(
toResume.removeAll(pausedByUser)
pausedByUser.clear()
consumer.resume(toResume)
if (logger.isDebugEnabled) {
logger.debug("Resumed partitions: $toResume")
if (logger.isDebugEnabled) {
logger.debug("Resumed partitions: $toResume")
}
}
}
} else {
} else {
if (checkAndSetPausedByUs()) {
pausedByUser.addAll(consumer.paused())
consumer.pause(consumer.assignment())
Expand Down Expand Up @@ -357,7 +357,7 @@ internal class EventLoop<K, V>(
commitSuccess(commitArgs, commitArgs.offsets)
atmostOnceOffsets.onCommit(commitArgs.offsets)
}
// Handled separately using transactional KafkaSender
// Handled separately using transactional KafkaPublisher
AckMode.EXACTLY_ONCE -> Unit
}
}
Expand Down