diff --git a/kafka/errors.py b/kafka/errors.py index 351e07375..dffa35f35 100644 --- a/kafka/errors.py +++ b/kafka/errors.py @@ -102,6 +102,10 @@ class UnsupportedCodecError(KafkaError): pass +class TransactionAbortedError(KafkaError): + pass + + class BrokerResponseError(KafkaError): errno = None message = None diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py index 09b9a0f10..d7855b03d 100644 --- a/kafka/producer/sender.py +++ b/kafka/producer/sender.py @@ -166,7 +166,11 @@ def run_once(self): self._client.poll(timeout_ms=self.config['retry_backoff_ms']) return elif self._transaction_manager.has_abortable_error(): - self._accumulator.abort_undrained_batches(self._transaction_manager.last_error) + # Attempt to get the last error that caused this abort. + # If there was no error, but we are still aborting, + # then this is most likely a case where there was no fatal error. + exception = self._transaction_manager.last_error or Errors.TransactionAbortedError() + self._accumulator.abort_undrained_batches(exception) except Errors.SaslAuthenticationFailedError as e: # This is already logged as error, but propagated here to perform any clean ups.