diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index ee3f95be7..58284a7a9 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -470,19 +470,21 @@ def assignment(self): """ return self._subscription.assigned_partitions() - def close(self, autocommit=True): + def close(self, autocommit=True, timeout_ms=None): """Close the consumer, waiting indefinitely for any needed cleanup. Keyword Arguments: autocommit (bool): If auto-commit is configured for this consumer, this optional flag causes the consumer to attempt to commit any pending consumed offsets prior to close. Default: True + timeout_ms (num, optional): Milliseconds to wait for auto-commit. + Default: None """ if self._closed: return log.debug("Closing the KafkaConsumer.") self._closed = True - self._coordinator.close(autocommit=autocommit) + self._coordinator.close(autocommit=autocommit, timeout_ms=timeout_ms) self._metrics.close() self._client.close() try: diff --git a/test/test_consumer_group.py b/test/test_consumer_group.py index c175e142c..9334a4fd1 100644 --- a/test/test_consumer_group.py +++ b/test/test_consumer_group.py @@ -47,7 +47,7 @@ def test_group(kafka_broker, topic): consumers = {} stop = {} threads = {} - messages = collections.defaultdict(list) + messages = collections.defaultdict(lambda: collections.defaultdict(list)) group_id = 'test-group-' + random_string(6) def consumer_thread(i): assert i not in consumers @@ -60,15 +60,15 @@ def consumer_thread(i): api_version_auto_timeout_ms=5000, heartbeat_interval_ms=500) while not stop[i].is_set(): - for tp, records in six.itervalues(consumers[i].poll(timeout_ms=200)): + for tp, records in six.iteritems(consumers[i].poll(timeout_ms=200)): messages[i][tp].extend(records) - consumers[i].close() + consumers[i].close(timeout_ms=500) consumers[i] = None stop[i] = None num_consumers = 4 for i in range(num_consumers): - t = threading.Thread(target=consumer_thread, args=(i,)) + t = threading.Thread(target=consumer_thread, args=(i,), daemon=True) t.start() threads[i] = t @@ -129,7 +129,8 @@ def consumer_thread(i): for c in range(num_consumers): logging.info('Stopping consumer %s', c) stop[c].set() - threads[c].join() + threads[c].join(timeout=5) + assert not threads[c].is_alive() threads[c] = None @@ -179,4 +180,4 @@ def test_heartbeat_thread(kafka_broker, topic): assert consumer._coordinator.heartbeat.last_poll == last_poll consumer.poll(timeout_ms=100) assert consumer._coordinator.heartbeat.last_poll > last_poll - consumer.close() + consumer.close(timeout_ms=100)