diff --git a/kafka/common.py b/kafka/common.py
index b7bb06c92..b53692447 100644
--- a/kafka/common.py
+++ b/kafka/common.py
@@ -200,6 +200,10 @@ class KafkaConfigurationError(KafkaError):
     pass
 
 
+class BatchQueueOverfilledError(KafkaError):
+    pass
+
+
 def _iter_broker_errors():
     for name, obj in inspect.getmembers(sys.modules[__name__]):
         if inspect.isclass(obj) and issubclass(obj, BrokerResponseError) and obj != BrokerResponseError:
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index 6e19b92c9..dc817fb86 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -4,15 +4,16 @@
 import time
 
 try:
-    from queue import Empty
+    from queue import Empty, Full
 except ImportError:
-    from Queue import Empty
+    from Queue import Empty, Full
 from collections import defaultdict
 from multiprocessing import Queue, Process
 
 import six
 
 from kafka.common import (
+    BatchQueueOverfilledError,
     ProduceRequest, TopicAndPartition, UnsupportedCodecError
 )
 from kafka.protocol import CODEC_NONE, ALL_CODECS, create_message_set
@@ -21,6 +22,8 @@
 
 BATCH_SEND_DEFAULT_INTERVAL = 20
 BATCH_SEND_MSG_COUNT = 20
+# unlimited
+ASYNC_QUEUE_MAXSIZE = 0
 
 STOP_ASYNC_PRODUCER = -1
 
@@ -113,12 +116,14 @@ def __init__(self, client, async=False,
                  codec=None,
                  batch_send=False,
                  batch_send_every_n=BATCH_SEND_MSG_COUNT,
-                 batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL):
+                 batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
+                 async_queue_maxsize=ASYNC_QUEUE_MAXSIZE):
 
         if batch_send:
             async = True
             assert batch_send_every_n > 0
             assert batch_send_every_t > 0
+            assert async_queue_maxsize >= 0
         else:
             batch_send_every_n = 1
             batch_send_every_t = 3600
@@ -139,7 +144,8 @@ def __init__(self, client, async=False,
             log.warning("async producer does not guarantee message delivery!")
             log.warning("Current implementation does not retry Failed messages")
             log.warning("Use at your own risk! (or help improve with a PR!)")
-            self.queue = Queue()  # Messages are sent through this queue
+            # Messages are sent through this queue
+            self.queue = Queue(maxsize=async_queue_maxsize)
             self.proc = Process(target=_send_upstream,
                                 args=(self.queue,
                                       self.client.copy(),
@@ -188,7 +194,13 @@ def _send_messages(self, topic, partition, *msg, **kwargs):
 
         if self.async:
             for m in msg:
-                self.queue.put((TopicAndPartition(topic, partition), m, key))
+                try:
+                    item = (TopicAndPartition(topic, partition), m, key)
+                    self.queue.put_nowait(item)
+                except Full:
+                    raise BatchQueueOverfilledError(
+                        'Producer batch send queue overfilled. '
+                        'Current queue size %d.' % self.queue.qsize())
             resp = []
         else:
             messages = create_message_set(msg, self.codec, key)
diff --git a/test/test_producer.py b/test/test_producer.py
index caf8fe35c..8f8d922c8 100644
--- a/test/test_producer.py
+++ b/test/test_producer.py
@@ -2,9 +2,10 @@
 
 import logging
 
-from mock import MagicMock
+from mock import MagicMock, patch
 from . import unittest
 
+from kafka.common import BatchQueueOverfilledError
 from kafka.producer.base import Producer
 
 class TestKafkaProducer(unittest.TestCase):
@@ -25,3 +26,30 @@ def test_producer_message_types(self):
             # This should not raise an exception
             producer.send_messages(topic, partition, m)
 
+    @patch('kafka.producer.base.Process')
+    def test_producer_async_queue_overfilled_batch_send(self, process_mock):
+        queue_size = 2
+        producer = Producer(MagicMock(), batch_send=True,
+                            async_queue_maxsize=queue_size)
+
+        topic = b'test-topic'
+        partition = 0
+        message = b'test-message'
+
+        with self.assertRaises(BatchQueueOverfilledError):
+            message_list = [message] * (queue_size + 1)
+            producer.send_messages(topic, partition, *message_list)
+
+    @patch('kafka.producer.base.Process')
+    def test_producer_async_queue_overfilled(self, process_mock):
+        queue_size = 2
+        producer = Producer(MagicMock(), async=True,
+                            async_queue_maxsize=queue_size)
+
+        topic = b'test-topic'
+        partition = 0
+        message = b'test-message'
+
+        with self.assertRaises(BatchQueueOverfilledError):
+            message_list = [message] * (queue_size + 1)
+            producer.send_messages(topic, partition, *message_list)