@@ -3,6 +3,15 @@ package io.github.nomisRev.kafka.receiver.internals
import io.github.nomisRev.kafka.receiver.Offset
import io.github.nomisRev.kafka.receiver.ReceiverSettings
import io.github.nomisRev.kafka.receiver.size
import java.time.Duration
import java.time.Duration.ofSeconds
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger
import kotlin.coroutines.Continuation
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine
import kotlin.time.toJavaDuration
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.Dispatchers
@@ -28,15 +37,6 @@ import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.errors.WakeupException
import org.slf4j.Logger
import org.slf4j.LoggerFactory
import java.time.Duration
import java.time.Duration.ofSeconds
import java.util.concurrent.atomic.AtomicBoolean
import java.util.concurrent.atomic.AtomicInteger
import kotlin.coroutines.Continuation
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine
import kotlin.time.toJavaDuration

internal class PollLoop<K, V>(
// TODO also allow for Pattern, and assign
@@ -117,8 +117,11 @@ internal class CommittableOffset<K, V>(
private val acknowledged = AtomicBoolean(false)

override suspend fun commit(): Unit =
if (maybeUpdateOffset() > 0) scheduleCommit() else Unit

if (maybeUpdateOffset() > 0) suspendCoroutine { cont ->
loop.commitBatch.addContinuation(cont)
loop.scheduleCommitIfRequired()
} else Unit

override suspend fun acknowledge() {
val uncommittedCount = maybeUpdateOffset().toLong()
if (commitBatchSize in 1..uncommittedCount) {
@@ -129,13 +132,7 @@ internal class CommittableOffset<K, V>(
private /*suspend*/ fun maybeUpdateOffset(): Int =
if (acknowledged.compareAndSet(false, true)) loop.commitBatch.updateOffset(topicPartition, offset)
else loop.commitBatch.batchSize()

private suspend fun scheduleCommit(): Unit =
suspendCoroutine { cont ->
loop.commitBatch.addContinuation(cont)
loop.scheduleCommitIfRequired()
}


override fun toString(): String = "$topicPartition@$offset"
}

@@ -175,40 +172,51 @@ internal class EventLoop<K, V>(
consumer.subscribe(topicNames, object : ConsumerRebalanceListener {
override fun onPartitionsAssigned(partitions: MutableCollection<TopicPartition>) {
logger.debug("onPartitionsAssigned $partitions")
// onAssign methods may perform seek. It is safe to use the consumer here since we are in a poll()
if (partitions.isNotEmpty()) {
if (pausedByUs.get()) {
logger.debug("Rebalance during back pressure, re-pausing new assignments")
consumer.pause(partitions)
var repausedAll = false
if (partitions.isNotEmpty() && pausedByUs.get()) {
logger.debug("Rebalance during back pressure, re-pausing new assignments")
consumer.pause(partitions)
repausedAll = true
}
if (pausedByUser.isNotEmpty()) {
val toRepause = buildList {
//It is necessary to re-pause any user-paused partitions that are re-assigned after the rebalance.
//Also remove any revoked partitions that the user paused from the userPaused collection.
pausedByUser.forEach { tp ->
if (partitions.contains(tp)) add(tp)
else pausedByUser.remove(tp)
}
}
// TODO Setup user listeners
// for (onAssign in receiverOptions.assignListeners()) {
// onAssign.accept(toSeekable(partitions))
// }
if (logger.isTraceEnabled) {
try {
val positions = partitions.map { part: TopicPartition ->
"$part pos: ${consumer.position(part, ofSeconds(5))}"
}
logger.trace(
"positions: $positions, committed: ${
consumer.committed(
partitions.toSet(),
ofSeconds(5)
)
}"
)
} catch (ex: Exception) {
logger.error("Failed to get positions or committed", ex)
if (!repausedAll && toRepause.isNotEmpty()) {
consumer.pause(toRepause)
}
}
// TODO Setup user listeners
// for (onAssign in receiverOptions.assignListeners()) {
// onAssign.accept(toSeekable(partitions))
// }
if (logger.isTraceEnabled) {
try {
val positions = partitions.map { part: TopicPartition ->
"$part pos: ${consumer.position(part, ofSeconds(5))}"
}
logger.trace(
"positions: $positions, committed: ${
consumer.committed(
partitions.toSet(), ofSeconds(5)
)
}"
)
} catch (ex: Exception) {
logger.error("Failed to get positions or committed", ex)
}
}
}

override fun onPartitionsRevoked(partitions: MutableCollection<TopicPartition>) {
logger.debug("onPartitionsRevoked $partitions")
commitBatch.onPartitionsRevoked(partitions)
[email protected](partitions)
commitBatch.onPartitionsRevoked(partitions)
}
})
} catch (e: Throwable) {
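
For reference, the onPartitionsAssigned change above amounts to: if the receiver itself paused everything for back pressure, re-pause all newly assigned partitions; otherwise re-pause only the user-paused partitions that were assigned back after the rebalance, and forget the revoked ones. Below is a minimal standalone sketch of that pattern against the plain consumer API; the names RepauseListener, backPressured, and userPaused are illustrative stand-ins for pausedByUs / pausedByUser, not part of this PR.

import java.util.concurrent.atomic.AtomicBoolean
import org.apache.kafka.clients.consumer.Consumer
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener
import org.apache.kafka.common.TopicPartition

class RepauseListener(
  private val consumer: Consumer<*, *>,
  private val backPressured: AtomicBoolean,
  private val userPaused: MutableSet<TopicPartition>,
) : ConsumerRebalanceListener {

  override fun onPartitionsAssigned(partitions: MutableCollection<TopicPartition>) {
    var repausedAll = false
    // Back pressure survives a rebalance: keep every new assignment paused.
    if (partitions.isNotEmpty() && backPressured.get()) {
      consumer.pause(partitions)
      repausedAll = true
    }
    if (userPaused.isNotEmpty()) {
      // Keep only the user-paused partitions that were assigned back to us,
      // and drop the ones that were revoked.
      val toRepause = userPaused.filter { it in partitions }
      userPaused.retainAll(toRepause)
      if (!repausedAll && toRepause.isNotEmpty()) consumer.pause(toRepause)
    }
  }

  override fun onPartitionsRevoked(partitions: MutableCollection<TopicPartition>) {
    // Commit / clean-up of revoked partitions would happen here.
  }
}
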
@@ -252,9 +260,11 @@ internal class EventLoop<K, V>(
toResume.removeAll(pausedByUser)
pausedByUser.clear()
consumer.resume(toResume)
logger.debug("Resumed")
if (logger.isDebugEnabled) {
logger.debug("Resumed partitions: $toResume")
}
} else {
}
} else {
if (checkAndSetPausedByUs()) {
pausedByUser.addAll(consumer.paused())
consumer.pause(consumer.assignment())
@@ -315,13 +325,13 @@ internal class EventLoop<K, V>(
channel.close(e)
}
} else null

val commitBatch: CommittableBatch = CommittableBatch()
private val isPending = AtomicBoolean()
private val inProgress = AtomicInteger()
private val consecutiveCommitFailures = AtomicInteger()
private val retrying = AtomicBoolean()

// TODO Should reset delay of commitJob
private fun commit() {
if (!isPending.compareAndSet(true, false)) return
@@ -371,10 +381,10 @@ internal class EventLoop<K, V>(
cont.resume(Unit)
}
}

private fun pollTaskAfterRetry(): Job? =
if (retrying.getAndSet(false)) schedulePoll() else null

private fun commitFailure(commitArgs: CommittableBatch.CommitArgs, exception: Exception) {
logger.warn("Commit failed", exception)
if (!isRetriableException(exception) && consecutiveCommitFailures.incrementAndGet() < settings.maxCommitAttempts) {
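
The commit() rewrite in this file inlines the old scheduleCommit() helper: each caller parks itself as a Continuation on the shared commit batch and is resumed, or failed, when that batch is actually committed by the poll loop. Below is a minimal self-contained sketch of that continuation-batching idea; BatchedCommits and its members are illustrative names, not the PR's CommittableBatch API.

import kotlin.coroutines.Continuation
import kotlin.coroutines.resume
import kotlin.coroutines.resumeWithException
import kotlin.coroutines.suspendCoroutine

class BatchedCommits {
  private val waiters = mutableListOf<Continuation<Unit>>()

  // Suspend the caller until the batch containing its offset has been committed.
  suspend fun awaitCommit(): Unit = suspendCoroutine { cont ->
    synchronized(waiters) { waiters += cont }
    // In the real code, this is the point where a commit would be scheduled if required.
  }

  // Called once the underlying commit succeeds: wake every parked caller.
  fun commitSucceeded() {
    val resumed = synchronized(waiters) { waiters.toList().also { waiters.clear() } }
    resumed.forEach { it.resume(Unit) }
  }

  // Called when the commit ultimately fails: every parked caller sees the same error.
  fun commitFailed(cause: Exception) {
    val failed = synchronized(waiters) { waiters.toList().also { waiters.clear() } }
    failed.forEach { it.resumeWithException(cause) }
  }
}

This mirrors how the continuations collected via loop.commitBatch.addContinuation(cont) above are resumed with Unit once the commit succeeds, with failures routed through commitFailure.
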
4 changes: 1 addition & 3 deletions src/test/kotlin/io/github/nomisrev/kafka/KafkaSpec.kt
@@ -37,10 +37,8 @@ abstract class KafkaSpec(body: KafkaSpec.() -> Unit = {}) : StringSpec() {
private val transactionTimeoutInterval = 1.seconds
private val consumerPollingTimeout = 1.seconds

private val postfix = if (System.getProperty("os.arch") == "aarch64") ".arm64" else ""
private val imageVersion = "latest$postfix"
private val kafkaImage: DockerImageName =
DockerImageName.parse("confluentinc/cp-kafka:$imageVersion")
DockerImageName.parse("confluentinc/cp-kafka:latest")

private val container: KafkaContainer = autoClose(
KafkaContainer(kafkaImage)
2 changes: 2 additions & 0 deletions src/test/kotlin/io/github/nomisrev/kafka/KotestProject.kt
@@ -8,4 +8,6 @@ object KotestProject : AbstractProjectConfig() {
System.setProperty("kotest.assertions.collection.print.size", "100")
PropertyTesting.defaultIterationCount = 10
}

override val invocationTimeout: Long = 40_000L
}