diff --git a/kafka/producer/base.py b/kafka/producer/base.py index 4bd3de49a..ded5708bf 100644 --- a/kafka/producer/base.py +++ b/kafka/producer/base.py @@ -10,7 +10,7 @@ from Queue import Empty, Queue from collections import defaultdict -from threading import Thread, Event +from threading import Thread, Event, Lock import six @@ -29,7 +29,7 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, - req_acks, ack_timeout, stop_event): + req_acks, ack_timeout, stop_event, connection_state_lock, connection_exc): """ Listen on the queue for a specified number of messages or till a specified timeout and send them upstream to the brokers in one @@ -71,12 +71,32 @@ def _send_upstream(queue, client, codec, batch_time, batch_size, messages) reqs.append(req) - try: - client.send_produce_request(reqs, - acks=req_acks, - timeout=ack_timeout) - except Exception: - log.exception("Unable to send message") + delivered = False + attempt = 0 + while not delivered: + try: + client.send_produce_request(reqs, + acks=req_acks, + timeout=ack_timeout) + delivered = True + + # Set connection state as available + connection_state_lock.acquire() + connection_exc[0] = None + connection_state_lock.release() + except Exception as e: + log.exception("Unable to send message") + + # Set connection state as unavailable + connection_state_lock.acquire() + connection_exc[0] = e + connection_state_lock.release() + + # Exponential back-off with min 0.1s, max 12.8s + attempt = min(attempt + 1, 7) + sleep_time = 0.1*(2**attempt) + log.warning("Sleeping for {0}s before retrying".format(sleep_time)) + time.sleep(sleep_time) class Producer(object): @@ -140,6 +160,8 @@ def __init__(self, client, async=False, log.warning("Use at your own risk! (or help improve with a PR!)") self.queue = Queue() # Messages are sent through this queue self.thread_stop_event = Event() + self.connection_state_lock = Lock() + self.connection_exc = [None] self.thread = Thread(target=_send_upstream, args=(self.queue, self.client.copy(), @@ -148,7 +170,9 @@ def __init__(self, client, async=False, batch_send_every_n, self.req_acks, self.ack_timeout, - self.thread_stop_event)) + self.thread_stop_event, + self.connection_state_lock, + self.connection_exc)) # Thread will die if main thread exits self.thread.daemon = True @@ -199,6 +223,13 @@ def _send_messages(self, topic, partition, *msg, **kwargs): raise TypeError("the key must be type bytes") if self.async: + # Check if connection is available, otherwise fail to add message + self.connection_state_lock.acquire() + exc = self.connection_exc[0] + self.connection_state_lock.release() + if exc != None: + raise exc + for m in msg: self.queue.put((TopicAndPartition(topic, partition), m, key)) resp = []