From 3e766b6a8095dfbab27aa12ff0f68af1155c055f Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Mon, 24 Feb 2025 20:36:19 -0800 Subject: [PATCH] default client.check_version timeout to api_version_auto_timeout_ms --- kafka/admin/client.py | 2 +- kafka/client_async.py | 17 ++++++++++------- 2 files changed, 11 insertions(+), 8 deletions(-) diff --git a/kafka/admin/client.py b/kafka/admin/client.py index 310227855..68b0af115 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -261,7 +261,7 @@ def _refresh_controller_id(self, timeout_ms=30000): time.sleep(1) continue # verify the controller is new enough to support our requests - controller_version = self._client.check_version(node_id=controller_id, timeout=(self.config['api_version_auto_timeout_ms'] / 1000)) + controller_version = self._client.check_version(node_id=controller_id) if controller_version < (0, 10, 0): raise IncompatibleBrokerVersion( "The controller appears to be running Kafka {}. KafkaAdminClient requires brokers >= 0.10.0.0." diff --git a/kafka/client_async.py b/kafka/client_async.py index 27f6ab830..301a5fd26 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -249,8 +249,7 @@ def __init__(self, **configs): # Check Broker Version if not set explicitly if self.config['api_version'] is None: - check_timeout = self.config['api_version_auto_timeout_ms'] / 1000 - self.config['api_version'] = self.check_version(timeout=check_timeout) + self.config['api_version'] = self.check_version() elif self.config['api_version'] in BROKER_API_VERSIONS: self._api_versions = BROKER_API_VERSIONS[self.config['api_version']] elif (self.config['api_version'] + (0,)) in BROKER_API_VERSIONS: @@ -921,13 +920,16 @@ def get_api_versions(self): """ return self._api_versions - def check_version(self, node_id=None, timeout=2, strict=False): + def check_version(self, node_id=None, timeout=None, strict=False): """Attempt to guess the version of a Kafka broker. - Note: It is possible that this method blocks longer than the - specified timeout. This can happen if the entire cluster - is down and the client enters a bootstrap backoff sleep. - This is only possible if node_id is None. + Keyword Arguments: + node_id (str, optional): Broker node id from cluster metadata. If None, attempts + to connect to any available broker until version is identified. + Default: None + timeout (num, optional): Maximum time in seconds to try to check broker version. + If unable to identify version before timeout, raise error (see below). + Default: api_version_auto_timeout_ms / 1000 Returns: version tuple, i.e. (3, 9), (2, 0), (0, 10, 2) etc @@ -937,6 +939,7 @@ def check_version(self, node_id=None, timeout=2, strict=False): UnrecognizedBrokerVersion: please file bug if seen! AssertionError (if strict=True): please file bug if seen! """ + timeout = timeout or (self.config['api_version_auto_timeout_ms'] / 1000) self._lock.acquire() end = time.time() + timeout while time.time() < end: