From 8df8a72442fe79394a270be7626d71fc68fe405c Mon Sep 17 00:00:00 2001 From: llk89 Date: Tue, 1 Jul 2025 15:48:03 +0800 Subject: [PATCH 1/2] fix flush_in_progress not visible to sender --- kafka/producer/record_accumulator.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py index 77d48d84f..3a4e60146 100644 --- a/kafka/producer/record_accumulator.py +++ b/kafka/producer/record_accumulator.py @@ -430,7 +430,7 @@ def ready(self, cluster, now=None): expired = bool(waited_time >= time_to_wait) sendable = (full or expired or self._closed or - self._flush_in_progress()) + self.flush_in_progress()) if sendable and not backing_off: ready_nodes.add(leader) @@ -563,7 +563,7 @@ def deallocate(self, batch): """Deallocate the record batch.""" self._incomplete.remove(batch) - def _flush_in_progress(self): + def flush_in_progress(self): """Are there any threads currently waiting on a flush?""" return self._flushes_in_progress.get() > 0 From 75965027c3cb3b6a83911d3b995184a60020162a Mon Sep 17 00:00:00 2001 From: llk89 Date: Tue, 1 Jul 2025 15:59:25 +0800 Subject: [PATCH 2/2] fix more member visibility issue in transaction management --- kafka/producer/transaction_manager.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kafka/producer/transaction_manager.py b/kafka/producer/transaction_manager.py index 7302eb00e..5d69ddc97 100644 --- a/kafka/producer/transaction_manager.py +++ b/kafka/producer/transaction_manager.py @@ -553,11 +553,11 @@ def producer_epoch(self): return self.transaction_manager.producer_id_and_epoch.epoch def fatal_error(self, exc): - self.transaction_manager._transition_to_fatal_error(exc) + self.transaction_manager.transition_to_fatal_error(exc) self._result.done(error=exc) def abortable_error(self, exc): - self.transaction_manager._transition_to_abortable_error(exc) + self.transaction_manager.transition_to_abortable_error(exc) self._result.done(error=exc) def fail(self, exc):