From d9464d983e3f3e99ed47ab75ad85d57a992fbe4a Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=D0=98=D1=81=D0=BA=D0=B0=D0=BD=D0=B4=D0=B0=D1=80=D0=BE?=
 =?UTF-8?q?=D0=B2=20=D0=AD=D0=B4=D1=83=D0=B0=D1=80=D0=B4?=
Date: Fri, 23 Jan 2015 12:38:38 +0300
Subject: [PATCH 1/3] add producer batch send queue size limit

---
 kafka/common.py        |  4 ++++
 kafka/producer/base.py | 21 ++++++++++++++++-----
 2 files changed, 20 insertions(+), 5 deletions(-)

diff --git a/kafka/common.py b/kafka/common.py
index b7bb06c92..b53692447 100644
--- a/kafka/common.py
+++ b/kafka/common.py
@@ -200,6 +200,10 @@ class KafkaConfigurationError(KafkaError):
     pass
 
 
+class BatchQueueOverfilledError(KafkaError):
+    pass
+
+
 def _iter_broker_errors():
     for name, obj in inspect.getmembers(sys.modules[__name__]):
         if inspect.isclass(obj) and issubclass(obj, BrokerResponseError) and obj != BrokerResponseError:
diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index 6e19b92c9..2f759f997 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -4,15 +4,16 @@
 import time
 
 try:
-    from queue import Empty
+    from queue import Empty, Full
 except ImportError:
-    from Queue import Empty
+    from Queue import Empty, Full
 from collections import defaultdict
 from multiprocessing import Queue, Process
 
 import six
 
 from kafka.common import (
+    BatchQueueOverfilledError,
     ProduceRequest, TopicAndPartition, UnsupportedCodecError
 )
 from kafka.protocol import CODEC_NONE, ALL_CODECS, create_message_set
@@ -21,6 +22,7 @@
 
 BATCH_SEND_DEFAULT_INTERVAL = 20
 BATCH_SEND_MSG_COUNT = 20
+BATCH_SEND_QUEUE_MAXSIZE = 0
 
 STOP_ASYNC_PRODUCER = -1
 
@@ -113,12 +115,14 @@ def __init__(self, client, async=False,
                  codec=None,
                  batch_send=False,
                  batch_send_every_n=BATCH_SEND_MSG_COUNT,
-                 batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL):
+                 batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
+                 batch_send_queue_maxsize=BATCH_SEND_QUEUE_MAXSIZE):
 
         if batch_send:
             async = True
             assert batch_send_every_n > 0
             assert batch_send_every_t > 0
+            assert batch_send_queue_maxsize >= 0
         else:
             batch_send_every_n = 1
             batch_send_every_t = 3600
@@ -139,7 +143,8 @@ def __init__(self, client, async=False,
             log.warning("async producer does not guarantee message delivery!")
             log.warning("Current implementation does not retry Failed messages")
             log.warning("Use at your own risk! (or help improve with a PR!)")
-            self.queue = Queue()  # Messages are sent through this queue
+            # Messages are sent through this queue
+            self.queue = Queue(maxsize=batch_send_queue_maxsize)
             self.proc = Process(target=_send_upstream,
                                 args=(self.queue,
                                       self.client.copy(),
@@ -188,7 +193,13 @@ def _send_messages(self, topic, partition, *msg, **kwargs):
 
         if self.async:
             for m in msg:
-                self.queue.put((TopicAndPartition(topic, partition), m, key))
+                try:
+                    item = (TopicAndPartition(topic, partition), m, key)
+                    self.queue.put(item)
+                except Full:
+                    raise BatchQueueOverfilledError(
+                        'Producer batch send queue overfilled. '
+                        'Current queue size %d.' % self.queue.qsize())
             resp = []
         else:
             messages = create_message_set(msg, self.codec, key)

From b0f5ef7c2ec5a0ad9c59be4679442ff16b0163ec Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?=D0=98=D1=81=D0=BA=D0=B0=D0=BD=D0=B4=D0=B0=D1=80=D0=BE?=
 =?UTF-8?q?=D0=B2=20=D0=AD=D0=B4=D1=83=D0=B0=D1=80=D0=B4?=
Date: Fri, 23 Jan 2015 12:56:42 +0300
Subject: [PATCH 2/3] add producer send batch queue overfilled test

---
 kafka/producer/base.py |  2 +-
 test/test_producer.py  | 16 +++++++++++++++-
 2 files changed, 16 insertions(+), 2 deletions(-)

diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index 2f759f997..b44d3d350 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -195,7 +195,7 @@ def _send_messages(self, topic, partition, *msg, **kwargs):
             for m in msg:
                 try:
                     item = (TopicAndPartition(topic, partition), m, key)
-                    self.queue.put(item)
+                    self.queue.put_nowait(item)
                 except Full:
                     raise BatchQueueOverfilledError(
                         'Producer batch send queue overfilled. '
diff --git a/test/test_producer.py b/test/test_producer.py
index caf8fe35c..a4e6cf751 100644
--- a/test/test_producer.py
+++ b/test/test_producer.py
@@ -2,9 +2,10 @@
 
 import logging
 
-from mock import MagicMock
+from mock import MagicMock, patch
 from . import unittest
 
+from kafka.common import BatchQueueOverfilledError
 from kafka.producer.base import Producer
 
 class TestKafkaProducer(unittest.TestCase):
@@ -25,3 +26,16 @@ def test_producer_message_types(self):
             # This should not raise an exception
             producer.send_messages(topic, partition, m)
 
+    @patch('kafka.producer.base.Process')
+    def test_producer_batch_send_queue_overfilled(self, process_mock):
+        queue_size = 2
+        producer = Producer(MagicMock(), batch_send=True,
+                            batch_send_queue_maxsize=queue_size)
+
+        topic = b'test-topic'
+        partition = 0
+
+        message = b'test-message'
+        with self.assertRaises(BatchQueueOverfilledError):
+            message_list = [message] * (queue_size + 1)
+            producer.send_messages(topic, partition, *message_list)

From b660057235f829fb98574db0fcdfd2cacf56019e Mon Sep 17 00:00:00 2001
From: Eduard Iskandarov
Date: Sat, 24 Jan 2015 00:30:50 +0300
Subject: [PATCH 3/3] async queue: refactored code; add one more test

---
 kafka/producer/base.py |  9 +++++----
 test/test_producer.py  | 18 ++++++++++++++++--
 2 files changed, 21 insertions(+), 6 deletions(-)

diff --git a/kafka/producer/base.py b/kafka/producer/base.py
index b44d3d350..dc817fb86 100644
--- a/kafka/producer/base.py
+++ b/kafka/producer/base.py
@@ -22,7 +22,8 @@
 
 BATCH_SEND_DEFAULT_INTERVAL = 20
 BATCH_SEND_MSG_COUNT = 20
-BATCH_SEND_QUEUE_MAXSIZE = 0
+# unlimited
+ASYNC_QUEUE_MAXSIZE = 0
 
 STOP_ASYNC_PRODUCER = -1
 
@@ -116,13 +117,13 @@ def __init__(self, client, async=False,
                  batch_send=False,
                  batch_send_every_n=BATCH_SEND_MSG_COUNT,
                  batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
-                 batch_send_queue_maxsize=BATCH_SEND_QUEUE_MAXSIZE):
+                 async_queue_maxsize=ASYNC_QUEUE_MAXSIZE):
 
         if batch_send:
             async = True
             assert batch_send_every_n > 0
             assert batch_send_every_t > 0
-            assert batch_send_queue_maxsize >= 0
+            assert async_queue_maxsize >= 0
         else:
             batch_send_every_n = 1
             batch_send_every_t = 3600
@@ -144,7 +145,7 @@ def __init__(self, client, async=False,
             log.warning("Current implementation does not retry Failed messages")
             log.warning("Use at your own risk! (or help improve with a PR!)")
             # Messages are sent through this queue
-            self.queue = Queue(maxsize=batch_send_queue_maxsize)
+            self.queue = Queue(maxsize=async_queue_maxsize)
             self.proc = Process(target=_send_upstream,
                                 args=(self.queue,
                                       self.client.copy(),
diff --git a/test/test_producer.py b/test/test_producer.py
index a4e6cf751..8f8d922c8 100644
--- a/test/test_producer.py
+++ b/test/test_producer.py
@@ -27,15 +27,29 @@ def test_producer_message_types(self):
             producer.send_messages(topic, partition, m)
 
     @patch('kafka.producer.base.Process')
-    def test_producer_batch_send_queue_overfilled(self, process_mock):
+    def test_producer_async_queue_overfilled_batch_send(self, process_mock):
         queue_size = 2
         producer = Producer(MagicMock(), batch_send=True,
-                            batch_send_queue_maxsize=queue_size)
+                            async_queue_maxsize=queue_size)
 
         topic = b'test-topic'
         partition = 0
+        message = b'test-message'
+
+        with self.assertRaises(BatchQueueOverfilledError):
+            message_list = [message] * (queue_size + 1)
+            producer.send_messages(topic, partition, *message_list)
 
+    @patch('kafka.producer.base.Process')
+    def test_producer_async_queue_overfilled(self, process_mock):
+        queue_size = 2
+        producer = Producer(MagicMock(), async=True,
+                            async_queue_maxsize=queue_size)
+
+        topic = b'test-topic'
+        partition = 0
         message = b'test-message'
+
         with self.assertRaises(BatchQueueOverfilledError):
             message_list = [message] * (queue_size + 1)
             producer.send_messages(topic, partition, *message_list)
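
Usage sketch for the option this series introduces: a minimal, non-authoritative
example of bounding the async producer queue and handling the overflow error. It
assumes the kafka-python API of this era (Python 2 / pre-3.7, where `async` is
still a legal keyword argument); the broker address, topic, payload, and maxsize
value below are illustrative, not taken from the patches.

    from kafka.client import KafkaClient
    from kafka.common import BatchQueueOverfilledError
    from kafka.producer.base import Producer

    # Illustrative broker address; point this at a real cluster.
    client = KafkaClient('localhost:9092')

    # Bound the internal multiprocessing.Queue to 1000 pending messages.
    # The default ASYNC_QUEUE_MAXSIZE = 0 keeps the queue unbounded,
    # which preserves the previous behaviour.
    producer = Producer(client, async=True, async_queue_maxsize=1000)

    try:
        producer.send_messages(b'my-topic', 0, b'some payload')
    except BatchQueueOverfilledError:
        # Raised when put_nowait() finds the bounded queue full; the caller
        # decides whether to retry, drop the message, or back off.
        pass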