1
1
package io.github.nomisRev.kafka.receiver.internals
2
2
3
- import io.github.nomisRev.kafka.receiver.CommitStrategy
4
3
import io.github.nomisRev.kafka.receiver.Offset
5
4
import io.github.nomisRev.kafka.receiver.ReceiverSettings
6
5
import io.github.nomisRev.kafka.receiver.size
@@ -37,7 +36,6 @@ import kotlin.coroutines.Continuation
37
36
import kotlin.coroutines.resume
38
37
import kotlin.coroutines.resumeWithException
39
38
import kotlin.coroutines.suspendCoroutine
40
- import kotlin.time.Duration.Companion.seconds
41
39
import kotlin.time.toJavaDuration
42
40
43
41
internal class PollLoop <K , V >(
@@ -48,7 +46,6 @@ internal class PollLoop<K, V>(
48
46
scope : CoroutineScope ,
49
47
awaitingTransaction : AtomicBoolean = AtomicBoolean (false),
50
48
private val isActive : AtomicBoolean = AtomicBoolean (true),
51
- private val commitStrategy : CommitStrategy = CommitStrategy .BySizeOrTime (5, 5.seconds),
52
49
private val ackMode : AckMode = AckMode .MANUAL_ACK ,
53
50
isRetriableException : (Throwable ) -> Boolean = { e -> e is RetriableCommitFailedException },
54
51
) {
@@ -78,7 +75,7 @@ internal class PollLoop<K, V>(
78
75
) {
79
76
offsetCommitWorker(
80
77
ackMode,
81
- commitStrategy,
78
+ settings. commitStrategy,
82
79
reachedMaxCommitBatchSize,
83
80
loop::scheduleCommitIfRequired
84
81
)
@@ -105,7 +102,7 @@ internal class PollLoop<K, V>(
105
102
TopicPartition (record.topic(), record.partition()),
106
103
record.offset(),
107
104
loop,
108
- commitStrategy.size(),
105
+ settings. commitStrategy.size(),
109
106
reachedMaxCommitBatchSize
110
107
)
111
108
}
0 commit comments