From 6dbc8f0c45088f1d77bc4e93c8c2c006ea431ddb Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Fri, 14 Mar 2025 10:53:59 -0700 Subject: [PATCH] Only refresh metadata if connection fails all dns records --- kafka/client_async.py | 3 +-- kafka/conn.py | 4 ++++ 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/kafka/client_async.py b/kafka/client_async.py index 78ff1c118..69f91c0f0 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -236,7 +236,6 @@ def __init__(self, **configs): self._api_versions = None self._connecting = set() self._sending = set() - self._refresh_on_disconnects = True # Not currently used, but data is collected internally self._last_bootstrap = 0 @@ -382,7 +381,7 @@ def _conn_state_change(self, node_id, sock, conn): elif self.cluster.is_bootstrap(node_id): self._bootstrap_fails += 1 - elif self._refresh_on_disconnects and not self._closed and not idle_disconnect: + elif conn.connect_failed() and not self._closed and not idle_disconnect: log.warning("Node %s connection failed -- refreshing metadata", node_id) self.cluster.request_update() diff --git a/kafka/conn.py b/kafka/conn.py index 2f8c2491c..247e88fd7 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -850,6 +850,10 @@ def disconnected(self): """Return True iff socket is closed""" return self.state is ConnectionStates.DISCONNECTED + def connect_failed(self): + """Return True iff connection attempt failed after attempting all dns records""" + return self.disconnected() and self.last_attempt >= 0 and len(self._gai) == 0 + def _reset_reconnect_backoff(self): self._failures = 0 self._reconnect_backoff = self.config['reconnect_backoff_ms'] / 1000.0