From 925415992c3d4eaab29202c98c674b4c086a5524 Mon Sep 17 00:00:00 2001 From: Michael Gecht Date: Wed, 6 Nov 2024 21:22:34 +0000 Subject: [PATCH 1/5] Poll for messages using `TaskExecutor` We currently sleep for `pollInterval` when no new messages have been polled from the cluster. This leads to unnecessary slowness of the client. Instead of doing that, we now break up the polling of messages into two distinct approaches: 1. Attempt to poll synchronously: if there a message is polled, we return it. If there is no message, we immediately go to step 2. 2. We create a `DispatchQueue` and run the `consumerPoll` on it using `withTaskExecutorPreference`. We make the `consumerPoll` call wait for up to `pollInterval` before bailing. This prevents us from sleeping on the running thread, and frees up cycles to do other work if required. Resolves https://github.com/swift-server/swift-kafka-client/issues/165 --- Sources/Kafka/KafkaConsumer.swift | 21 ++++++++--- Sources/Kafka/RDKafka/RDKafkaClient.swift | 4 +-- .../Kafka/Utilities/NaiveQueueExecutor.swift | 36 +++++++++++++++++++ 3 files changed, 54 insertions(+), 7 deletions(-) create mode 100644 Sources/Kafka/Utilities/NaiveQueueExecutor.swift diff --git a/Sources/Kafka/KafkaConsumer.swift b/Sources/Kafka/KafkaConsumer.swift index f44b2444..f2181c1d 100644 --- a/Sources/Kafka/KafkaConsumer.swift +++ b/Sources/Kafka/KafkaConsumer.swift @@ -12,6 +12,7 @@ // //===----------------------------------------------------------------------===// +import Dispatch import Logging import NIOConcurrencyHelpers import NIOCore @@ -72,6 +73,7 @@ public struct KafkaConsumerMessages: Sendable, AsyncSequence { public struct AsyncIterator: AsyncIteratorProtocol { private let stateMachineHolder: MachineHolder let pollInterval: Duration + let queue: NaiveQueueExecutor private final class MachineHolder: Sendable { // only for deinit let stateMachine: LockedMachine @@ -88,21 +90,30 @@ public struct KafkaConsumerMessages: Sendable, AsyncSequence { init(stateMachine: LockedMachine, pollInterval: Duration) { self.stateMachineHolder = .init(stateMachine: stateMachine) self.pollInterval = pollInterval + self.queue = NaiveQueueExecutor(DispatchQueue(label: "com.swift-server.swift-kafka.message-consumer")) } public func next() async throws -> Element? { - // swift-kafka-client issue: https://github.com/swift-server/swift-kafka-client/issues/165 - // Currently use Task.sleep() if no new messages, should use task executor preference when implemented: - // https://github.com/apple/swift-evolution/blob/main/proposals/0417-task-executor-preference.md while !Task.isCancelled { let action = self.stateMachineHolder.stateMachine.withLockedValue { $0.nextConsumerPollLoopAction() } switch action { case .poll(let client): - if let message = try client.consumerPoll() { // non-blocking call + // Attempt to fetch a message synchronously. Bail + // immediately if no message is waiting for us. + if let message = try client.consumerPoll() { return message } - try await Task.sleep(for: self.pollInterval) + + #if swift(>=6.0) + // Wait on a separate thread for the next message. + return try await withTaskExecutorPreference(queue) { + try client.consumerPoll(for: Int32(self.pollInterval.inMilliseconds)) + } + #else + // No messages. Sleep a little. + return try await Task.sleep(for: self.pollInterval) + #endif case .suspendPollLoop: try await Task.sleep(for: self.pollInterval) // not started yet case .terminatePollLoop: diff --git a/Sources/Kafka/RDKafka/RDKafkaClient.swift b/Sources/Kafka/RDKafka/RDKafkaClient.swift index 0350576e..0987fe44 100644 --- a/Sources/Kafka/RDKafka/RDKafkaClient.swift +++ b/Sources/Kafka/RDKafka/RDKafkaClient.swift @@ -446,8 +446,8 @@ public final class RDKafkaClient: Sendable { /// /// - Returns: A ``KafkaConsumerMessage`` or `nil` if there are no new messages. /// - Throws: A ``KafkaError`` if the received message is an error message or malformed. - func consumerPoll() throws -> KafkaConsumerMessage? { - guard let messagePointer = rd_kafka_consumer_poll(self.kafkaHandle.pointer, 0) else { + func consumerPoll(for pollTimeoutMs: Int32 = 0) throws -> KafkaConsumerMessage? { + guard let messagePointer = rd_kafka_consumer_poll(self.kafkaHandle.pointer, pollTimeoutMs) else { // No error, there might be no more messages return nil } diff --git a/Sources/Kafka/Utilities/NaiveQueueExecutor.swift b/Sources/Kafka/Utilities/NaiveQueueExecutor.swift new file mode 100644 index 00000000..bff5f0fe --- /dev/null +++ b/Sources/Kafka/Utilities/NaiveQueueExecutor.swift @@ -0,0 +1,36 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the swift-kafka-client open source project +// +// Copyright (c) 2024 Apple Inc. and the swift-kafka-client project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of swift-kafka-client project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Dispatch + +final class NaiveQueueExecutor: TaskExecutor { + let queue: DispatchQueue + + init(_ queue: DispatchQueue) { + self.queue = queue + } + + public func enqueue(_ _job: consuming ExecutorJob) { + let job = UnownedJob(_job) + queue.async { + job.runSynchronously( + on: self.asUnownedTaskExecutor()) + } + } + + @inlinable + public func asUnownedTaskExecutor() -> UnownedTaskExecutor { + UnownedTaskExecutor(ordinary: self) + } +} From 0d7b5790b8258f4a45a779402205516836ae0302 Mon Sep 17 00:00:00 2001 From: Michael Gecht Date: Thu, 14 Nov 2024 19:42:50 +0000 Subject: [PATCH 2/5] Remove unnecessary `return` --- Sources/Kafka/KafkaConsumer.swift | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Sources/Kafka/KafkaConsumer.swift b/Sources/Kafka/KafkaConsumer.swift index f2181c1d..52aa24c9 100644 --- a/Sources/Kafka/KafkaConsumer.swift +++ b/Sources/Kafka/KafkaConsumer.swift @@ -112,7 +112,7 @@ public struct KafkaConsumerMessages: Sendable, AsyncSequence { } #else // No messages. Sleep a little. - return try await Task.sleep(for: self.pollInterval) + try await Task.sleep(for: self.pollInterval) #endif case .suspendPollLoop: try await Task.sleep(for: self.pollInterval) // not started yet From 46a04c74b5b2e3a19b2cfd8e4d2cdb50732a2512 Mon Sep 17 00:00:00 2001 From: Michael Gecht Date: Thu, 14 Nov 2024 19:45:10 +0000 Subject: [PATCH 3/5] Run `swift-format` Whoops. --- .../Kafka/Utilities/NaiveQueueExecutor.swift | 29 ++++++++++--------- 1 file changed, 15 insertions(+), 14 deletions(-) diff --git a/Sources/Kafka/Utilities/NaiveQueueExecutor.swift b/Sources/Kafka/Utilities/NaiveQueueExecutor.swift index bff5f0fe..d56fd6ab 100644 --- a/Sources/Kafka/Utilities/NaiveQueueExecutor.swift +++ b/Sources/Kafka/Utilities/NaiveQueueExecutor.swift @@ -15,22 +15,23 @@ import Dispatch final class NaiveQueueExecutor: TaskExecutor { - let queue: DispatchQueue + let queue: DispatchQueue - init(_ queue: DispatchQueue) { - self.queue = queue - } + init(_ queue: DispatchQueue) { + self.queue = queue + } - public func enqueue(_ _job: consuming ExecutorJob) { - let job = UnownedJob(_job) - queue.async { - job.runSynchronously( - on: self.asUnownedTaskExecutor()) + public func enqueue(_ _job: consuming ExecutorJob) { + let job = UnownedJob(_job) + queue.async { + job.runSynchronously( + on: self.asUnownedTaskExecutor() + ) + } } - } - @inlinable - public func asUnownedTaskExecutor() -> UnownedTaskExecutor { - UnownedTaskExecutor(ordinary: self) - } + @inlinable + public func asUnownedTaskExecutor() -> UnownedTaskExecutor { + UnownedTaskExecutor(ordinary: self) + } } From a84f1c08ec772992c4296e2fc53289b426e4a68b Mon Sep 17 00:00:00 2001 From: Michael Gecht Date: Thu, 14 Nov 2024 19:53:13 +0000 Subject: [PATCH 4/5] Use `@available` for `NaiveQueueExecutor` Can't use the current implementation before Swift 6. --- Sources/Kafka/KafkaConsumer.swift | 4 ++++ Sources/Kafka/Utilities/NaiveQueueExecutor.swift | 2 ++ 2 files changed, 6 insertions(+) diff --git a/Sources/Kafka/KafkaConsumer.swift b/Sources/Kafka/KafkaConsumer.swift index 52aa24c9..0b7be6b8 100644 --- a/Sources/Kafka/KafkaConsumer.swift +++ b/Sources/Kafka/KafkaConsumer.swift @@ -73,7 +73,9 @@ public struct KafkaConsumerMessages: Sendable, AsyncSequence { public struct AsyncIterator: AsyncIteratorProtocol { private let stateMachineHolder: MachineHolder let pollInterval: Duration + #if swift(>=6.0) let queue: NaiveQueueExecutor + #endif private final class MachineHolder: Sendable { // only for deinit let stateMachine: LockedMachine @@ -90,7 +92,9 @@ public struct KafkaConsumerMessages: Sendable, AsyncSequence { init(stateMachine: LockedMachine, pollInterval: Duration) { self.stateMachineHolder = .init(stateMachine: stateMachine) self.pollInterval = pollInterval + #if swift(>=6.0) self.queue = NaiveQueueExecutor(DispatchQueue(label: "com.swift-server.swift-kafka.message-consumer")) + #endif } public func next() async throws -> Element? { diff --git a/Sources/Kafka/Utilities/NaiveQueueExecutor.swift b/Sources/Kafka/Utilities/NaiveQueueExecutor.swift index d56fd6ab..991e1008 100644 --- a/Sources/Kafka/Utilities/NaiveQueueExecutor.swift +++ b/Sources/Kafka/Utilities/NaiveQueueExecutor.swift @@ -12,6 +12,7 @@ // //===----------------------------------------------------------------------===// +#if swift(>=6.0) import Dispatch final class NaiveQueueExecutor: TaskExecutor { @@ -35,3 +36,4 @@ final class NaiveQueueExecutor: TaskExecutor { UnownedTaskExecutor(ordinary: self) } } +#endif From 556562402ede1bbcda649feeb3243e52ab620f4c Mon Sep 17 00:00:00 2001 From: Michael Gecht Date: Fri, 15 Nov 2024 12:27:05 +0000 Subject: [PATCH 5/5] Address reviewer comments --- Sources/Kafka/KafkaConsumer.swift | 8 +++++--- ...ueueExecutor.swift => DispatchQueueTaskExecutor.swift} | 2 +- 2 files changed, 6 insertions(+), 4 deletions(-) rename Sources/Kafka/Utilities/{NaiveQueueExecutor.swift => DispatchQueueTaskExecutor.swift} (94%) diff --git a/Sources/Kafka/KafkaConsumer.swift b/Sources/Kafka/KafkaConsumer.swift index 0b7be6b8..bd3f5542 100644 --- a/Sources/Kafka/KafkaConsumer.swift +++ b/Sources/Kafka/KafkaConsumer.swift @@ -12,7 +12,6 @@ // //===----------------------------------------------------------------------===// -import Dispatch import Logging import NIOConcurrencyHelpers import NIOCore @@ -74,7 +73,7 @@ public struct KafkaConsumerMessages: Sendable, AsyncSequence { private let stateMachineHolder: MachineHolder let pollInterval: Duration #if swift(>=6.0) - let queue: NaiveQueueExecutor + private let queue: DispatchQueueTaskExecutor #endif private final class MachineHolder: Sendable { // only for deinit @@ -93,7 +92,9 @@ public struct KafkaConsumerMessages: Sendable, AsyncSequence { self.stateMachineHolder = .init(stateMachine: stateMachine) self.pollInterval = pollInterval #if swift(>=6.0) - self.queue = NaiveQueueExecutor(DispatchQueue(label: "com.swift-server.swift-kafka.message-consumer")) + self.queue = DispatchQueueTaskExecutor( + DispatchQueue(label: "com.swift-server.swift-kafka.message-consumer") + ) #endif } @@ -111,6 +112,7 @@ public struct KafkaConsumerMessages: Sendable, AsyncSequence { #if swift(>=6.0) // Wait on a separate thread for the next message. + // The call below will block for `pollInterval`. return try await withTaskExecutorPreference(queue) { try client.consumerPoll(for: Int32(self.pollInterval.inMilliseconds)) } diff --git a/Sources/Kafka/Utilities/NaiveQueueExecutor.swift b/Sources/Kafka/Utilities/DispatchQueueTaskExecutor.swift similarity index 94% rename from Sources/Kafka/Utilities/NaiveQueueExecutor.swift rename to Sources/Kafka/Utilities/DispatchQueueTaskExecutor.swift index 991e1008..2a2051b4 100644 --- a/Sources/Kafka/Utilities/NaiveQueueExecutor.swift +++ b/Sources/Kafka/Utilities/DispatchQueueTaskExecutor.swift @@ -15,7 +15,7 @@ #if swift(>=6.0) import Dispatch -final class NaiveQueueExecutor: TaskExecutor { +final class DispatchQueueTaskExecutor: TaskExecutor { let queue: DispatchQueue init(_ queue: DispatchQueue) {