From daf0f8a91067ea7e7998ab9b716dc9e5b1338abb Mon Sep 17 00:00:00 2001 From: Simon Vergauwen Date: Sat, 14 Oct 2023 20:07:37 +0200 Subject: [PATCH] Rewrite to kotlin-test, add power-assert, add utilities --- build.gradle.kts | 8 +- gradle/libs.versions.toml | 13 +- .../nomisRev/kafka/receiver/KafkaReceiver.kt | 1 + .../kafka/receiver/internals/PollLoop.kt | 2 + .../io/github/nomisrev/kafka/ChunkSpec.kt | 109 ----- .../github/nomisrev/kafka/KafkaContainer.kt | 15 + .../io/github/nomisrev/kafka/KafkaSpec.kt | 228 +++++---- .../io/github/nomisrev/kafka/KotestProject.kt | 13 - .../kotlin/io/github/nomisrev/kafka/Predef.kt | 25 +- .../nomisrev/kafka/ProducerSettingSpec.kt | 44 -- .../kafka/publisher/KafkaPublisherSpec.kt | 446 +++++++++--------- .../kafka/receiver/CommitStrategySpec.kt | 82 ++-- .../kafka/receiver/KafakReceiverSpec.kt | 284 ++++++----- 13 files changed, 577 insertions(+), 693 deletions(-) delete mode 100644 src/test/kotlin/io/github/nomisrev/kafka/ChunkSpec.kt create mode 100644 src/test/kotlin/io/github/nomisrev/kafka/KafkaContainer.kt delete mode 100644 src/test/kotlin/io/github/nomisrev/kafka/KotestProject.kt delete mode 100644 src/test/kotlin/io/github/nomisrev/kafka/ProducerSettingSpec.kt diff --git a/build.gradle.kts b/build.gradle.kts index 10592237..9a045697 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -1,3 +1,4 @@ +import com.bnorm.power.PowerAssertGradleExtension import kotlinx.knit.KnitPluginExtension import org.gradle.api.tasks.testing.logging.TestExceptionFormat import org.gradle.api.tasks.testing.logging.TestLogEvent @@ -9,6 +10,7 @@ plugins { alias(libs.plugins.spotless) alias(libs.plugins.knit) alias(libs.plugins.publish) + alias(libs.plugins.power.assert) } repositories { @@ -24,11 +26,15 @@ dependencies { api(libs.kafka.clients) implementation(libs.slf4j.api) - testImplementation(libs.bundles.kotest) + testImplementation(kotlin("test")) testImplementation(libs.testcontainers.kafka) testImplementation(libs.slf4j.simple) } +configure { + functions = listOf("kotlin.test.assertEquals") +} + configure { siteRoot = "https://nomisrev.github.io/kotlin-kafka/" } diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index 1fd17a3f..b70456b5 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -10,12 +10,10 @@ testcontainers-kafka = "1.19.1" slf4j = "2.0.9" spotless="6.22.0" publish="0.25.3" +power-assert="0.13.0" [libraries] -kotest-assertions = { module = "io.kotest:kotest-assertions-core", version.ref = "kotest" } -kotest-framework = { module = "io.kotest:kotest-framework-engine", version.ref = "kotest" } kotest-property = { module = "io.kotest:kotest-property", version.ref = "kotest" } -kotest-runner-junit5 = { module = "io.kotest:kotest-runner-junit5", version.ref = "kotest" } kafka-connect = { module = "org.apache.kafka:connect-runtime", version.ref = "kafka" } kafka-clients = { module = "org.apache.kafka:kafka-clients", version.ref = "kafka" } kafka-streams = { module = "org.apache.kafka:kafka-streams", version.ref = "kafka" } @@ -27,14 +25,6 @@ testcontainers-kafka = { module = "org.testcontainers:kafka", version.ref = "tes slf4j-api = { module = "org.slf4j:slf4j-api", version.ref = "slf4j" } slf4j-simple = { module = "org.slf4j:slf4j-simple", version.ref = "slf4j" } -[bundles] -kotest = [ - "kotest-assertions", - "kotest-framework", - "kotest-property", - "kotest-runner-junit5" -] - [plugins] kotlin-jvm = { id = "org.jetbrains.kotlin.jvm", version.ref = "kotlin" } dokka = { id = "org.jetbrains.dokka", version.ref = "dokka" } @@ -42,3 +32,4 @@ kover = { id = "org.jetbrains.kotlinx.kover", version.ref = "kover" } spotless = { id = "com.diffplug.spotless", version.ref = "spotless" } publish = { id = "com.vanniktech.maven.publish", version.ref="publish" } knit = { id = "org.jetbrains.kotlinx.knit", version.ref="knit" } +power-assert = { id = "com.bnorm.power.kotlin-power-assert", version.ref="power-assert" } diff --git a/src/main/kotlin/io/github/nomisRev/kafka/receiver/KafkaReceiver.kt b/src/main/kotlin/io/github/nomisRev/kafka/receiver/KafkaReceiver.kt index 94cb11b8..bfea97b4 100644 --- a/src/main/kotlin/io/github/nomisRev/kafka/receiver/KafkaReceiver.kt +++ b/src/main/kotlin/io/github/nomisRev/kafka/receiver/KafkaReceiver.kt @@ -22,6 +22,7 @@ public interface KafkaReceiver { public fun receive(topicName: String): Flow> = receive(setOf(topicName)) + .buffer() public fun receiveAutoAck(topicNames: Collection): Flow>> diff --git a/src/main/kotlin/io/github/nomisRev/kafka/receiver/internals/PollLoop.kt b/src/main/kotlin/io/github/nomisRev/kafka/receiver/internals/PollLoop.kt index 0a4581dd..6aca1614 100644 --- a/src/main/kotlin/io/github/nomisRev/kafka/receiver/internals/PollLoop.kt +++ b/src/main/kotlin/io/github/nomisRev/kafka/receiver/internals/PollLoop.kt @@ -136,12 +136,14 @@ internal class CommittableOffset( ) : Offset { private val acknowledged = AtomicBoolean(false) + // TODO should throw if called after Flow finished override suspend fun commit(): Unit = if (maybeUpdateOffset() > 0) suspendCoroutine { cont -> loop.commitBatch.addContinuation(cont) loop.scheduleCommitIfRequired() } else Unit + // TODO should throw if called after Flow finished override suspend fun acknowledge() { val uncommittedCount = maybeUpdateOffset().toLong() if (commitBatchSize in 1..uncommittedCount) { diff --git a/src/test/kotlin/io/github/nomisrev/kafka/ChunkSpec.kt b/src/test/kotlin/io/github/nomisrev/kafka/ChunkSpec.kt deleted file mode 100644 index 127014f8..00000000 --- a/src/test/kotlin/io/github/nomisrev/kafka/ChunkSpec.kt +++ /dev/null @@ -1,109 +0,0 @@ -package io.github.nomisrev.kafka - -import io.github.nomisRev.kafka.internal.chunked -import io.kotest.assertions.assertSoftly -import io.kotest.core.spec.style.StringSpec -import io.kotest.matchers.collections.shouldBeEmpty -import io.kotest.matchers.shouldBe -import io.kotest.property.Arb -import io.kotest.property.arbitrary.int -import io.kotest.property.arbitrary.list -import io.kotest.property.arbitrary.long -import io.kotest.property.arbitrary.map -import io.kotest.property.checkAll -import kotlinx.coroutines.ExperimentalCoroutinesApi -import kotlinx.coroutines.currentCoroutineContext -import kotlinx.coroutines.delay -import kotlinx.coroutines.ensureActive -import kotlinx.coroutines.flow.asFlow -import kotlinx.coroutines.flow.emptyFlow -import kotlinx.coroutines.flow.flow -import kotlinx.coroutines.flow.map -import kotlinx.coroutines.flow.take -import kotlinx.coroutines.flow.toList -import kotlinx.coroutines.test.runTest -import kotlin.math.absoluteValue -import kotlin.time.Duration.Companion.days -import kotlin.time.Duration.Companion.microseconds -import kotlin.time.Duration.Companion.milliseconds -import kotlin.time.ExperimentalTime -import kotlin.time.measureTimedValue - -@OptIn(ExperimentalTime::class) -@ExperimentalCoroutinesApi -class ChunkSpec : StringSpec({ - - "should never lose any elements, and get full chunks if not timing out" { - runTest { - checkAll( - Arb.list(Arb.int()), - Arb.int(min = 1) // chunk needs to have minSize 1 - ) { source, maxGroupSize0 -> - val maxGroupSize = (maxGroupSize0 % 20).absoluteValue + 1 - - source.asFlow().map { i -> - delay(100.microseconds) - i - }.chunked(maxGroupSize, 2.days).toList() shouldBe source.chunked(maxGroupSize) - } - } - } - - val infinite = flow { - while (true) { - currentCoroutineContext().ensureActive() - emit(Unit) - } - } - - "Can take from infinite stream" { - runTest { - checkAll( - Arb.int(min = 1, max = 50), // chunk needs to have minSize 1 - Arb.int(min = 1, max = 50), // chunk needs to have minSize 1 - Arb.long(min = 100).map { it.milliseconds } // During warm-up can take up to 100ms for first flow - ) { size, count, timeout -> - infinite.map { - delay(100.microseconds) - }.chunked(size, timeout) - .take(count) - .toList() shouldBe List(count) { List(size) { } } - } - } - } - - "empty flow" { - runTest { - val empty = emptyFlow() - checkAll( - Arb.int(min = 1), // chunk needs to have minSize 1 - Arb.long(min = 15).map { it.milliseconds } // During warm-up can take up to 10ms for first flow - ) { size, timeout -> - val (value, duration) = measureTimedValue { - empty.chunked(size, timeout).toList() - } - value.shouldBeEmpty() - duration.inWholeMilliseconds < 15 - } - } - } - - "multiple elements not filling chunks" { - runTest { - val flow = flow { - for (i in 1..5) { - delay(500) - emit(i) - } - } - - val result = flow.chunked(500, 1100.milliseconds).toList() - assertSoftly(result) { - size shouldBe 3 - first().size shouldBe 2 - get(1).size shouldBe 2 - get(2).size shouldBe 1 - } - } - } -}) diff --git a/src/test/kotlin/io/github/nomisrev/kafka/KafkaContainer.kt b/src/test/kotlin/io/github/nomisrev/kafka/KafkaContainer.kt new file mode 100644 index 00000000..7970dfbd --- /dev/null +++ b/src/test/kotlin/io/github/nomisrev/kafka/KafkaContainer.kt @@ -0,0 +1,15 @@ +package io.github.nomisrev.kafka + +import org.testcontainers.containers.KafkaContainer +import org.testcontainers.utility.DockerImageName + +class Kafka : KafkaContainer(DockerImageName.parse("confluentinc/cp-kafka:latest")) { + + fun pause() { + dockerClient.pauseContainerCmd(containerId).exec() + } + + fun unpause() { + dockerClient.unpauseContainerCmd(containerId).exec() + } +} diff --git a/src/test/kotlin/io/github/nomisrev/kafka/KafkaSpec.kt b/src/test/kotlin/io/github/nomisrev/kafka/KafkaSpec.kt index 0c42d951..9516ffec 100644 --- a/src/test/kotlin/io/github/nomisrev/kafka/KafkaSpec.kt +++ b/src/test/kotlin/io/github/nomisrev/kafka/KafkaSpec.kt @@ -2,28 +2,20 @@ package io.github.nomisrev.kafka import io.github.nomisRev.kafka.Admin import io.github.nomisRev.kafka.AdminSettings -import io.github.nomisRev.kafka.receiver.AutoOffsetReset import io.github.nomisRev.kafka.createTopic import io.github.nomisRev.kafka.deleteTopic import io.github.nomisRev.kafka.describeTopic import io.github.nomisRev.kafka.publisher.Acks import io.github.nomisRev.kafka.publisher.KafkaPublisher import io.github.nomisRev.kafka.publisher.PublisherSettings +import io.github.nomisRev.kafka.receiver.AutoOffsetReset import io.github.nomisRev.kafka.receiver.KafkaReceiver import io.github.nomisRev.kafka.receiver.ReceiverSettings -import io.kotest.assertions.assertSoftly -import io.kotest.assertions.async.shouldTimeout -import io.kotest.assertions.fail -import io.kotest.assertions.failure -import io.kotest.core.spec.style.StringSpec -import io.kotest.matchers.shouldBe -import kotlinx.coroutines.TimeoutCancellationException -import kotlinx.coroutines.coroutineScope -import kotlinx.coroutines.flow.first +import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.flow.map import kotlinx.coroutines.flow.take import kotlinx.coroutines.flow.toList -import kotlinx.coroutines.withTimeout +import kotlinx.coroutines.runBlocking import kotlinx.coroutines.withTimeoutOrNull import org.apache.kafka.clients.admin.Admin import org.apache.kafka.clients.admin.AdminClientConfig @@ -42,8 +34,7 @@ import org.apache.kafka.common.PartitionInfo import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.serialization.StringDeserializer import org.apache.kafka.common.serialization.StringSerializer -import org.testcontainers.containers.KafkaContainer -import org.testcontainers.utility.DockerImageName +import org.junit.jupiter.api.AfterAll import java.time.Duration import java.util.Properties import java.util.UUID @@ -51,94 +42,121 @@ import java.util.concurrent.CompletableFuture import java.util.concurrent.Executors import java.util.concurrent.Future import java.util.concurrent.TimeUnit -import kotlin.time.Duration.Companion.milliseconds +import kotlin.test.assertEquals import kotlin.time.Duration.Companion.seconds -abstract class KafkaSpec(body: KafkaSpec.() -> Unit = {}) : StringSpec() { - init { - body() - } - private val transactionTimeoutInterval = 1.seconds - private val consumerPollingTimeout = 1.seconds - val boom = RuntimeException("Boom!") - - private val kafkaImage: DockerImageName = - DockerImageName.parse("confluentinc/cp-kafka:latest") - - private val container: KafkaContainer = autoClose( - KafkaContainer(kafkaImage) - .withExposedPorts(9092, 9093) - .withNetworkAliases("broker") - .withEnv("KAFKA_HOST_NAME", "broker") - .withEnv("KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR", "1") - .withEnv("KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR", "1") - .withEnv( - "KAFKA_TRANSACTION_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS", - transactionTimeoutInterval.inWholeMilliseconds.toString() +abstract class KafkaSpec { + + companion object { + private val consumerPollingTimeout = 1.seconds + private val transactionTimeoutInterval = 1.seconds + + @AfterAll + fun destroy() { + publisher.close() + kafka.stop() + } + + @JvmStatic + val kafka: Kafka = + Kafka().apply { + withExposedPorts(9092, 9093) + withNetworkAliases("broker") + withEnv("KAFKA_HOST_NAME", "broker") + withEnv("KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR", "1") + withEnv("KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR", "1") + withEnv( + "KAFKA_TRANSACTION_ABORT_TIMED_OUT_TRANSACTION_CLEANUP_INTERVAL_MS", + transactionTimeoutInterval.inWholeMilliseconds.toString() + ) + withEnv("KAFKA_AUTHORIZER_CLASS_NAME", "kafka.security.authorizer.AclAuthorizer") + withEnv("KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND", "true") + withReuse(true) + start() + } + + val receiverSetting: ReceiverSettings = + ReceiverSettings( + bootstrapServers = kafka.bootstrapServers, + keyDeserializer = StringDeserializer(), + valueDeserializer = StringDeserializer(), + groupId = "test-group-id", + autoOffsetReset = AutoOffsetReset.Earliest, + pollTimeout = consumerPollingTimeout ) - .withEnv("KAFKA_AUTHORIZER_CLASS_NAME", "kafka.security.authorizer.AclAuthorizer") - .withEnv("KAFKA_ALLOW_EVERYONE_IF_NO_ACL_FOUND", "true") - .withReuse(true) - .also { container -> container.start() } - ) + + val publisherSettings = PublisherSettings( + bootstrapServers = kafka.bootstrapServers, + keySerializer = StringSerializer(), + valueSerializer = StringSerializer(), + properties = Properties().apply { + put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10000.toString()) + put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1000.toString()) + } + ) + + val producer = KafkaProducer(publisherSettings.properties()) + + @JvmStatic + val publisher = KafkaPublisher(publisherSettings) { producer } + } private fun adminProperties(): Properties = Properties().apply { - put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, container.bootstrapServers) + put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, kafka.bootstrapServers) put(AdminClientConfig.CLIENT_ID_CONFIG, "test-kafka-admin-client-${UUID.randomUUID()}") put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, "10000") put(AdminClientConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, "10000") } fun adminSettings(): AdminSettings = - AdminSettings(container.bootstrapServers, adminProperties()) + AdminSettings(kafka.bootstrapServers, adminProperties()) inline fun admin(body: Admin.() -> A): A = Admin(adminSettings()).use(body) - fun receiverSetting(): ReceiverSettings = - ReceiverSettings( - bootstrapServers = container.bootstrapServers, - keyDeserializer = StringDeserializer(), - valueDeserializer = StringDeserializer(), - groupId = "test-group-id", - autoOffsetReset = AutoOffsetReset.Earliest, - pollTimeout = consumerPollingTimeout - ) - fun publisherSettings( acknowledgments: Acks = Acks.One, - properties: Properties = Properties() + properties: Properties.() -> Unit ): PublisherSettings = - PublisherSettings( - bootstrapServers = container.bootstrapServers, - keySerializer = StringSerializer(), - valueSerializer = StringSerializer(), + publisherSettings.copy( acknowledgments = acknowledgments, - properties = properties.apply { - put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 10000.toString()) - put(ProducerConfig.RETRY_BACKOFF_MS_CONFIG, 1000.toString()) + properties = Properties().apply { + properties() + put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, publisherSettings.bootstrapServers) + put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, publisherSettings.keySerializer::class.qualifiedName) + put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, publisherSettings.valueSerializer::class.qualifiedName) + put(ProducerConfig.ACKS_CONFIG, acknowledgments.value) } ) - val producer = KafkaProducer(publisherSettings().properties()) - val publisher = autoClose(KafkaPublisher(publisherSettings()) { producer }) + // private fun nextTopicName(): String = "topic-${UUID.randomUUID()}" - suspend fun withTopic( + class TopicTestScope( + val topic: NewTopic, + scope: CoroutineScope + ) : CoroutineScope by scope { + fun createProducerRecord(index: Int, partitions: Int = 4): ProducerRecord { + val partition: Int = index % partitions + return ProducerRecord(topic.name(), partition, "$index", "Message $index") + } + } + + fun withTopic( topicConfig: Map = emptyMap(), partitions: Int = 4, replicationFactor: Short = 1, - action: suspend (NewTopic) -> A, - ): A { + test: suspend TopicTestScope.(NewTopic) -> Unit + ): Unit = runBlocking { val topic = NewTopic(nextTopicName(), partitions, replicationFactor).configs(topicConfig) - return admin { + admin { createTopic(topic) try { - coroutineScope { - action(topic) - } + withTimeoutOrNull(40.seconds) { + TopicTestScope(topic, this).test(topic) + } ?: throw AssertionError("Timed out after 40 seconds...") } finally { topic.shouldBeEmpty() deleteTopic(topic.name()) @@ -146,6 +164,8 @@ abstract class KafkaSpec(body: KafkaSpec.() -> Unit = {}) : StringSpec() { } } + object boom : RuntimeException("Boom!") + @JvmName("publishPairsToKafka") suspend fun publishToKafka( topic: NewTopic, @@ -159,7 +179,9 @@ abstract class KafkaSpec(body: KafkaSpec.() -> Unit = {}) : StringSpec() { publisher.publishScope { offer(messages) } + // + // suspend fun KafkaReceiver.committedCount(topic: String): Long = admin { val description = requireNotNull(describeTopic(topic)) { "Topic $topic not found" } @@ -177,56 +199,67 @@ abstract class KafkaSpec(body: KafkaSpec.() -> Unit = {}) : StringSpec() { suspend fun NewTopic.shouldBeEmpty() { val res = withTimeoutOrNull(100) { - KafkaReceiver(receiverSetting()) + KafkaReceiver(receiverSetting) .receive(name()) .take(1) .toList() } - if (res != null) fail("Expected test to timeout, but found $res") + if (res != null) throw AssertionError("Expected test to timeout, but found $res") } - suspend infix fun NewTopic.shouldHaveRecord(records: ProducerRecord) { - assertSoftly { - KafkaReceiver(receiverSetting()) + suspend infix fun NewTopic.assertHasRecord(records: ProducerRecord) { + assertEquals( + KafkaReceiver(receiverSetting) .receive(name()) .map { it.apply { offset.acknowledge() } }.take(1) .map { it.value() } - .toList() shouldBe listOf(records.value()) - shouldBeEmpty() - } + .toList(), + listOf(records.value()) + ) + shouldBeEmpty() } - suspend infix fun NewTopic.shouldHaveRecords(records: Iterable>) { - KafkaReceiver(receiverSetting()) - .receive(name()) - .map { record -> - record - .also { record.offset.acknowledge() } - } - .take(records.toList().size) - .toList() - .groupBy({ it.partition() }) { it.value() } shouldBe records.groupBy({ it.partition() }) { it.value() } + suspend infix fun NewTopic.assertHasRecords(records: Iterable>) { + assertEquals( + KafkaReceiver(receiverSetting) + .receive(name()) + .map { record -> + record + .also { record.offset.acknowledge() } + } + .take(records.toList().size) + .toList() + .groupBy({ it.partition() }) { it.value() }, + records.groupBy({ it.partition() }) { it.value() } + ) + shouldBeEmpty() } @JvmName("shouldHaveAllRecords") - suspend infix fun NewTopic.shouldHaveRecords( + suspend infix fun NewTopic.assertHasRecords( records: Iterable>> ) { val expected = records.flatten().groupBy({ it.partition() }) { it.value() }.mapValues { it.value.toSet() } - KafkaReceiver(receiverSetting()) - .receive(name()) - .map { record -> - record.also { record.offset.acknowledge() } - } - .take(records.flatten().size) - .toList() - .groupBy({ it.partition() }) { it.value() } - .mapValues { it.value.toSet() } shouldBe expected + assertEquals( + KafkaReceiver(receiverSetting) + .receive(name()) + .map { record -> + record.also { record.offset.acknowledge() } + } + .take(records.flatten().size) + .toList() + .groupBy({ it.partition() }) { it.value() } + .mapValues { it.value.toSet() }, + expected + ) + shouldBeEmpty() } + // + // fun stubProducer(failOnNumber: Int? = null): suspend () -> Producer = suspend { object : Producer { override fun close() {} @@ -283,4 +316,5 @@ abstract class KafkaSpec(body: KafkaSpec.() -> Unit = {}) : StringSpec() { producer.send(record) } } + // } diff --git a/src/test/kotlin/io/github/nomisrev/kafka/KotestProject.kt b/src/test/kotlin/io/github/nomisrev/kafka/KotestProject.kt deleted file mode 100644 index cebab421..00000000 --- a/src/test/kotlin/io/github/nomisrev/kafka/KotestProject.kt +++ /dev/null @@ -1,13 +0,0 @@ -package io.github.nomisrev.kafka - -import io.kotest.core.config.AbstractProjectConfig -import io.kotest.property.PropertyTesting - -object KotestProject : AbstractProjectConfig() { - init { - System.setProperty("kotest.assertions.collection.print.size", "100") - PropertyTesting.defaultIterationCount = 10 - } - - override val invocationTimeout: Long = 40_000L -} \ No newline at end of file diff --git a/src/test/kotlin/io/github/nomisrev/kafka/Predef.kt b/src/test/kotlin/io/github/nomisrev/kafka/Predef.kt index 3c01de3d..f87fa528 100644 --- a/src/test/kotlin/io/github/nomisrev/kafka/Predef.kt +++ b/src/test/kotlin/io/github/nomisrev/kafka/Predef.kt @@ -1,11 +1,8 @@ package io.github.nomisrev.kafka -import io.kotest.assertions.fail -import kotlinx.coroutines.CancellationException -import kotlinx.coroutines.CompletableDeferred -import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.flow.Flow import kotlinx.coroutines.flow.map +import kotlin.test.assertTrue inline fun Flow.mapIndexed( crossinline transform: suspend (index: Int, value: A) -> B, @@ -16,13 +13,17 @@ inline fun Flow.mapIndexed( } } -suspend fun CoroutineScope.shouldCancel(block: suspend () -> Unit): CancellationException { - val cancel = CompletableDeferred() - try { +inline fun assertThrows( + message: String? = "Expected exception ${A::class.java}, but code didn't throw any exception.", + block: () -> Unit, +): A { + val exception = try { block() - fail("Expected to be cancellable, but wasn't") - } catch (e: CancellationException) { - cancel.complete(e) + null + } catch (e: Throwable) { + e } - return cancel.await() -} \ No newline at end of file + ?: throw AssertionError(message) + assertTrue(exception is A, "Expected exception of ${A::class.java} but found ${exception.javaClass.name}") + return exception +} diff --git a/src/test/kotlin/io/github/nomisrev/kafka/ProducerSettingSpec.kt b/src/test/kotlin/io/github/nomisrev/kafka/ProducerSettingSpec.kt deleted file mode 100644 index 7a55a426..00000000 --- a/src/test/kotlin/io/github/nomisrev/kafka/ProducerSettingSpec.kt +++ /dev/null @@ -1,44 +0,0 @@ -package io.github.nomisrev.kafka - -import io.github.nomisRev.kafka.Acks -import io.github.nomisRev.kafka.ProducerSettings -import io.kotest.assertions.assertSoftly -import io.kotest.core.spec.style.StringSpec -import io.kotest.matchers.maps.shouldContainAll -import io.kotest.matchers.shouldBe -import io.kotest.property.Arb -import io.kotest.property.arbitrary.enum -import io.kotest.property.arbitrary.map -import io.kotest.property.arbitrary.string -import io.kotest.property.checkAll -import org.apache.kafka.clients.producer.ProducerConfig -import org.apache.kafka.common.serialization.StringSerializer -import java.util.Properties - -class ProducerSettingSpec : StringSpec({ - - "ProducerSettings Ack" { - checkAll( - Arb.string(), - Arb.enum(), - Arb.map(Arb.string(), Arb.string()) - ) { bootstrapServers, acks, map -> - val settings = ProducerSettings( - bootstrapServers, - StringSerializer(), - StringSerializer(), - acks = acks, - other = Properties().apply { - putAll(map) - } - ) - - assertSoftly(settings.properties()) { - @Suppress("UNCHECKED_CAST") - toMap().shouldContainAll(map as Map) - getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG) shouldBe bootstrapServers - getProperty(ProducerConfig.ACKS_CONFIG) shouldBe acks.value - } - } - } -}) \ No newline at end of file diff --git a/src/test/kotlin/io/github/nomisrev/kafka/publisher/KafkaPublisherSpec.kt b/src/test/kotlin/io/github/nomisrev/kafka/publisher/KafkaPublisherSpec.kt index d127bb6f..891b1217 100644 --- a/src/test/kotlin/io/github/nomisrev/kafka/publisher/KafkaPublisherSpec.kt +++ b/src/test/kotlin/io/github/nomisrev/kafka/publisher/KafkaPublisherSpec.kt @@ -3,308 +3,308 @@ package io.github.nomisrev.kafka.publisher import io.github.nomisRev.kafka.publisher.Acks import io.github.nomisRev.kafka.publisher.KafkaPublisher import io.github.nomisrev.kafka.KafkaSpec -import io.kotest.assertions.throwables.shouldThrow -import io.kotest.matchers.shouldBe +import io.github.nomisrev.kafka.assertThrows import kotlinx.coroutines.CancellationException import kotlinx.coroutines.CompletableDeferred import kotlinx.coroutines.CoroutineStart.UNDISPATCHED import kotlinx.coroutines.async import kotlinx.coroutines.awaitAll import kotlinx.coroutines.awaitCancellation +import kotlinx.coroutines.delay import kotlinx.coroutines.launch -import org.apache.kafka.clients.admin.NewTopic import org.apache.kafka.clients.producer.ProducerConfig import org.apache.kafka.clients.producer.ProducerRecord import org.apache.kafka.common.errors.ProducerFencedException -import java.util.Properties +import org.junit.jupiter.api.Test +import kotlin.test.assertEquals -class KafkaPublisherSpec : KafkaSpec({ +class KafkaPublisherSpec : KafkaSpec() { - "All offered messages are received" { - withTopic { topic -> - val count = 3 - val records = (0..count).map { - topic.createProducerRecord(it) - } - publisher.publishScope { - offer(records) - } - - topic.shouldHaveRecords(records) + @Test + fun `All offered messages are received`() = withTopic { + val count = 3 + val records = (0..count).map { + createProducerRecord(it) + } + publisher.publishScope { + offer(records) } - } - "Can receive all messages that were published on the right partitions" { - withTopic { topic -> - val count = 3 - val records = (0..count).map { - topic.createProducerRecord(it) - } - publisher.publishScope { - publish(records) - } + topic.assertHasRecords(records) + } - topic.shouldHaveRecords(records) + @Test + fun `Can receive all messages that were published on the right partitions`() = withTopic { + val count = 3 + val records = (0..count).map { + createProducerRecord(it) + } + publisher.publishScope { + publish(records) } - } - "A failure in a produce block, rethrows the error" { - withTopic { topic -> - val record = topic.createProducerRecord(0) + topic.assertHasRecords(records) + } - shouldThrow { - publisher.publishScope { - offer(record) - throw boom - } - } shouldBe boom + @Test + fun `A failure in a produce block, rethrows the error`() = withTopic { + val record = createProducerRecord(0) - topic.shouldHaveRecord(record) + val exception = assertThrows { + publisher.publishScope { + offer(record) + throw boom + } } + + assertEquals(exception, boom) + topic.assertHasRecord(record) } - "A failure in a produce block with a concurrent launch cancels the launch, rethrows the error" { - withTopic { _ -> - val cancelSignal = CompletableDeferred() - shouldThrow { - publisher.publishScope { - launch(start = UNDISPATCHED) { - try { - awaitCancellation() - } catch (e: CancellationException) { - cancelSignal.complete(e) - throw e - } + @Test + fun `A failure in a produce block with a concurrent launch cancels the launch, rethrows the error`() = withTopic { + val cancelSignal = CompletableDeferred() + val exception = assertThrows { + publisher.publishScope { + launch(start = UNDISPATCHED) { + try { + awaitCancellation() + } catch (e: CancellationException) { + cancelSignal.complete(e) + throw e } - throw boom } - } shouldBe boom - cancelSignal.await() + throw boom + } } + + assertEquals(exception, boom) + cancelSignal.await() } - "A failed offer is rethrown at the end" { - withTopic { topic -> - val record = topic.createProducerRecord(0) + @Test + fun `A failed offer is rethrown at the end`() = withTopic { + val record = createProducerRecord(0) - shouldThrow { - KafkaPublisher(publisherSettings(), stubProducer(failOnNumber = 0)).publishScope { - offer(record) - } - } shouldBe boom + val exception = assertThrows { + KafkaPublisher(publisherSettings, stubProducer(failOnNumber = 0)).publishScope { + offer(record) + } } + + assertEquals(exception, boom) } - "An async failure is rethrow at the end" { - withTopic { topic -> - val count = 3 - val records: List> = (0..count).map { - topic.createProducerRecord(it) - } - shouldThrow { - publisher.publishScope { - publish(records) - launch { throw boom } - } - } shouldBe boom + @Test + fun `An async failure is rethrow at the end`() = withTopic { + val count = 3 + val records: List> = (0..count) + .map(::createProducerRecord) - topic.shouldHaveRecords(records) + val exception = assertThrows { + publisher.publishScope { + publish(records) + launch { throw boom } + } } + + assertEquals(exception, boom) + topic.assertHasRecords(records) } - "A failure of a sendAwait can be caught in the block" { - withTopic { topic -> - val record0 = topic.createProducerRecord(0) - val record1 = topic.createProducerRecord(1) + @Test + fun `A failure of a sendAwait can be caught in the block`() = withTopic { + val record0 = createProducerRecord(0) + val record1 = createProducerRecord(1) - KafkaPublisher(publisherSettings(), stubProducer(failOnNumber = 0)).use { - it.publishScope { - publishCatching(record0) - offer(record1) - } + KafkaPublisher(publisherSettings, stubProducer(failOnNumber = 0)).use { + it.publishScope { + publishCatching(record0) + offer(record1) } - - topic.shouldHaveRecord(record1) } - } - "concurrent publishing" { - withTopic { topic -> - val count = 4 - val records = - (0.. - (base..base + count).map { topic.createProducerRecord(it) } - } + topic.assertHasRecord(record1) + } - publisher.publishScope { - listOf( - async { offer(records[0]) }, - async { offer(records[1]) }, - async { publish(records[2]) }, - async { publish(records[3]) } - ).awaitAll() + @Test + fun `concurrent publishing`() = withTopic { + val count = 4 + val records = + (0.. + (base..base + count).map(::createProducerRecord) } - topic.shouldHaveRecords(records) + publisher.publishScope { + listOf( + async { offer(records[0]) }, + async { offer(records[1]) }, + async { publish(records[2]) }, + async { publish(records[3]) } + ).awaitAll() } + + topic.assertHasRecords(records) } - "transaction an receive all messages that were published on the right partitions" { - withTopic { topic -> - val count = 3 - val records = (0..count).map { - topic.createProducerRecord(it) - } - val settings = publisherSettings( - acknowledgments = Acks.All, - properties = Properties().apply { - put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, testCase.name.testName) - put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true") + @Test + fun `transaction an receive all messages that were published on the right partitions`() = withTopic { + val count = 3 + val records = (0..count).map(::createProducerRecord) + val settings = publisherSettings(Acks.All) { + put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction an receive all messages") + put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true") + } + + KafkaPublisher(settings).use { + it.publishScope { + transaction { + offer(records) } - ) + } + } + + topic.assertHasRecords(records) + } + + @Test + fun `A failure in a transaction aborts the transaction`() = withTopic { + val count = 3 + val records = (0..count).map(::createProducerRecord) + val settings = publisherSettings(Acks.All) { + put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "A failure in a transaction aborts") + put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true") + } + val exception = assertThrows { KafkaPublisher(settings).use { - it.publishScope { + it.publishScope { transaction { - offer(records) + publish(records) + throw boom } } } - - topic.shouldHaveRecords(records) } + + assertEquals(exception, boom) + topic.shouldBeEmpty() } - "A failure in a transaction aborts the transaction" { - withTopic { topic -> - val count = 3 - val records = (0..count).map { - topic.createProducerRecord(it) - } - val settings = publisherSettings( - acknowledgments = Acks.All, - properties = Properties().apply { - put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, testCase.name.testName) - put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true") - } - ) - shouldThrow { - KafkaPublisher(settings).use { - it.publishScope { - transaction { - publish(records) - throw boom - } + @Test + fun `An async failure in a transaction aborts the transaction`() = withTopic { + val count = 3 + val records = (0..count).map(::createProducerRecord) + val settings = publisherSettings(Acks.All) { + put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "An async failure in a transaction aborts") + put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true") + } + val exception = assertThrows { + KafkaPublisher(settings).use { + it.publishScope { + transaction { + offer(records) + launch { throw boom } } } - } shouldBe boom - - topic.shouldBeEmpty() + } } + + assertEquals(exception, boom) + topic.shouldBeEmpty() } - "An async failure in a transaction aborts the transaction" { - withTopic { topic -> - val count = 3 - val records = (0..count).map { - topic.createProducerRecord(it) + @Test + fun `transaction - concurrent publishing`() = withTopic { + val count = 4 + val records = + (0.. + (base..base + count).map(::createProducerRecord) } - val settings = publisherSettings( - acknowledgments = Acks.All, - properties = Properties().apply { - put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, testCase.name.testName) - put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true") - } - ) - shouldThrow { - KafkaPublisher(settings).use { - it.publishScope { - transaction { - offer(records) - launch { throw boom } - } - } - } - } shouldBe boom - topic.shouldBeEmpty() + val settings = publisherSettings(Acks.All) { + put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transaction - concurrent publishing") + put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true") } - } - "transaction - concurrent publishing" { - withTopic { topic -> - val count = 4 - val records = - (0.. - (base..base + count).map { topic.createProducerRecord(it) } + KafkaPublisher(settings).use { + it.publishScope { + transaction { + listOf( + async { offer(records[0]) }, + async { offer(records[1]) }, + async { publish(records[2]) }, + async { publish(records[3]) } + ).awaitAll() } + } + } - val settings = publisherSettings( - acknowledgments = Acks.All, - properties = Properties().apply { - put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, testCase.name.testName) - put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true") - } - ) + topic.assertHasRecords(records) + } - KafkaPublisher(settings).use { - it.publishScope { - transaction { - listOf( - async { offer(records[0]) }, - async { offer(records[1]) }, - async { publish(records[2]) }, - async { publish(records[3]) } - ).awaitAll() - } - } + @Test + fun `Only one KafkaProducer can have transactional_id at once, ProducerFencedException is fatal`() = withTopic { + val settings = publisherSettings(Acks.All) { + put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "Only one KafkaProducer can have transactional.id") + put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true") + } + val records1 = (0..4).map(::createProducerRecord) + val publisher1 = KafkaPublisher(settings) + publisher1.publishScope { + transaction { + publish(records1) } + } - topic.shouldHaveRecords(records) + val records2 = (5..9).map(::createProducerRecord) + val publisher2 = KafkaPublisher(settings) + publisher2.publishScope { + transaction { + publish(records2) + } } - } - "Only one KafkaProducer can have transactional.id at the same time, resulting ProducerFencedException is fatal" { - withTopic { topic -> - val settings = publisherSettings( - acknowledgments = Acks.All, - properties = Properties().apply { - put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, testCase.name.testName) - put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true") - } - ) - val records1 = (0..4).map { topic.createProducerRecord(it) } - val publisher1 = KafkaPublisher(settings) + // publisher1 was previous transactional.id, will result in fatal ProducerFencedException + val records3 = (10..14).map(::createProducerRecord) + assertThrows { publisher1.publishScope { transaction { - publish(records1) + publishCatching(records3) } } + } - val records2 = (5..9).map { topic.createProducerRecord(it) } - val publisher2 = KafkaPublisher(settings) - publisher2.publishScope { - transaction { - publish(records2) - } - } + // Due to ProducerFencedException, only records1 and records2 are received + topic.assertHasRecords(records1 + records2) + } - // publisher1 was previous transactional.id, will result in fatal ProducerFencedException - val records3 = (10..14).map { topic.createProducerRecord(it) } - shouldThrow { - publisher1.publishScope { - transaction { - publishCatching(records3) + @Test + fun `idempotent publisher`() = withTopic { + val records = (0..10).map(::createProducerRecord) + launch(start = UNDISPATCHED) { + KafkaPublisher(publisherSettings { +// put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, "true") + put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "20000") + }).use { + it.publishScope { + records.forEach { + offer(it) + delay(100) } } } - - // Due to ProducerFencedException, only records1 and records2 are received - topic.shouldHaveRecords(records1 + records2) } - } -}) -fun NewTopic.createProducerRecord(index: Int, partitions: Int = 4): ProducerRecord { - val partition: Int = index % partitions - return ProducerRecord(name(), partition, "$index", "Message $index") + println("Going to stop") + kafka.pause() + + delay(2000) + + println("Going to start") + kafka.unpause() + println("Started") + + topic.assertHasRecords(records) + println("asserted records") + } } diff --git a/src/test/kotlin/io/github/nomisrev/kafka/receiver/CommitStrategySpec.kt b/src/test/kotlin/io/github/nomisrev/kafka/receiver/CommitStrategySpec.kt index d807e7e9..01d6313e 100644 --- a/src/test/kotlin/io/github/nomisrev/kafka/receiver/CommitStrategySpec.kt +++ b/src/test/kotlin/io/github/nomisrev/kafka/receiver/CommitStrategySpec.kt @@ -1,53 +1,55 @@ package io.github.nomisrev.kafka.receiver import io.github.nomisRev.kafka.receiver.CommitStrategy -import io.kotest.assertions.throwables.shouldThrow -import io.kotest.core.spec.style.StringSpec -import io.kotest.matchers.shouldBe -import io.kotest.property.Arb -import io.kotest.property.arbitrary.int -import io.kotest.property.arbitrary.long -import io.kotest.property.arbitrary.map -import io.kotest.property.checkAll +import kotlinx.coroutines.runBlocking +import org.junit.jupiter.api.Test +import org.junit.jupiter.api.assertThrows import java.lang.IllegalArgumentException -import kotlin.time.Duration -import kotlin.time.Duration.Companion.nanoseconds +import kotlin.test.assertEquals import kotlin.time.Duration.Companion.seconds -class CommitStrategySpec : StringSpec({ - "Negative or zero sized BySize strategy fails" { - checkAll(Arb.int(max = 0)) { size -> - shouldThrow { - CommitStrategy.BySize(size) - }.message shouldBe "Size based auto-commit requires positive non-zero commit batch size but found $size" - } +class CommitStrategySpec { + @Test + fun `Negative or zero sized BySize strategy fails`() = runBlocking { + val actual = assertThrows { + CommitStrategy.BySize(0) + }.message + assertEquals( + "Size based auto-commit requires positive non-zero commit batch size but found 0", + actual + ) } - "Negative or zero sized BySizeOrTime strategy fails" { - checkAll(Arb.int(max = 0)) { size -> - shouldThrow { - CommitStrategy.BySizeOrTime(size, 1.seconds) - }.message shouldBe "Size based auto-commit requires positive non-zero commit batch size but found $size" - } + @Test + fun `Negative or zero sized BySizeOrTime strategy fails`() = runBlocking { + val actual = assertThrows { + CommitStrategy.BySizeOrTime(0, 1.seconds) + }.message + assertEquals( + "Size based auto-commit requires positive non-zero commit batch size but found 0", + actual + ) } - fun Arb.Companion.duration( - min: Long = Long.MIN_VALUE, max: Long = Long.MAX_VALUE, - ): Arb = Arb.long(min, max).map { it.nanoseconds } - - "Negative or zero duration BySizeOrTime strategy fails" { - checkAll(Arb.duration(max = 0)) { duration -> - shouldThrow { - CommitStrategy.BySizeOrTime(1, duration) - }.message shouldBe "Time based auto-commit requires positive non-zero interval but found $duration" - } + @Test + fun `Negative or zero duration BySizeOrTime strategy fails`() = runBlocking { + val actual = assertThrows { + CommitStrategy.BySizeOrTime(1, 0.seconds) + }.message + assertEquals( + "Time based auto-commit requires positive non-zero interval but found ${0.seconds}", + actual + ) } - "Negative or zero duration ByTime strategy fails" { - checkAll(Arb.duration(max = 0)) { duration -> - shouldThrow { - CommitStrategy.ByTime(duration) - }.message shouldBe "Time based auto-commit requires positive non-zero interval but found $duration" - } + @Test + fun `Negative or zero duration ByTime strategy fails`() = runBlocking { + val actual = assertThrows { + CommitStrategy.ByTime(0.seconds) + }.message + assertEquals( + "Time based auto-commit requires positive non-zero interval but found ${0.seconds}", + actual + ) } -}) +} diff --git a/src/test/kotlin/io/github/nomisrev/kafka/receiver/KafakReceiverSpec.kt b/src/test/kotlin/io/github/nomisrev/kafka/receiver/KafakReceiverSpec.kt index e9c971e8..dd7de460 100644 --- a/src/test/kotlin/io/github/nomisrev/kafka/receiver/KafakReceiverSpec.kt +++ b/src/test/kotlin/io/github/nomisrev/kafka/receiver/KafakReceiverSpec.kt @@ -3,12 +3,9 @@ package io.github.nomisrev.kafka.receiver import io.github.nomisRev.kafka.receiver.CommitStrategy import io.github.nomisRev.kafka.receiver.KafkaReceiver import io.github.nomisrev.kafka.KafkaSpec +import io.github.nomisrev.kafka.assertThrows import io.github.nomisrev.kafka.mapIndexed -import io.kotest.assertions.assertSoftly -import io.kotest.matchers.collections.shouldContainExactlyInAnyOrder -import io.kotest.matchers.shouldBe import kotlinx.coroutines.CompletableDeferred -import kotlinx.coroutines.ExperimentalCoroutinesApi import kotlinx.coroutines.cancelAndJoin import kotlinx.coroutines.flow.collect import kotlinx.coroutines.flow.collectIndexed @@ -17,13 +14,16 @@ import kotlinx.coroutines.flow.flattenMerge import kotlinx.coroutines.flow.flowOf import kotlinx.coroutines.flow.launchIn import kotlinx.coroutines.flow.map +import kotlinx.coroutines.flow.onEach import kotlinx.coroutines.flow.take import kotlinx.coroutines.flow.toList +import kotlinx.coroutines.flow.toSet import kotlinx.coroutines.yield import org.apache.kafka.clients.producer.ProducerRecord +import org.junit.jupiter.api.Test +import kotlin.test.assertEquals -@OptIn(ExperimentalCoroutinesApi::class) -class KafakReceiverSpec : KafkaSpec({ +class KafakReceiverSpec : KafkaSpec() { val count = 1000 val lastIndex = count - 1 @@ -35,191 +35,189 @@ class KafakReceiverSpec : KafkaSpec({ val produced = produced() - "All produced records are received" { - withTopic(partitions = 3) { topic -> - publishToKafka(topic, produced) - KafkaReceiver(receiverSetting()) + @Test + fun `All produced records are received`() = withTopic { + publishToKafka(topic, produced) + assertEquals( + KafkaReceiver(receiverSetting) .receive(topic.name()) .map { record -> yield() Pair(record.key(), record.value()) .also { record.offset.acknowledge() } }.take(count) - .toList() shouldContainExactlyInAnyOrder produced - } + .toSet(), + produced.toSet() + ) } - "All produced records with headers are received" { - withTopic(partitions = 1) { topic -> - val producerRecords = produced.map { (key, value) -> - ProducerRecord(topic.name(), key, value).apply { - headers().add("header1", byteArrayOf(0.toByte())) - headers().add("header2", value.toByteArray()) - } + @Test + fun `All produced records with headers are received`() = withTopic(partitions = 1) { + val producerRecords = produced.map { (key, value) -> + ProducerRecord(topic.name(), key, value).apply { + headers().add("header1", byteArrayOf(0.toByte())) + headers().add("header2", value.toByteArray()) } + } - publishToKafka(producerRecords) + publishToKafka(producerRecords) + + KafkaReceiver(receiverSetting) + .receive(topic.name()) + .take(count) + .onEach { it.offset.acknowledge() } + .toList().zip(producerRecords) { actual, expected -> + assertEquals(actual.key(), expected.key()) + assertEquals(actual.value(), expected.value()) + assertEquals(actual.topic(), topic.name()) + assertEquals(actual.headers().toArray().size, 2) + assertEquals(actual.headers(), expected.headers()) + } + } - KafkaReceiver(receiverSetting()) + @Test + fun `Should receive all records when subscribing several consumers`() = withTopic { + publishToKafka(topic, produced) + val consumer = + KafkaReceiver(receiverSetting) .receive(topic.name()) - .take(count) - .collectIndexed { index, received -> - assertSoftly(producerRecords[index]) { - received.key() shouldBe key() - received.value() shouldBe value() - received.topic() shouldBe topic.name() - received.headers().toArray().size shouldBe 2 - received.headers() shouldBe headers() - } - received.offset.acknowledge() + .map { + yield() + Pair(it.key(), it.value()) } - } - } - - "Should receive all records when subscribing several consumers" { - withTopic(partitions = 3) { topic -> - publishToKafka(topic, produced) - val consumer = - KafkaReceiver(receiverSetting()) - .receive(topic.name()) - .map { - yield() - Pair(it.key(), it.value()) - } + assertEquals( flowOf(consumer, consumer) .flattenMerge() .take(count) - .toList() shouldContainExactlyInAnyOrder produced - } + .toSet(), + produced.toSet() + ) } - "All acknowledged messages are committed on flow completion" { - withTopic(partitions = 3) { topic -> - publishToKafka(topic, produced) - val receiver = KafkaReceiver( - receiverSetting().copy( - commitStrategy = CommitStrategy.BySize(2 * count) - ) + @Test + fun `All acknowledged messages are committed on flow completion`() = withTopic { + publishToKafka(topic, produced) + val receiver = KafkaReceiver( + receiverSetting.copy( + commitStrategy = CommitStrategy.BySize(2 * count) ) + ) + receiver.receive(topic.name()) + .take(count) + .collectIndexed { index, value -> + if (index == lastIndex) { + value.offset.acknowledge() + assertEquals(receiver.committedCount(topic.name()), 0) + } else value.offset.acknowledge() + } + + assertEquals(receiver.committedCount(topic.name()), count.toLong()) + } + + @Test + fun `All acknowledged messages are committed on flow failure`() = withTopic { + publishToKafka(topic, produced) + val receiver = KafkaReceiver( + receiverSetting.copy( + commitStrategy = CommitStrategy.BySize(2 * count) + ) + ) + val exception = assertThrows { receiver.receive(topic.name()) - .take(count) .collectIndexed { index, value -> if (index == lastIndex) { value.offset.acknowledge() - receiver.committedCount(topic.name()) shouldBe 0 + assertEquals(receiver.committedCount(topic.name()), 0) + throw boom } else value.offset.acknowledge() } - - receiver.committedCount(topic.name()) shouldBe count } - } - "All acknowledged messages are committed on flow failure" { - withTopic(partitions = 3) { topic -> - publishToKafka(topic, produced) - val receiver = KafkaReceiver( - receiverSetting().copy( - commitStrategy = CommitStrategy.BySize(2 * count) - ) - ) - val failure = RuntimeException("Flow terminates") - runCatching { - receiver.receive(topic.name()) - .collectIndexed { index, value -> - if (index == lastIndex) { - value.offset.acknowledge() - receiver.committedCount(topic.name()) shouldBe 0 - throw failure - } else value.offset.acknowledge() - } - }.exceptionOrNull() shouldBe failure - - receiver.committedCount(topic.name()) shouldBe count - } + assertEquals(exception, boom) + assertEquals(receiver.committedCount(topic.name()), count.toLong()) } - "All acknowledged messages are committed on flow cancellation" { + @Test + fun `All acknowledged messages are committed on flow cancellation`() = withTopic { val scope = this - withTopic(partitions = 3) { topic -> - publishToKafka(topic, produced) - val receiver = KafkaReceiver( - receiverSetting().copy( - commitStrategy = CommitStrategy.BySize(2 * count) - ) + publishToKafka(topic, produced) + val receiver = KafkaReceiver( + receiverSetting.copy( + commitStrategy = CommitStrategy.BySize(2 * count) ) - val latch = CompletableDeferred() - val job = receiver.receive(topic.name()) - .mapIndexed { index, value -> - if (index == lastIndex) { - value.offset.acknowledge() - receiver.committedCount(topic.name()) shouldBe 0 - require(latch.complete(Unit)) { "Latch completed twice" } - } else value.offset.acknowledge() - }.launchIn(scope) - - latch.await() - job.cancelAndJoin() - - receiver.committedCount(topic.name()) shouldBe count - } + ) + val latch = CompletableDeferred() + val job = receiver.receive(topic.name()) + .mapIndexed { index, value -> + if (index == lastIndex) { + value.offset.acknowledge() + assertEquals(receiver.committedCount(topic.name()), 0) + require(latch.complete(Unit)) { "Latch completed twice" } + } else value.offset.acknowledge() + }.launchIn(scope) + + latch.await() + job.cancelAndJoin() + + assertEquals(receiver.committedCount(topic.name()), count.toLong()) } - "Manual commit also commits all acknowledged offsets" { - withTopic(partitions = 3) { topic -> - publishToKafka(topic, produced) - val receiver = KafkaReceiver( - receiverSetting().copy( - commitStrategy = CommitStrategy.BySize(2 * count) - ) + @Test + fun `Manual commit also commits all acknowledged offsets`() = withTopic { + publishToKafka(topic, produced) + val receiver = KafkaReceiver( + receiverSetting.copy( + commitStrategy = CommitStrategy.BySize(2 * count) ) - receiver.receive(topic.name()) - .take(count) - .collectIndexed { index, value -> - if (index == lastIndex) { - value.offset.commit() - receiver.committedCount(topic.name()) shouldBe count - } else value.offset.acknowledge() - } - } + ) + receiver.receive(topic.name()) + .take(count) + .collectIndexed { index, value -> + if (index == lastIndex) { + value.offset.commit() + assertEquals(receiver.committedCount(topic.name()), count.toLong()) + } else value.offset.acknowledge() + } } - "receiveAutoAck" { - withTopic(partitions = 3) { topic -> - publishToKafka(topic, produced) - val receiver = KafkaReceiver(receiverSetting()) + @Test + fun `receiveAutoAck`() = withTopic { + publishToKafka(topic, produced) + val receiver = KafkaReceiver(receiverSetting) - receiver.receiveAutoAck(topic.name()) - .flatMapConcat { it } - .take(count) - .collect() + receiver.receiveAutoAck(topic.name()) + .flatMapConcat { it } + .take(count) + .collect() - receiver.committedCount(topic.name()) shouldBe count - } + assertEquals(receiver.committedCount(topic.name()), count.toLong()) } - "receiveAutoAck does not receive same records" { - withTopic(partitions = 3) { topic -> - publishToKafka(topic, produced) - val receiver = KafkaReceiver(receiverSetting()) + @Test + fun `receiveAutoAck does not receive same records`() = withTopic { + publishToKafka(topic, produced) + val receiver = KafkaReceiver(receiverSetting) - receiver.receiveAutoAck(topic.name()) - .flatMapConcat { it } - .take(count) - .collect() + receiver.receiveAutoAck(topic.name()) + .flatMapConcat { it } + .take(count) + .collect() - receiver.committedCount(topic.name()) shouldBe count + assertEquals(receiver.committedCount(topic.name()), count.toLong()) - val seconds = produced(count + 1, count + 1 + count) - publishToKafka(topic, seconds) + val seconds = produced(count + 1, count + 1 + count) + publishToKafka(topic, seconds) + assertEquals( receiver.receiveAutoAck(topic.name()) .flatMapConcat { it } .map { Pair(it.key(), it.value()) } .take(count) - .toList() shouldContainExactlyInAnyOrder seconds + .toSet(), + seconds.toSet() + ) - receiver.committedCount(topic.name()) shouldBe (2 * count) - } + assertEquals(receiver.committedCount(topic.name()), (2L * count)) } -}) +}