From 6b4ff30689d1ed853e28577a7b7953c7b9a28f1b Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Mon, 24 Mar 2025 10:46:35 -0700 Subject: [PATCH 1/3] fixup test_group iter bug --- test/test_consumer_group.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/test/test_consumer_group.py b/test/test_consumer_group.py index c175e142c..1e62cd6f9 100644 --- a/test/test_consumer_group.py +++ b/test/test_consumer_group.py @@ -47,7 +47,7 @@ def test_group(kafka_broker, topic): consumers = {} stop = {} threads = {} - messages = collections.defaultdict(list) + messages = collections.defaultdict(lambda: collections.defaultdict(list)) group_id = 'test-group-' + random_string(6) def consumer_thread(i): assert i not in consumers @@ -60,7 +60,7 @@ def consumer_thread(i): api_version_auto_timeout_ms=5000, heartbeat_interval_ms=500) while not stop[i].is_set(): - for tp, records in six.itervalues(consumers[i].poll(timeout_ms=200)): + for tp, records in six.iteritems(consumers[i].poll(timeout_ms=200)): messages[i][tp].extend(records) consumers[i].close() consumers[i] = None From 843f84fac7b3abe86e9f9c0accfc945445e91c64 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Mon, 24 Mar 2025 12:41:36 -0700 Subject: [PATCH 2/3] Add optional timeout_ms kwarg to consumer.close() --- kafka/consumer/group.py | 6 ++++-- test/test_consumer_group.py | 4 ++-- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index ee3f95be7..58284a7a9 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -470,19 +470,21 @@ def assignment(self): """ return self._subscription.assigned_partitions() - def close(self, autocommit=True): + def close(self, autocommit=True, timeout_ms=None): """Close the consumer, waiting indefinitely for any needed cleanup. Keyword Arguments: autocommit (bool): If auto-commit is configured for this consumer, this optional flag causes the consumer to attempt to commit any pending consumed offsets prior to close. Default: True + timeout_ms (num, optional): Milliseconds to wait for auto-commit. + Default: None """ if self._closed: return log.debug("Closing the KafkaConsumer.") self._closed = True - self._coordinator.close(autocommit=autocommit) + self._coordinator.close(autocommit=autocommit, timeout_ms=timeout_ms) self._metrics.close() self._client.close() try: diff --git a/test/test_consumer_group.py b/test/test_consumer_group.py index 1e62cd6f9..7a618dea6 100644 --- a/test/test_consumer_group.py +++ b/test/test_consumer_group.py @@ -62,7 +62,7 @@ def consumer_thread(i): while not stop[i].is_set(): for tp, records in six.iteritems(consumers[i].poll(timeout_ms=200)): messages[i][tp].extend(records) - consumers[i].close() + consumers[i].close(timeout_ms=500) consumers[i] = None stop[i] = None @@ -179,4 +179,4 @@ def test_heartbeat_thread(kafka_broker, topic): assert consumer._coordinator.heartbeat.last_poll == last_poll consumer.poll(timeout_ms=100) assert consumer._coordinator.heartbeat.last_poll > last_poll - consumer.close() + consumer.close(timeout_ms=100) From 1c3ad62e2ab831b37569579ff1b00f7fad64e1df Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Mon, 24 Mar 2025 12:48:23 -0700 Subject: [PATCH 3/3] Use daemon threads in test_group and timeout thread.join() --- test/test_consumer_group.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/test/test_consumer_group.py b/test/test_consumer_group.py index 7a618dea6..9334a4fd1 100644 --- a/test/test_consumer_group.py +++ b/test/test_consumer_group.py @@ -68,7 +68,7 @@ def consumer_thread(i): num_consumers = 4 for i in range(num_consumers): - t = threading.Thread(target=consumer_thread, args=(i,)) + t = threading.Thread(target=consumer_thread, args=(i,), daemon=True) t.start() threads[i] = t @@ -129,7 +129,8 @@ def consumer_thread(i): for c in range(num_consumers): logging.info('Stopping consumer %s', c) stop[c].set() - threads[c].join() + threads[c].join(timeout=5) + assert not threads[c].is_alive() threads[c] = None