From 6c38cf08e7c9542f2ef66f85782a39c9f68972c4 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Thu, 15 May 2025 13:16:59 -0700 Subject: [PATCH 1/4] Wait for next heartbeat + log in thread loop; check for connected coordinator before sending --- kafka/coordinator/base.py | 17 +++++++++++++---- 1 file changed, 13 insertions(+), 4 deletions(-) diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 852157811..52974b6af 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -250,6 +250,12 @@ def coordinator(self): else: return self.coordinator_id + def connected(self): + """Return True iff the coordinator node is connected""" + with self._lock: + if self.coordinator_id is not None and self._client.connected(self.coordinator_id): + return True + def ensure_coordinator_ready(self, timeout_ms=None): """Block until the coordinator for this group is known. @@ -1088,6 +1094,10 @@ def _run_once(self): self.coordinator._client._lock.release() self.coordinator._lock.wait(self.coordinator.config['retry_backoff_ms'] / 1000) + elif not self.coordinator.connected(): + self.coordinator._client._lock.release() + self.coordinator._lock.wait(self.coordinator.config['retry_backoff_ms'] / 1000) + elif self.coordinator.heartbeat.session_timeout_expired(): # the session timeout has expired without seeing a # successful heartbeat, so we should probably make sure @@ -1103,11 +1113,10 @@ def _run_once(self): self.coordinator.maybe_leave_group() elif not self.coordinator.heartbeat.should_heartbeat(): - # poll again after waiting for the retry backoff in case - # the heartbeat failed or the coordinator disconnected - heartbeat_log.log(0, 'Not ready to heartbeat, waiting') + next_hb = self.coordinator.heartbeat.time_to_next_heartbeat() + heartbeat_log.debug('Waiting %0.1f secs to send next heartbeat', next_hb) self.coordinator._client._lock.release() - self.coordinator._lock.wait(self.coordinator.config['retry_backoff_ms'] / 1000) + self.coordinator._lock.wait(next_hb) else: self.coordinator.heartbeat.sent_heartbeat() From 777336475eada5fc7c56154f331844a211851004 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Thu, 15 May 2025 13:18:14 -0700 Subject: [PATCH 2/4] Slight simplification of heartbeat thread loop --- kafka/coordinator/base.py | 22 +++++++++++----------- 1 file changed, 11 insertions(+), 11 deletions(-) diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index 52974b6af..f1b86c54f 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -1064,28 +1064,28 @@ def _run_once(self): self.coordinator._client._lock.acquire() self.coordinator._lock.acquire() try: - if self.enabled and self.coordinator.state is MemberState.STABLE: - # TODO: When consumer.wakeup() is implemented, we need to - # disable here to prevent propagating an exception to this - # heartbeat thread - # must get client._lock, or maybe deadlock at heartbeat - # failure callback in consumer poll - self.coordinator._client.poll(timeout_ms=0) - if not self.enabled: heartbeat_log.debug('Heartbeat disabled. Waiting') self.coordinator._client._lock.release() self.coordinator._lock.wait() - heartbeat_log.debug('Heartbeat re-enabled.') + if self.enabled: + heartbeat_log.debug('Heartbeat re-enabled.') + return - elif self.coordinator.state is not MemberState.STABLE: + if self.coordinator.state is not MemberState.STABLE: # the group is not stable (perhaps because we left the # group or because the coordinator kicked us out), so # disable heartbeats and wait for the main thread to rejoin. heartbeat_log.debug('Group state is not stable, disabling heartbeats') self.disable() + return + + # TODO: When consumer.wakeup() is implemented, we need to + # disable here to prevent propagating an exception to this + # heartbeat thread + self.coordinator._client.poll(timeout_ms=0) - elif self.coordinator.coordinator_unknown(): + if self.coordinator.coordinator_unknown(): future = self.coordinator.lookup_coordinator() if not future.is_done or future.failed(): # the immediate future check ensures that we backoff From 674618687b98b0b5452e6a5a4e4b6c01eacb8f2a Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Thu, 15 May 2025 13:37:35 -0700 Subject: [PATCH 3/4] connected() also returns False --- kafka/coordinator/base.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/kafka/coordinator/base.py b/kafka/coordinator/base.py index f1b86c54f..b128e5548 100644 --- a/kafka/coordinator/base.py +++ b/kafka/coordinator/base.py @@ -253,8 +253,7 @@ def coordinator(self): def connected(self): """Return True iff the coordinator node is connected""" with self._lock: - if self.coordinator_id is not None and self._client.connected(self.coordinator_id): - return True + return self.coordinator_id is not None and self._client.connected(self.coordinator_id) def ensure_coordinator_ready(self, timeout_ms=None): """Block until the coordinator for this group is known. From f025e9e79ee902db475bb32b71cb20d9625e31cf Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Thu, 15 May 2025 13:50:28 -0700 Subject: [PATCH 4/4] patch coordinator.connected() for heartbeat test --- test/test_coordinator.py | 1 + 1 file changed, 1 insertion(+) diff --git a/test/test_coordinator.py b/test/test_coordinator.py index 251de566a..4ffe1d28c 100644 --- a/test/test_coordinator.py +++ b/test/test_coordinator.py @@ -658,6 +658,7 @@ def test_heartbeat(mocker, patched_coord): heartbeat.enable() patched_coord.state = MemberState.STABLE mocker.spy(patched_coord, '_send_heartbeat_request') + mocker.patch.object(patched_coord, 'connected', return_value=True) mocker.patch.object(patched_coord.heartbeat, 'should_heartbeat', return_value=True) heartbeat._run_once() assert patched_coord._send_heartbeat_request.call_count == 1