diff --git a/kafka/producer/record_accumulator.py b/kafka/producer/record_accumulator.py index 77d48d84f..3a4e60146 100644 --- a/kafka/producer/record_accumulator.py +++ b/kafka/producer/record_accumulator.py @@ -430,7 +430,7 @@ def ready(self, cluster, now=None): expired = bool(waited_time >= time_to_wait) sendable = (full or expired or self._closed or - self._flush_in_progress()) + self.flush_in_progress()) if sendable and not backing_off: ready_nodes.add(leader) @@ -563,7 +563,7 @@ def deallocate(self, batch): """Deallocate the record batch.""" self._incomplete.remove(batch) - def _flush_in_progress(self): + def flush_in_progress(self): """Are there any threads currently waiting on a flush?""" return self._flushes_in_progress.get() > 0 diff --git a/kafka/producer/transaction_manager.py b/kafka/producer/transaction_manager.py index 7302eb00e..5d69ddc97 100644 --- a/kafka/producer/transaction_manager.py +++ b/kafka/producer/transaction_manager.py @@ -553,11 +553,11 @@ def producer_epoch(self): return self.transaction_manager.producer_id_and_epoch.epoch def fatal_error(self, exc): - self.transaction_manager._transition_to_fatal_error(exc) + self.transaction_manager.transition_to_fatal_error(exc) self._result.done(error=exc) def abortable_error(self, exc): - self.transaction_manager._transition_to_abortable_error(exc) + self.transaction_manager.transition_to_abortable_error(exc) self._result.done(error=exc) def fail(self, exc):