diff --git a/Benchmarks/Benchmarks/SwiftKafkaConsumerBenchmarks/KafkaConsumerBenchmark.swift b/Benchmarks/Benchmarks/SwiftKafkaConsumerBenchmarks/KafkaConsumerBenchmark.swift index d725cf4a..ec0bff9a 100644 --- a/Benchmarks/Benchmarks/SwiftKafkaConsumerBenchmarks/KafkaConsumerBenchmark.swift +++ b/Benchmarks/Benchmarks/SwiftKafkaConsumerBenchmarks/KafkaConsumerBenchmark.swift @@ -13,18 +13,333 @@ //===----------------------------------------------------------------------===// import Benchmark +import Crdkafka +import Dispatch +import struct Foundation.Date +import struct Foundation.UUID import Kafka +import Logging +import ServiceLifecycle let benchmarks = { + var uniqueTestTopic: String! + let messageCount: UInt = 1000 + Benchmark.defaultConfiguration = .init( - metrics: [.wallClock, .cpuTotal, .allocatedResidentMemory, .contextSwitches, .throughput] + .arc, + metrics: [ + .wallClock, + .cpuTotal, + .contextSwitches, + .throughput, + .allocatedResidentMemory, + ] + .arc, warmupIterations: 0, scalingFactor: .one, maxDuration: .seconds(5), - maxIterations: 100 + maxIterations: 100, + thresholds: [ + .wallClock: .init(relative: [.p90: 35]), + .cpuTotal: .init(relative: [.p90: 35]), + .allocatedResidentMemory: .init(relative: [.p90: 20]), + .contextSwitches: .init(relative: [.p90: 35]), + .throughput: .init(relative: [.p90: 35]), + .objectAllocCount: .init(relative: [.p90: 20]), + .retainCount: .init(relative: [.p90: 20]), + .releaseCount: .init(relative: [.p90: 20]), + .retainReleaseDelta: .init(relative: [.p90: 20]), + ] ) - Benchmark.setup = {} + Benchmark.setup = { + uniqueTestTopic = try await prepareTopic(messagesCount: messageCount, partitions: 6) + } + + Benchmark.teardown = { + if let uniqueTestTopic { + try deleteTopic(uniqueTestTopic) + } + uniqueTestTopic = nil + } + + Benchmark("SwiftKafkaConsumer_basic_consumer_messages_\(messageCount)") { benchmark in + let uniqueGroupID = UUID().uuidString + var consumerConfig = KafkaConsumerConfiguration( + 
consumptionStrategy: .group( + id: uniqueGroupID, + topics: [uniqueTestTopic] + ), + bootstrapBrokerAddresses: [brokerAddress] + ) + consumerConfig.autoOffsetReset = .beginning + consumerConfig.broker.addressFamily = .v4 + // We must specify it at least 10 otherwise CI will timeout + consumerConfig.pollInterval = .milliseconds(1) + + let consumer = try KafkaConsumer( + configuration: consumerConfig, + logger: .perfLogger + ) + + let serviceGroupConfiguration = ServiceGroupConfiguration(services: [consumer], gracefulShutdownSignals: [.sigterm, .sigint], logger: .perfLogger) + let serviceGroup = ServiceGroup(configuration: serviceGroupConfiguration) + + try await withThrowingTaskGroup(of: Void.self) { group in + benchLog("Start consuming") + defer { + benchLog("Finish consuming") + } + // Run Task + group.addTask { + try await serviceGroup.run() + } + + // Second Consumer Task + group.addTask { + var ctr: UInt64 = 0 + var tmpCtr: UInt64 = 0 + let interval: UInt64 = Swift.max(UInt64(messageCount / 20), 1) + let totalStartDate = Date.timeIntervalSinceReferenceDate + var totalBytes: UInt64 = 0 + + try await benchmark.withMeasurement { + for try await record in consumer.messages { + ctr += 1 + totalBytes += UInt64(record.value.readableBytes) + + tmpCtr += 1 + if tmpCtr >= interval { + benchLog("read \(ctr * 100 / UInt64(messageCount))%") + tmpCtr = 0 + } + if ctr >= messageCount { + break + } + } + } + + let timeIntervalTotal = Date.timeIntervalSinceReferenceDate - totalStartDate + let avgRateMb = Double(totalBytes) / timeIntervalTotal / 1024 + benchLog("All read up to ctr: \(ctr), avgRate: (\(Int(avgRateMb))KB/s), timePassed: \(Int(timeIntervalTotal))sec") + } + + // Wait for second Consumer Task to complete + try await group.next() + // Shutdown the serviceGroup + await serviceGroup.triggerGracefulShutdown() + } + } + + Benchmark("SwiftKafkaConsumer_with_offset_commit_messages_\(messageCount)") { benchmark in + let uniqueGroupID = UUID().uuidString + var consumerConfig 
= KafkaConsumerConfiguration( + consumptionStrategy: .group( + id: uniqueGroupID, + topics: [uniqueTestTopic] + ), + bootstrapBrokerAddresses: [brokerAddress] + ) + consumerConfig.autoOffsetReset = .beginning + consumerConfig.broker.addressFamily = .v4 + consumerConfig.isAutoCommitEnabled = false + // We must specify it at least 10 otherwise CI will timeout + consumerConfig.pollInterval = .milliseconds(1) + + let consumer = try KafkaConsumer( + configuration: consumerConfig, + logger: .perfLogger + ) + + let serviceGroupConfiguration = ServiceGroupConfiguration(services: [consumer], gracefulShutdownSignals: [.sigterm, .sigint], logger: .perfLogger) + let serviceGroup = ServiceGroup(configuration: serviceGroupConfiguration) + + try await withThrowingTaskGroup(of: Void.self) { group in + benchLog("Start consuming") + defer { + benchLog("Finish consuming") + } + // Run Task + group.addTask { + try await serviceGroup.run() + } + + // Second Consumer Task + group.addTask { + var ctr: UInt64 = 0 + var tmpCtr: UInt64 = 0 + let interval: UInt64 = Swift.max(UInt64(messageCount / 20), 1) + let totalStartDate = Date.timeIntervalSinceReferenceDate + var totalBytes: UInt64 = 0 + + try await benchmark.withMeasurement { + for try await record in consumer.messages { + try consumer.scheduleCommit(record) + + ctr += 1 + totalBytes += UInt64(record.value.readableBytes) + + tmpCtr += 1 + if tmpCtr >= interval { + benchLog("read \(ctr * 100 / UInt64(messageCount))%") + tmpCtr = 0 + } + if ctr >= messageCount { + break + } + } + } + + let timeIntervalTotal = Date.timeIntervalSinceReferenceDate - totalStartDate + let avgRateMb = Double(totalBytes) / timeIntervalTotal / 1024 + benchLog("All read up to ctr: \(ctr), avgRate: (\(Int(avgRateMb))KB/s), timePassed: \(Int(timeIntervalTotal))sec") + } + + // Wait for second Consumer Task to complete + try await group.next() + // Shutdown the serviceGroup + await serviceGroup.triggerGracefulShutdown() + } + } + + 
Benchmark("librdkafka_basic_consumer_messages_\(messageCount)") { benchmark in + let uniqueGroupID = UUID().uuidString + let rdKafkaConsumerConfig: [String: String] = [ + "group.id": uniqueGroupID, + "bootstrap.servers": "\(brokerAddress.host):\(brokerAddress.port)", + "broker.address.family": "v4", + "auto.offset.reset": "beginning", + ] + + let configPointer: OpaquePointer = rd_kafka_conf_new() + for (key, value) in rdKafkaConsumerConfig { + precondition(rd_kafka_conf_set(configPointer, key, value, nil, 0) == RD_KAFKA_CONF_OK) + } + + let kafkaHandle = rd_kafka_new(RD_KAFKA_CONSUMER, configPointer, nil, 0) + guard let kafkaHandle else { + preconditionFailure("Kafka handle was not created") + } + defer { + rd_kafka_destroy(kafkaHandle) + } + + rd_kafka_poll_set_consumer(kafkaHandle) + let subscriptionList = rd_kafka_topic_partition_list_new(1) + defer { + rd_kafka_topic_partition_list_destroy(subscriptionList) + } + rd_kafka_topic_partition_list_add( + subscriptionList, + uniqueTestTopic, + RD_KAFKA_PARTITION_UA + ) + rd_kafka_subscribe(kafkaHandle, subscriptionList) + rd_kafka_poll(kafkaHandle, 0) + + var ctr: UInt64 = 0 + var tmpCtr: UInt64 = 0 + + let interval: UInt64 = Swift.max(UInt64(messageCount / 20), 1) + let totalStartDate = Date.timeIntervalSinceReferenceDate + var totalBytes: UInt64 = 0 + + benchmark.withMeasurement { + while ctr < messageCount { + guard let record = rd_kafka_consumer_poll(kafkaHandle, 10) else { + continue + } + defer { + rd_kafka_message_destroy(record) + } + ctr += 1 + totalBytes += UInt64(record.pointee.len) + + tmpCtr += 1 + if tmpCtr >= interval { + benchLog("read \(ctr * 100 / UInt64(messageCount))%") + tmpCtr = 0 + } + } + } + + rd_kafka_consumer_close(kafkaHandle) + + let timeIntervalTotal = Date.timeIntervalSinceReferenceDate - totalStartDate + let avgRateMb = Double(totalBytes) / timeIntervalTotal / 1024 + benchLog("All read up to ctr: \(ctr), avgRate: (\(Int(avgRateMb))KB/s), timePassed: \(Int(timeIntervalTotal))sec") + } + 
+ Benchmark("librdkafka_with_offset_commit_messages_\(messageCount)") { benchmark in + let uniqueGroupID = UUID().uuidString + let rdKafkaConsumerConfig: [String: String] = [ + "group.id": uniqueGroupID, + "bootstrap.servers": "\(brokerAddress.host):\(brokerAddress.port)", + "broker.address.family": "v4", + "auto.offset.reset": "beginning", + "enable.auto.commit": "false", + ] + + let configPointer: OpaquePointer = rd_kafka_conf_new() + for (key, value) in rdKafkaConsumerConfig { + precondition(rd_kafka_conf_set(configPointer, key, value, nil, 0) == RD_KAFKA_CONF_OK) + } + + let kafkaHandle = rd_kafka_new(RD_KAFKA_CONSUMER, configPointer, nil, 0) + guard let kafkaHandle else { + preconditionFailure("Kafka handle was not created") + } + defer { + rd_kafka_destroy(kafkaHandle) + } + + rd_kafka_poll_set_consumer(kafkaHandle) + let subscriptionList = rd_kafka_topic_partition_list_new(1) + defer { + rd_kafka_topic_partition_list_destroy(subscriptionList) + } + rd_kafka_topic_partition_list_add( + subscriptionList, + uniqueTestTopic, + RD_KAFKA_PARTITION_UA + ) + rd_kafka_subscribe(kafkaHandle, subscriptionList) + rd_kafka_poll(kafkaHandle, 0) + + var ctr: UInt64 = 0 + var tmpCtr: UInt64 = 0 + + let interval: UInt64 = Swift.max(UInt64(messageCount / 20), 1) + let totalStartDate = Date.timeIntervalSinceReferenceDate + var totalBytes: UInt64 = 0 + + benchmark.withMeasurement { + while ctr < messageCount { + guard let record = rd_kafka_consumer_poll(kafkaHandle, 10) else { + continue + } + defer { + rd_kafka_message_destroy(record) + } + guard record.pointee.err != RD_KAFKA_RESP_ERR__PARTITION_EOF else { + continue + } + let result = rd_kafka_commit_message(kafkaHandle, record, 0) + precondition(result == RD_KAFKA_RESP_ERR_NO_ERROR) + + ctr += 1 + totalBytes += UInt64(record.pointee.len) + + tmpCtr += 1 + if tmpCtr >= interval { + benchLog("read \(ctr * 100 / UInt64(messageCount))%") + tmpCtr = 0 + } + } + } + + rd_kafka_consumer_close(kafkaHandle) - Benchmark.teardown = {} 
+ let timeIntervalTotal = Date.timeIntervalSinceReferenceDate - totalStartDate + let avgRateMb = Double(totalBytes) / timeIntervalTotal / 1024 + benchLog("All read up to ctr: \(ctr), avgRate: (\(Int(avgRateMb))KB/s), timePassed: \(Int(timeIntervalTotal))sec") + } } diff --git a/Benchmarks/Benchmarks/SwiftKafkaConsumerBenchmarks/Utilities.swift b/Benchmarks/Benchmarks/SwiftKafkaConsumerBenchmarks/Utilities.swift new file mode 100644 index 00000000..304dc1fb --- /dev/null +++ b/Benchmarks/Benchmarks/SwiftKafkaConsumerBenchmarks/Utilities.swift @@ -0,0 +1,128 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the swift-kafka-client open source project +// +// Copyright (c) 2022 Apple Inc. and the swift-kafka-client project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of swift-kafka-client project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Benchmark +import class Foundation.ProcessInfo +import struct Foundation.UUID +import Kafka +@_spi(Internal) import Kafka +import Logging +import ServiceLifecycle + +let brokerAddress = KafkaConfiguration.BrokerAddress( + host: ProcessInfo.processInfo.environment["KAFKA_HOST"] ?? 
"localhost", + port: 9092 +) + +extension Logger { + static let perfLogger = { + var logger = Logger(label: "perf logger") + logger.logLevel = .critical + return logger + }() +} + +// For perf tests debugging +func benchLog(_ log: @autoclosure () -> Logger.Message) { + #if DEBUG + Logger.perfLogger.info(log()) + #endif +} + +func createTopic(partitions: Int32) throws -> String { + var basicConfig = KafkaConsumerConfiguration( + consumptionStrategy: .group(id: "no-group", topics: []), + bootstrapBrokerAddresses: [brokerAddress] + ) + basicConfig.broker.addressFamily = .v4 + + let client = try RDKafkaClient.makeClientForTopics(config: basicConfig, logger: .perfLogger) + return try client._createUniqueTopic(partitions: partitions, timeout: 10 * 1000) +} + +func deleteTopic(_ topic: String) throws { + var basicConfig = KafkaConsumerConfiguration( + consumptionStrategy: .group(id: "no-group", topics: []), + bootstrapBrokerAddresses: [brokerAddress] + ) + basicConfig.broker.addressFamily = .v4 + + let client = try RDKafkaClient.makeClientForTopics(config: basicConfig, logger: .perfLogger) + try client._deleteTopic(topic, timeout: 10 * 1000) +} + +func prepareTopic(messagesCount: UInt, partitions: Int32 = -1, logger: Logger = .perfLogger) async throws -> String { + let uniqueTestTopic = try createTopic(partitions: partitions) + + benchLog("Created topic \(uniqueTestTopic)") + + benchLog("Generating \(messagesCount) messages") + let testMessages = _createTestMessages(topic: uniqueTestTopic, count: messagesCount) + benchLog("Finish generating \(messagesCount) messages") + + var producerConfig = KafkaProducerConfiguration(bootstrapBrokerAddresses: [brokerAddress]) + producerConfig.broker.addressFamily = .v4 + + let (producer, acks) = try KafkaProducer.makeProducerWithEvents(configuration: producerConfig, logger: logger) + + let serviceGroupConfiguration = ServiceGroupConfiguration(services: [producer], gracefulShutdownSignals: [.sigterm, .sigint], logger: logger) + let 
serviceGroup = ServiceGroup(configuration: serviceGroupConfiguration) + + try await withThrowingTaskGroup(of: Void.self) { group in + benchLog("Start producing \(messagesCount) messages") + defer { + benchLog("Finish producing") + } + // Run Task + group.addTask { + try await serviceGroup.run() + } + + // Producer Task + group.addTask { + try await _sendAndAcknowledgeMessages( + producer: producer, + events: acks, + messages: testMessages, + skipConsistencyCheck: true + ) + } + + // Wait for Producer Task to complete + try await group.next() + await serviceGroup.triggerGracefulShutdown() + } + + return uniqueTestTopic +} + +extension Benchmark { + @discardableResult + func withMeasurement(_ body: () throws -> T) rethrows -> T { + self.startMeasurement() + defer { + self.stopMeasurement() + } + return try body() + } + + @discardableResult + func withMeasurement(_ body: () async throws -> T) async rethrows -> T { + self.startMeasurement() + defer { + self.stopMeasurement() + } + return try await body() + } +} diff --git a/Benchmarks/Benchmarks/SwiftKafkaProducerBenchmarks/KafkaProducerBenchmark.swift b/Benchmarks/Benchmarks/SwiftKafkaProducerBenchmarks/KafkaProducerBenchmark.swift index 87f0a50b..1971d9e0 100644 --- a/Benchmarks/Benchmarks/SwiftKafkaProducerBenchmarks/KafkaProducerBenchmark.swift +++ b/Benchmarks/Benchmarks/SwiftKafkaProducerBenchmarks/KafkaProducerBenchmark.swift @@ -22,7 +22,19 @@ let benchmarks = { warmupIterations: 0, scalingFactor: .one, maxDuration: .seconds(5), - maxIterations: 100 + maxIterations: 100, + thresholds: [ + // Thresholds are wild guess mostly. Have to adjust with time. 
+ .wallClock: .init(relative: [.p90: 10]), + .cpuTotal: .init(relative: [.p90: 10]), + .allocatedResidentMemory: .init(relative: [.p90: 20]), + .contextSwitches: .init(relative: [.p90: 10]), + .throughput: .init(relative: [.p90: 10]), + .objectAllocCount: .init(relative: [.p90: 10]), + .retainCount: .init(relative: [.p90: 10]), + .releaseCount: .init(relative: [.p90: 10]), + .retainReleaseDelta: .init(relative: [.p90: 10]), + ] ) Benchmark.setup = {} diff --git a/Benchmarks/Package.swift b/Benchmarks/Package.swift index e27d3106..4ea81f8c 100644 --- a/Benchmarks/Package.swift +++ b/Benchmarks/Package.swift @@ -22,7 +22,7 @@ let package = Package( ], dependencies: [ .package(path: "../"), - .package(url: "https://github.com/ordo-one/package-benchmark.git", from: "1.11.1"), + .package(url: "https://github.com/ordo-one/package-benchmark.git", from: "1.22.3"), ], targets: [ .executableTarget( diff --git a/Sources/Kafka/ForTesting/RDKafkaClient+Topic.swift b/Sources/Kafka/ForTesting/RDKafkaClient+Topic.swift new file mode 100644 index 00000000..58f2453d --- /dev/null +++ b/Sources/Kafka/ForTesting/RDKafkaClient+Topic.swift @@ -0,0 +1,157 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the swift-kafka-client open source project +// +// Copyright (c) 2022 Apple Inc. and the swift-kafka-client project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of swift-kafka-client project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// + +import Crdkafka +import struct Foundation.UUID +import Logging + +@_spi(Internal) +extension RDKafkaClient { + /// Create a topic with a unique name (`UUID`). + /// Blocks for a maximum of `timeout` milliseconds. 
+ /// - Parameter partitions: Partitions in topic (default: -1 - default for broker) + /// - Parameter timeout: Timeout in milliseconds. + /// - Returns: Name of newly created topic. + /// - Throws: A ``KafkaError`` if the topic creation failed. + public func _createUniqueTopic(partitions: Int32 = -1, timeout: Int32) throws -> String { + let uniqueTopicName = UUID().uuidString + + let errorChars = UnsafeMutablePointer.allocate(capacity: RDKafkaClient.stringSize) + defer { errorChars.deallocate() } + + guard let newTopic = rd_kafka_NewTopic_new( + uniqueTopicName, + partitions, + -1, // use default replication_factor + errorChars, + RDKafkaClient.stringSize + ) else { + let errorString = String(cString: errorChars) + throw KafkaError.topicCreation(reason: errorString) + } + defer { rd_kafka_NewTopic_destroy(newTopic) } + + try self.withKafkaHandlePointer { kafkaHandle in + let resultQueue = rd_kafka_queue_new(kafkaHandle) + defer { rd_kafka_queue_destroy(resultQueue) } + + var newTopicsArray: [OpaquePointer?] 
= [newTopic] + rd_kafka_CreateTopics( + kafkaHandle, + &newTopicsArray, + 1, + nil, + resultQueue + ) + + guard let resultEvent = rd_kafka_queue_poll(resultQueue, timeout) else { + throw KafkaError.topicCreation(reason: "No CreateTopics result after 10s timeout") + } + defer { rd_kafka_event_destroy(resultEvent) } + + let resultCode = rd_kafka_event_error(resultEvent) + guard resultCode == RD_KAFKA_RESP_ERR_NO_ERROR else { + throw KafkaError.rdKafkaError(wrapping: resultCode) + } + + guard let topicsResultEvent = rd_kafka_event_CreateTopics_result(resultEvent) else { + throw KafkaError.topicCreation(reason: "Received event that is not of type rd_kafka_CreateTopics_result_t") + } + + var resultTopicCount = 0 + let topicResults = rd_kafka_CreateTopics_result_topics( + topicsResultEvent, + &resultTopicCount + ) + + guard resultTopicCount == 1, let topicResult = topicResults?[0] else { + throw KafkaError.topicCreation(reason: "Received less/more than one topic result") + } + + let topicResultError = rd_kafka_topic_result_error(topicResult) + guard topicResultError == RD_KAFKA_RESP_ERR_NO_ERROR else { + throw KafkaError.rdKafkaError(wrapping: topicResultError) + } + + let receivedTopicName = String(cString: rd_kafka_topic_result_name(topicResult)) + guard receivedTopicName == uniqueTopicName else { + throw KafkaError.topicCreation(reason: "Received topic result for topic with different name") + } + } + + return uniqueTopicName + } + + /// Delete a topic. + /// Blocks for a maximum of `timeout` milliseconds. + /// - Parameter topic: Topic to delete. + /// - Parameter timeout: Timeout in milliseconds. + /// - Throws: A ``KafkaError`` if the topic deletion failed. 
+ public func _deleteTopic(_ topic: String, timeout: Int32) throws { + let deleteTopic = rd_kafka_DeleteTopic_new(topic) + defer { rd_kafka_DeleteTopic_destroy(deleteTopic) } + + try self.withKafkaHandlePointer { kafkaHandle in + let resultQueue = rd_kafka_queue_new(kafkaHandle) + defer { rd_kafka_queue_destroy(resultQueue) } + + var deleteTopicsArray: [OpaquePointer?] = [deleteTopic] + rd_kafka_DeleteTopics( + kafkaHandle, + &deleteTopicsArray, + 1, + nil, + resultQueue + ) + + guard let resultEvent = rd_kafka_queue_poll(resultQueue, timeout) else { + throw KafkaError.topicDeletion(reason: "No DeleteTopics result after 10s timeout") + } + defer { rd_kafka_event_destroy(resultEvent) } + + let resultCode = rd_kafka_event_error(resultEvent) + guard resultCode == RD_KAFKA_RESP_ERR_NO_ERROR else { + throw KafkaError.rdKafkaError(wrapping: resultCode) + } + + guard let topicsResultEvent = rd_kafka_event_DeleteTopics_result(resultEvent) else { + throw KafkaError.topicDeletion(reason: "Received event that is not of type rd_kafka_DeleteTopics_result_t") + } + + var resultTopicCount = 0 + let topicResults = rd_kafka_DeleteTopics_result_topics( + topicsResultEvent, + &resultTopicCount + ) + + guard resultTopicCount == 1, let topicResult = topicResults?[0] else { + throw KafkaError.topicDeletion(reason: "Received less/more than one topic result") + } + + let topicResultError = rd_kafka_topic_result_error(topicResult) + guard topicResultError == RD_KAFKA_RESP_ERR_NO_ERROR else { + throw KafkaError.rdKafkaError(wrapping: topicResultError) + } + + let receivedTopicName = String(cString: rd_kafka_topic_result_name(topicResult)) + guard receivedTopicName == topic else { + throw KafkaError.topicDeletion(reason: "Received topic result for topic with different name") + } + } + } + + public static func makeClientForTopics(config: KafkaConsumerConfiguration, logger: Logger) throws -> RDKafkaClient { + return try Self.makeClient(type: .consumer, configDictionary: config.dictionary, 
events: [], logger: logger) + } +} diff --git a/Sources/Kafka/ForTesting/TestMessages.swift b/Sources/Kafka/ForTesting/TestMessages.swift new file mode 100644 index 00000000..f9df6224 --- /dev/null +++ b/Sources/Kafka/ForTesting/TestMessages.swift @@ -0,0 +1,104 @@ +//===----------------------------------------------------------------------===// +// +// This source file is part of the swift-kafka-client open source project +// +// Copyright (c) 2022 Apple Inc. and the swift-kafka-client project authors +// Licensed under Apache License v2.0 +// +// See LICENSE.txt for license information +// See CONTRIBUTORS.txt for the list of swift-kafka-client project authors +// +// SPDX-License-Identifier: Apache-2.0 +// +//===----------------------------------------------------------------------===// +import struct Foundation.Date +import NIOCore + +@_spi(Internal) +public enum _TestMessagesError: Error { + case deliveryReportsIdsIncorrect + case deliveryReportsNotAllMessagesAcknoledged + case deliveryReportsIncorrect +} + +@_spi(Internal) +public func _createTestMessages( + topic: String, + headers: [KafkaHeader] = [], + count: UInt +) -> [KafkaProducerMessage] { + return Array(0..], + skipConsistencyCheck: Bool = false +) async throws { + var messageIDs = Set() + messageIDs.reserveCapacity(messages.count) + + for message in messages { + while true { + do { + messageIDs.insert(try producer.send(message)) + break + } catch let error as KafkaError where error.description.contains("Queue full") { + // That means we have to flush queue immediately but there is no interface for that + // producer.flush() + } + } + } + + var receivedDeliveryReports = Set() + receivedDeliveryReports.reserveCapacity(messages.count) + + for await event in events { + switch event { + case .deliveryReports(let deliveryReports): + for deliveryReport in deliveryReports { + receivedDeliveryReports.insert(deliveryReport) + } + default: + break // Ignore any other events + } + + if 
receivedDeliveryReports.count >= messages.count { + break + } + } + + guard Set(receivedDeliveryReports.map(\.id)) == messageIDs else { + throw _TestMessagesError.deliveryReportsIdsIncorrect + } + + let acknowledgedMessages: [KafkaAcknowledgedMessage] = receivedDeliveryReports.compactMap { + guard case .acknowledged(let receivedMessage) = $0.status else { + return nil + } + return receivedMessage + } + + guard messages.count == acknowledgedMessages.count else { + throw _TestMessagesError.deliveryReportsNotAllMessagesAcknoledged + } + if skipConsistencyCheck { + return + } + for message in messages { + guard acknowledgedMessages.contains(where: { $0.topic == message.topic }), + acknowledgedMessages.contains(where: { $0.key == ByteBuffer(string: message.key!) }), + acknowledgedMessages.contains(where: { $0.value == ByteBuffer(string: message.value) }) else { + throw _TestMessagesError.deliveryReportsIncorrect + } + } +} diff --git a/Sources/Kafka/KafkaConsumer.swift b/Sources/Kafka/KafkaConsumer.swift index c5ec5226..87384220 100644 --- a/Sources/Kafka/KafkaConsumer.swift +++ b/Sources/Kafka/KafkaConsumer.swift @@ -852,8 +852,10 @@ extension KafkaConsumer { switch consumerState { case .running(let source): self.state = .running(client: client, messagePollLoopState: .waitingForMessages(source: source)) - case .suspended, .waitingForMessages, .finished: + case .suspended, .waitingForMessages: fatalError("\(#function) should not be invoked in state \(self.state)") + case .finished: + break // ok, skip action } } diff --git a/Sources/Kafka/RDKafka/RDKafkaClient.swift b/Sources/Kafka/RDKafka/RDKafkaClient.swift index 014a3e8c..512bf56a 100644 --- a/Sources/Kafka/RDKafka/RDKafkaClient.swift +++ b/Sources/Kafka/RDKafka/RDKafkaClient.swift @@ -19,7 +19,8 @@ import Logging /// Base class for ``KafkaProducer`` and ``KafkaConsumer``, /// which is used to handle the connection to the Kafka ecosystem. 
-final class RDKafkaClient: Sendable { +@_spi(Internal) +public final class RDKafkaClient: Sendable { // Default size for Strings returned from C API static let stringSize = 1024 diff --git a/Tests/IntegrationTests/KafkaTests.swift b/Tests/IntegrationTests/KafkaTests.swift index e6cf82e5..525b9461 100644 --- a/Tests/IntegrationTests/KafkaTests.swift +++ b/Tests/IntegrationTests/KafkaTests.swift @@ -14,6 +14,7 @@ import struct Foundation.UUID @testable import Kafka +@_spi(Internal) import Kafka import NIOCore import ServiceLifecycle import XCTest @@ -79,7 +80,9 @@ final class KafkaTests: XCTestCase { events: [], logger: .kafkaTest ) - try client._deleteTopic(self.uniqueTestTopic, timeout: 10 * 1000) + if let uniqueTestTopic = self.uniqueTestTopic { + try client._deleteTopic(uniqueTestTopic, timeout: 10 * 1000) + } self.bootstrapBrokerAddress = nil self.producerConfig = nil @@ -606,14 +609,7 @@ final class KafkaTests: XCTestCase { headers: [KafkaHeader] = [], count: UInt ) -> [KafkaProducerMessage] { - return Array(0..] 
) async throws { - var messageIDs = Set() - - for message in messages { - messageIDs.insert(try producer.send(message)) - } - - var receivedDeliveryReports = Set() - - for await event in events { - switch event { - case .deliveryReports(let deliveryReports): - for deliveryReport in deliveryReports { - receivedDeliveryReports.insert(deliveryReport) - } - default: - break // Ignore any other events - } - - if receivedDeliveryReports.count >= messages.count { - break - } - } - - XCTAssertEqual(Set(receivedDeliveryReports.map(\.id)), messageIDs) - - let acknowledgedMessages: [KafkaAcknowledgedMessage] = receivedDeliveryReports.compactMap { - guard case .acknowledged(let receivedMessage) = $0.status else { - return nil - } - return receivedMessage - } - - XCTAssertEqual(messages.count, acknowledgedMessages.count) - for message in messages { - XCTAssertTrue(acknowledgedMessages.contains(where: { $0.topic == message.topic })) - XCTAssertTrue(acknowledgedMessages.contains(where: { $0.key == ByteBuffer(string: message.key!) 
})) - XCTAssertTrue(acknowledgedMessages.contains(where: { $0.value == ByteBuffer(string: message.value) })) - } + return try await _sendAndAcknowledgeMessages(producer: producer, events: events, messages: messages) } } diff --git a/Tests/IntegrationTests/Utilities.swift b/Tests/IntegrationTests/Utilities.swift index db86c0a0..94dbf374 100644 --- a/Tests/IntegrationTests/Utilities.swift +++ b/Tests/IntegrationTests/Utilities.swift @@ -12,9 +12,7 @@ // //===----------------------------------------------------------------------===// -import Crdkafka import struct Foundation.UUID -@testable import Kafka import Logging extension Logger { @@ -24,159 +22,3 @@ extension Logger { return logger } } - -extension RDKafkaClient { -// func createUniqueTopic(timeout: Int32 = 10000) async throws -> String { -// try await withCheckedThrowingContinuation { continuation in -// do { -// let uniqueTopic = try self._createUniqueTopic(timeout: timeout) -// continuation.resume(returning: uniqueTopic) -// } catch { -// continuation.resume(throwing: error) -// } -// } -// } - - /// Create a topic with a unique name (`UUID`). - /// Blocks for a maximum of `timeout` milliseconds. - /// - Parameter timeout: Timeout in milliseconds. - /// - Returns: Name of newly created topic. - /// - Throws: A ``KafkaError`` if the topic creation failed. 
- func _createUniqueTopic(timeout: Int32) throws -> String { - let uniqueTopicName = UUID().uuidString - - let errorChars = UnsafeMutablePointer.allocate(capacity: RDKafkaClient.stringSize) - defer { errorChars.deallocate() } - - guard let newTopic = rd_kafka_NewTopic_new( - uniqueTopicName, - -1, // use default num_partitions - -1, // use default replication_factor - errorChars, - RDKafkaClient.stringSize - ) else { - let errorString = String(cString: errorChars) - throw KafkaError.topicCreation(reason: errorString) - } - defer { rd_kafka_NewTopic_destroy(newTopic) } - - try self.withKafkaHandlePointer { kafkaHandle in - let resultQueue = rd_kafka_queue_new(kafkaHandle) - defer { rd_kafka_queue_destroy(resultQueue) } - - var newTopicsArray: [OpaquePointer?] = [newTopic] - rd_kafka_CreateTopics( - kafkaHandle, - &newTopicsArray, - 1, - nil, - resultQueue - ) - - guard let resultEvent = rd_kafka_queue_poll(resultQueue, timeout) else { - throw KafkaError.topicCreation(reason: "No CreateTopics result after 10s timeout") - } - defer { rd_kafka_event_destroy(resultEvent) } - - let resultCode = rd_kafka_event_error(resultEvent) - guard resultCode == RD_KAFKA_RESP_ERR_NO_ERROR else { - throw KafkaError.rdKafkaError(wrapping: resultCode) - } - - guard let topicsResultEvent = rd_kafka_event_CreateTopics_result(resultEvent) else { - throw KafkaError.topicCreation(reason: "Received event that is not of type rd_kafka_CreateTopics_result_t") - } - - var resultTopicCount = 0 - let topicResults = rd_kafka_CreateTopics_result_topics( - topicsResultEvent, - &resultTopicCount - ) - - guard resultTopicCount == 1, let topicResult = topicResults?[0] else { - throw KafkaError.topicCreation(reason: "Received less/more than one topic result") - } - - let topicResultError = rd_kafka_topic_result_error(topicResult) - guard topicResultError == RD_KAFKA_RESP_ERR_NO_ERROR else { - throw KafkaError.rdKafkaError(wrapping: topicResultError) - } - - let receivedTopicName = String(cString: 
rd_kafka_topic_result_name(topicResult)) - guard receivedTopicName == uniqueTopicName else { - throw KafkaError.topicCreation(reason: "Received topic result for topic with different name") - } - } - - return uniqueTopicName - } - -// func deleteTopic(_ topic: String, timeout: Int32 = 10000) async throws { -// try await withCheckedThrowingContinuation { continuation in -// do { -// try self._deleteTopic(topic, timeout: timeout) -// continuation.resume() -// } catch { -// continuation.resume(throwing: error) -// } -// } -// } - - /// Delete a topic. - /// Blocks for a maximum of `timeout` milliseconds. - /// - Parameter topic: Topic to delete. - /// - Parameter timeout: Timeout in milliseconds. - /// - Throws: A ``KafkaError`` if the topic deletion failed. - func _deleteTopic(_ topic: String, timeout: Int32) throws { - let deleteTopic = rd_kafka_DeleteTopic_new(topic) - defer { rd_kafka_DeleteTopic_destroy(deleteTopic) } - - try self.withKafkaHandlePointer { kafkaHandle in - let resultQueue = rd_kafka_queue_new(kafkaHandle) - defer { rd_kafka_queue_destroy(resultQueue) } - - var deleteTopicsArray: [OpaquePointer?] 
= [deleteTopic] - rd_kafka_DeleteTopics( - kafkaHandle, - &deleteTopicsArray, - 1, - nil, - resultQueue - ) - - guard let resultEvent = rd_kafka_queue_poll(resultQueue, timeout) else { - throw KafkaError.topicDeletion(reason: "No DeleteTopics result after 10s timeout") - } - defer { rd_kafka_event_destroy(resultEvent) } - - let resultCode = rd_kafka_event_error(resultEvent) - guard resultCode == RD_KAFKA_RESP_ERR_NO_ERROR else { - throw KafkaError.rdKafkaError(wrapping: resultCode) - } - - guard let topicsResultEvent = rd_kafka_event_DeleteTopics_result(resultEvent) else { - throw KafkaError.topicDeletion(reason: "Received event that is not of type rd_kafka_DeleteTopics_result_t") - } - - var resultTopicCount = 0 - let topicResults = rd_kafka_DeleteTopics_result_topics( - topicsResultEvent, - &resultTopicCount - ) - - guard resultTopicCount == 1, let topicResult = topicResults?[0] else { - throw KafkaError.topicDeletion(reason: "Received less/more than one topic result") - } - - let topicResultError = rd_kafka_topic_result_error(topicResult) - guard topicResultError == RD_KAFKA_RESP_ERR_NO_ERROR else { - throw KafkaError.rdKafkaError(wrapping: topicResultError) - } - - let receivedTopicName = String(cString: rd_kafka_topic_result_name(topicResult)) - guard receivedTopicName == topic else { - throw KafkaError.topicDeletion(reason: "Received topic result for topic with different name") - } - } - } -} diff --git a/dev/test-benchmark-thresholds.sh b/dev/test-benchmark-thresholds.sh new file mode 100644 index 00000000..731c3e97 --- /dev/null +++ b/dev/test-benchmark-thresholds.sh @@ -0,0 +1,42 @@ +#!/bin/bash +##===----------------------------------------------------------------------===## +## +## This source file is part of the swift-kafka-client open source project +## +## Copyright (c) YEARS Apple Inc. 
and the swift-kafka-client project authors +## Licensed under Apache License v2.0 +## +## See LICENSE.txt for license information +## See CONTRIBUTORS.txt for the list of swift-kafka-client project authors +## +## SPDX-License-Identifier: Apache-2.0 +## +##===----------------------------------------------------------------------===## + +cd Benchmarks +swift package --disable-sandbox benchmark baseline update PR --no-progress +git checkout main +swift package --disable-sandbox benchmark baseline update main --no-progress + +swift package benchmark baseline check main PR +BENCHMARK_RESULT=$? + +echo "Retcode is $BENCHMARK_RESULT" + +if [ $BENCHMARK_RESULT -eq 0 ]; then + echo "Benchmark results are the same as for main" +fi + +if [ $BENCHMARK_RESULT -eq 4 ]; then + echo "Benchmark results are better than for main" +fi + +if [ $BENCHMARK_RESULT -eq 1 ]; then + echo "Benchmark failed" + exit 1 +fi + +if [ $BENCHMARK_RESULT -eq 2 ]; then + echo "Benchmark results are worse than main" + exit 1 +fi diff --git a/docker/docker-compose.2204.57.yaml b/docker/docker-compose.2204.57.yaml index af7cda0c..a465a610 100644 --- a/docker/docker-compose.2204.57.yaml +++ b/docker/docker-compose.2204.57.yaml @@ -15,6 +15,7 @@ services: test: image: swift-kafka-client:22.04-5.7 environment: + - SWIFT_VERSION=5.7 - WARN_AS_ERROR_ARG=-Xswiftc -warnings-as-errors - STRICT_CONCURRENCY_ARG=-Xswiftc -strict-concurrency=complete # - SANITIZER_ARG=--sanitize=thread # TSan broken still diff --git a/docker/docker-compose.2204.58.yaml b/docker/docker-compose.2204.58.yaml index 521c6ac9..47b02679 100644 --- a/docker/docker-compose.2204.58.yaml +++ b/docker/docker-compose.2204.58.yaml @@ -15,6 +15,7 @@ services: test: image: swift-kafka-client:22.04-5.8 environment: + - SWIFT_VERSION=5.8 - WARN_AS_ERROR_ARG=-Xswiftc -warnings-as-errors - IMPORT_CHECK_ARG=--explicit-target-dependency-import-check error - STRICT_CONCURRENCY_ARG=-Xswiftc -strict-concurrency=complete diff --git 
a/docker/docker-compose.2204.59.yaml b/docker/docker-compose.2204.59.yaml index e0a562d7..8d9cf29d 100644 --- a/docker/docker-compose.2204.59.yaml +++ b/docker/docker-compose.2204.59.yaml @@ -15,6 +15,7 @@ services: test: image: swift-kafka-client:22.04-5.9 environment: + - SWIFT_VERSION=5.9 - WARN_AS_ERROR_ARG=-Xswiftc -warnings-as-errors - IMPORT_CHECK_ARG=--explicit-target-dependency-import-check error - STRICT_CONCURRENCY_ARG=-Xswiftc -strict-concurrency=complete diff --git a/docker/docker-compose.2204.main.yaml b/docker/docker-compose.2204.main.yaml index b4e098cf..acac1a54 100644 --- a/docker/docker-compose.2204.main.yaml +++ b/docker/docker-compose.2204.main.yaml @@ -11,6 +11,7 @@ services: test: image: swift-kafka-client:22.04-main environment: + - SWIFT_VERSION=main - WARN_AS_ERROR_ARG=-Xswiftc -warnings-as-errors - IMPORT_CHECK_ARG=--explicit-target-dependency-import-check error - STRICT_CONCURRENCY_ARG=-Xswiftc -strict-concurrency=complete diff --git a/docker/docker-compose.yaml b/docker/docker-compose.yaml index d8789f7c..10f1665c 100644 --- a/docker/docker-compose.yaml +++ b/docker/docker-compose.yaml @@ -50,12 +50,12 @@ services: <<: *common depends_on: [kafka, runtime-setup] environment: + SWIFT_VERSION: 5.7 KAFKA_HOST: kafka command: > /bin/bash -xcl " swift build --build-tests $${SANITIZER_ARG-} && \ - swift $${SWIFT_TEST_VERB-test} $${WARN_AS_ERROR_ARG-} $${SANITIZER_ARG-} $${IMPORT_CHECK_ARG-} $${STRICT_CONCURRENCY_ARG-} && \ - cd Benchmarks && swift package --disable-sandbox benchmark baseline check --check-absolute-path Thresholds/$${SWIFT_VERSION-}/ + swift $${SWIFT_TEST_VERB-test} $${WARN_AS_ERROR_ARG-} $${SANITIZER_ARG-} $${IMPORT_CHECK_ARG-} $${STRICT_CONCURRENCY_ARG-} " benchmark: @@ -73,7 +73,7 @@ services: depends_on: [kafka, runtime-setup] environment: KAFKA_HOST: kafka - command: /bin/bash -xcl "cd Benchmarks && swift package --disable-sandbox --scratch-path .build/$${SWIFT_VERSION-}/ --allow-writing-to-package-directory benchmark 
--format metricP90AbsoluteThresholds --path Thresholds/$${SWIFT_VERSION-}/" + command: /bin/bash -xcl "cd Benchmarks && swift package --disable-sandbox --scratch-path .build/$${SWIFT_VERSION-}/ --allow-writing-to-package-directory benchmark --format metricP90AbsoluteThresholds --path Thresholds/$${SWIFT_VERSION-}/ --no-progress" # util