From f460b26e744b991ce308b9c5bcf6d039f4606451 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sat, 15 Feb 2025 10:31:17 -0800 Subject: [PATCH 1/3] Check for wakeup socket errors on read and close and reinit to reset --- kafka/client_async.py | 47 ++++++++++++++++++++++++++++++++----------- 1 file changed, 35 insertions(+), 12 deletions(-) diff --git a/kafka/client_async.py b/kafka/client_async.py index ea5e606cb..6bc644f62 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -204,8 +204,9 @@ def __init__(self, **configs): # these properties need to be set on top of the initialization pipeline # because they are used when __del__ method is called self._closed = False - self._wake_r, self._wake_w = socket.socketpair() self._selector = self.config['selector']() + self._init_wakeup_socketpair() + self._wake_lock = threading.Lock() self.cluster = ClusterMetadata(**self.config) self._topics = set() # empty set will fetch all topic metadata @@ -217,9 +218,6 @@ def __init__(self, **configs): self._refresh_on_disconnects = True self._last_bootstrap = 0 self._bootstrap_fails = 0 - self._wake_r.setblocking(False) - self._wake_w.settimeout(self.config['wakeup_timeout_ms'] / 1000.0) - self._wake_lock = threading.Lock() self._lock = threading.RLock() @@ -228,7 +226,6 @@ def __init__(self, **configs): # lock above. self._pending_completion = collections.deque() - self._selector.register(self._wake_r, selectors.EVENT_READ) self._idle_expiry_manager = IdleConnectionManager(self.config['connections_max_idle_ms']) self._sensors = None if self.config['metrics']: @@ -243,6 +240,25 @@ def __init__(self, **configs): check_timeout = self.config['api_version_auto_timeout_ms'] / 1000 self.config['api_version'] = self.check_version(timeout=check_timeout) + def _init_wakeup_socketpair(self): + self._wake_r, self._wake_w = socket.socketpair() + self._wake_r.setblocking(False) + self._wake_w.settimeout(self.config['wakeup_timeout_ms'] / 1000.0) + self._waking = False + self._selector.register(self._wake_r, selectors.EVENT_READ) + + def _close_wakeup_socketpair(self): + if self._wake_r is not None: + try: + self._selector.unregister(self._wake_r) + except KeyError: + pass + self._wake_r.close() + if self._wake_w is not None: + self._wake_w.close() + self._wake_r = None + self._wake_w = None + def _can_bootstrap(self): effective_failures = self._bootstrap_fails // self._num_bootstrap_hosts backoff_factor = 2 ** effective_failures @@ -416,9 +432,8 @@ def connected(self, node_id): def _close(self): if not self._closed: self._closed = True - self._wake_r.close() - self._wake_w.close() self._selector.close() + self._close_wakeup_socketpair() def close(self, node_id=None): """Close one or all broker connections. @@ -944,20 +959,28 @@ def check_version(self, node_id=None, timeout=2, strict=False): raise Errors.NoBrokersAvailable() def wakeup(self): + if self._waking or self._wake_w is None: + return with self._wake_lock: try: self._wake_w.sendall(b'x') - except socket.timeout: + self._waking = True + except socket.timeout as e: log.warning('Timeout to send to wakeup socket!') - raise Errors.KafkaTimeoutError() - except socket.error: - log.warning('Unable to send to wakeup socket!') + raise Errors.KafkaTimeoutError(e) + except socket.error as e: + log.warning('Unable to send to wakeup socket! %s', e) + raise e def _clear_wake_fd(self): # reading from wake socket should only happen in a single thread + self._waking = False while True: try: - self._wake_r.recv(1024) + if not self._wake_r.recv(1024): + self._close_wakeup_socketpair() + self._init_wakeup_socketpair() + break except socket.error: break From 4eb9481a5699dd268bf89dd81ca4cadcc1d029ac Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sat, 15 Feb 2025 14:10:30 -0800 Subject: [PATCH 2/3] clear wake_r w/ lock --- kafka/client_async.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/kafka/client_async.py b/kafka/client_async.py index 6bc644f62..2ffba0306 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -974,15 +974,16 @@ def wakeup(self): def _clear_wake_fd(self): # reading from wake socket should only happen in a single thread - self._waking = False - while True: - try: - if not self._wake_r.recv(1024): - self._close_wakeup_socketpair() - self._init_wakeup_socketpair() + with self._wake_lock: + self._waking = False + while True: + try: + if not self._wake_r.recv(1024): + self._close_wakeup_socketpair() + self._init_wakeup_socketpair() + break + except socket.error: break - except socket.error: - break def _maybe_close_oldest_connection(self): expired_connection = self._idle_expiry_manager.poll_expired_connection() From fc72eb4af44d49bb78902aaadaee863338441151 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sat, 15 Feb 2025 15:02:28 -0800 Subject: [PATCH 3/3] Add warning log and comments --- kafka/client_async.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/kafka/client_async.py b/kafka/client_async.py index 2ffba0306..f8919e028 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -979,10 +979,13 @@ def _clear_wake_fd(self): while True: try: if not self._wake_r.recv(1024): + # Non-blocking socket returns empty on error + log.warning("Error reading wakeup socket. Rebuilding socketpair.") self._close_wakeup_socketpair() self._init_wakeup_socketpair() break except socket.error: + # Non-blocking socket raises when socket is ok but no data available to read break def _maybe_close_oldest_connection(self):