From 080d460e4976995a84755b8d986f01448b53982f Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Mon, 2 Mar 2015 09:41:10 -0800 Subject: [PATCH 01/11] kafka.serializer.*: AbstractSerializer interface and several defaults: Noop, String, Json, Msgpack --- kafka/serializer/__init__.py | 6 ++++ kafka/serializer/abstract.py | 56 ++++++++++++++++++++++++++++++++++++ kafka/serializer/json.py | 19 ++++++++++++ kafka/serializer/msgpack.py | 33 +++++++++++++++++++++ kafka/serializer/noop.py | 40 ++++++++++++++++++++++++++ kafka/serializer/string.py | 47 ++++++++++++++++++++++++++++++ 6 files changed, 201 insertions(+) create mode 100644 kafka/serializer/__init__.py create mode 100644 kafka/serializer/abstract.py create mode 100644 kafka/serializer/json.py create mode 100644 kafka/serializer/msgpack.py create mode 100644 kafka/serializer/noop.py create mode 100644 kafka/serializer/string.py diff --git a/kafka/serializer/__init__.py b/kafka/serializer/__init__.py new file mode 100644 index 000000000..d202a485c --- /dev/null +++ b/kafka/serializer/__init__.py @@ -0,0 +1,6 @@ +from .noop import NoopSerializer +from .string import StringSerializer + +__all__ = [ + 'NoopSerializer', 'StringSerializer' +] diff --git a/kafka/serializer/abstract.py b/kafka/serializer/abstract.py new file mode 100644 index 000000000..f11585b60 --- /dev/null +++ b/kafka/serializer/abstract.py @@ -0,0 +1,56 @@ +import abc + +class AbstractSerializer(object): + """ + Abstract Serializer/Deserializer Interface, based on java client interface. + + Used to convert python data to/from raw kafka messages (bytes) + + Methods: + serialize(topic, data): convert data to bytes for topic + deserialize(topic, data): convert data from bytes for topic + + See Also: + http://kafka.apache.org/082/javadoc/org/apache/kafka/common/serialization/Serializer.html + http://kafka.apache.org/082/javadoc/org/apache/kafka/common/serialization/Deserializer.html + """ + __metaclass__ = abc.ABCMeta + + @abc.abstractmethod + def __init__(self, is_key=False, **configs): + """ + Parameters: + is_key (bool): True if this instance will be passed keys + instead of values (default: False, expects values) + + Other Parameters: + subclasses can optionally use keyword arguments to help configure + """ + pass + + @abc.abstractmethod + def serialize(self, topic, data): + """ + Parameters: + topic (str): topic associated with the data + data (abstract): the python key or value that should be serialized + + Returns: + serialized bytes that can be sent to the kafka cluster + """ + # example: encode unicode strings as utf-8 bytes + return data.encode('utf-8') + + @abc.abstractmethod + def deserialize(self, topic, data): + """ + Parameters: + topic (str): topic associated with the data + data (bytes): serialized bytes as received from kafka + + Returns: + a deserialized python data structure, as determined by + the subclass [e.g., str int float dict list ...] 
+ """ + # example: decode utf-8 bytes to unicode python strings + return data.decode('utf-8') diff --git a/kafka/serializer/json.py b/kafka/serializer/json.py new file mode 100644 index 000000000..795926f57 --- /dev/null +++ b/kafka/serializer/json.py @@ -0,0 +1,19 @@ +from __future__ import absolute_import + +import json + +from kafka.serializer.abstract import AbstractSerializer + + +class JsonSerializer(AbstractSerializer): + """ + Serialize / Deserialize using json protocol + """ + def __init__(self, is_key=False, **configs): + self.is_key = is_key + + def serialize(self, topic, data): + return json.dumps(data) + + def deserialize(self, topic, data): + return json.loads(data) diff --git a/kafka/serializer/msgpack.py b/kafka/serializer/msgpack.py new file mode 100644 index 000000000..3de70133b --- /dev/null +++ b/kafka/serializer/msgpack.py @@ -0,0 +1,33 @@ +from __future__ import absolute_import + +import logging + +from kafka.serializer.abstract import AbstractSerializer + +logger = logging.getLogger(__name__) + +try: + import msgpack +except ImportError: + # Msgpack support not enabled + logger.warning('msgpack module not found -- MsgpackSerializer disabled') + pass + + +class MsgpackSerializer(AbstractSerializer): + """ + Serialize / Deserialize using msgpack protocol + """ + def __init__(self, is_key=False, **configs): + self.is_key = is_key + + # Instead of a local import, we check whether global import succeeded + # by checking the namespace + if globals().get('msgpack') is None: + raise ImportError('msgpack module not found') + + def serialize(self, topic, data): + return msgpack.dumps(data) + + def deserialize(self, topic, data): + return msgpack.loads(data) diff --git a/kafka/serializer/noop.py b/kafka/serializer/noop.py new file mode 100644 index 000000000..8eba516cc --- /dev/null +++ b/kafka/serializer/noop.py @@ -0,0 +1,40 @@ +from kafka.serializer.abstract import AbstractSerializer + + +class NoopSerializer(AbstractSerializer): + """ + A noop serializer class that just returns what it gets. + """ + def __init__(self, is_key=False, **configs): + """ + Arguments: + is_key (bool): whether this instance will serialize keys (True) instead + of values (False); default: False + encoding (str): a python str-to-bytes encoding codec; default: utf-8 + + See Also: + https://docs.python.org/2/library/codecs.html#standard-encodings + """ + pass + + def serialize(self, topic, data): + """ + Parameters: + topic (str): topic associated with the data + data (bytes): bytes -- no serialization required! 
+ + Returns: + the data, unaltered + """ + return data + + def deserialize(self, topic, data): + """ + Parameters: + topic (str): topic associated with the data + data (bytes): serialized bytes as received from kafka + + Returns: + the data, unaltered + """ + return data diff --git a/kafka/serializer/string.py b/kafka/serializer/string.py new file mode 100644 index 000000000..6e7d7ea46 --- /dev/null +++ b/kafka/serializer/string.py @@ -0,0 +1,47 @@ +import codecs + +from kafka.serializer.abstract import AbstractSerializer + + +class StringSerializer(AbstractSerializer): + """ + A simple serializer class that encodes python strings + using the python codecs library + """ + def __init__(self, is_key=False, **configs): + """ + Arguments: + is_key (bool): whether this instance will serialize keys (True) instead + of values (False); default: False + encoding (str): a python str-to-bytes encoding codec; default: utf-8 + + See Also: + https://docs.python.org/2/library/codecs.html#standard-encodings + """ + self.is_key = is_key + self.encoding = configs.get('encoding', 'utf-8') + + # lookup the encoding to verify and cache + codecs.lookup(self.encoding) + + def serialize(self, topic, data): + """ + Parameters: + topic (str): topic associated with the data + data (str): the python key or value that should be serialized + + Returns: + bytes serialized using the configured encoding codec + """ + return codecs.encode(data, self.encoding) + + def deserialize(self, topic, data): + """ + Parameters: + topic (str): topic associated with the data + data (bytes): serialized bytes as received from kafka + + Returns: + a string, deserialized using the configured encoding codec + """ + return codecs.decode(data, self.encoding) From 2defc652a628ad7e7ba8b9e25316470f1c520ef9 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Mon, 2 Mar 2015 09:42:51 -0800 Subject: [PATCH 02/11] New high-level, asynchronous kafka producer The interface of this producer class is modeled after the new upstream java client in version 0.8.2: http://kafka.apache.org/documentation.html#producerapi KafkaProducer is configured via keyword arguments to the __init__() constructor. The configuration keywords generally follow the upstream java client settings. Messages are produced with KafkaProducer.send(), which places the messages on an internal FIFO queue. Messages are consumed from the queue asynchronously by a background worker thread. 
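A minimal usage sketch (illustrative only; 'localhost:9092' below is a
placeholder broker address, and the default NoopSerializer expects byte
strings for keys and values):

    producer = KafkaProducer(bootstrap_servers=['localhost:9092'])
    producer.send(topic='my-topic', key=b'id-1', value=b'hello',
                  callback=lambda record, response: None)
    producer.close()

send() returns immediately. The optional callback is invoked from the
background thread with the original record and the broker response once the
produce request is acknowledged (callbacks are skipped when acks=0).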
--- kafka/__init__.py | 4 +- kafka/producer/__init__.py | 3 +- kafka/producer/kafka.py | 478 +++++++++++++++++++++++++++++++++++++ 3 files changed, 482 insertions(+), 3 deletions(-) create mode 100644 kafka/producer/kafka.py diff --git a/kafka/__init__.py b/kafka/__init__.py index 3536084aa..fd9fbfe2b 100644 --- a/kafka/__init__.py +++ b/kafka/__init__.py @@ -11,7 +11,7 @@ from kafka.protocol import ( create_message, create_gzip_message, create_snappy_message ) -from kafka.producer import SimpleProducer, KeyedProducer +from kafka.producer import SimpleProducer, KeyedProducer, KafkaProducer from kafka.partitioner import RoundRobinPartitioner, HashedPartitioner from kafka.consumer import SimpleConsumer, MultiProcessConsumer, KafkaConsumer @@ -19,5 +19,5 @@ 'KafkaClient', 'KafkaConnection', 'SimpleProducer', 'KeyedProducer', 'RoundRobinPartitioner', 'HashedPartitioner', 'SimpleConsumer', 'MultiProcessConsumer', 'create_message', 'create_gzip_message', - 'create_snappy_message', 'KafkaConsumer', + 'create_snappy_message', 'KafkaConsumer', 'KafkaProducer' ] diff --git a/kafka/producer/__init__.py b/kafka/producer/__init__.py index bc0e7c61f..4d8d878fa 100644 --- a/kafka/producer/__init__.py +++ b/kafka/producer/__init__.py @@ -1,6 +1,7 @@ from .simple import SimpleProducer from .keyed import KeyedProducer +from .kafka import KafkaProducer __all__ = [ - 'SimpleProducer', 'KeyedProducer' + 'SimpleProducer', 'KeyedProducer', 'KafkaProducer' ] diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py new file mode 100644 index 000000000..c12fd2435 --- /dev/null +++ b/kafka/producer/kafka.py @@ -0,0 +1,478 @@ +from __future__ import absolute_import + +from collections import namedtuple +import logging +from Queue import Empty, Full, Queue +import random +import threading +import time + +import six + +from kafka.client import KafkaClient +from kafka.common import ( + ProduceRequest, ConnectionError, RequestTimedOutError, + LeaderNotAvailableError, UnknownTopicOrPartitionError, + FailedPayloadsError, KafkaUnavailableError, KafkaConfigurationError +) +from kafka.protocol import ( + CODEC_NONE, CODEC_GZIP, CODEC_SNAPPY, + create_message_set +) +from kafka.serializer import NoopSerializer +from kafka.util import kafka_bytestring + +ProducerRecord = namedtuple( + 'ProducerRecord', + 'topic partition key value callback' +) + +logger = logging.getLogger(__name__) + +DEFAULT_PRODUCER_CONFIG = { + # These configuration names are borrowed directly from + # upstream java kafka producer: + 'bootstrap_servers': [], + 'acks': 1, + 'buffer_memory_messages': 50000, + 'compression_type': None, + 'retries': 0, + 'client_id': __name__, + 'timeout_ms': 30000, + 'block_on_buffer_full': True, + 'metadata_fetch_timeout_ms': 60000, + 'metadata_max_age_ms': 300000, + 'retry_backoff_ms': 100, + 'key_serializer': NoopSerializer(is_key=True), + 'value_serializer': NoopSerializer(is_key=False), + + # currently unused upstream configs + 'batch_size_bytes': 16384, + 'linger_ms': 0, + 'max_request_size_bytes': 1048576, + 'receive_buffer_bytes': 32768, + 'send_buffer_bytes': 131072, + 'metric_reporters': [], + 'metrics_num_samples': 2, + 'metrics_sample_window_ms': 30000, + 'reconnect_backoff_ms': 10, + + # These configuration parameters are kafka-python specific -- not borrowed + 'socket_timeout_ms': 30 * 1000, + 'producer_loop_idle_wait_ms': 1000, +} + +class KafkaProducer(object): + """High-level, asynchronous kafka producer + + This producer has a simple interface modeled after the new upstream + java client in version 
0.8.2 + http://kafka.apache.org/documentation.html#producerapi + + The primary interface is KafkaProducer.send(), which places messages on an + internal FIFO queue. Messages are consumed from the queue asynchronously by + a background worker thread. + + """ + def __init__(self, **configs): + """ + KafkaProducer instance constructor takes keyword arguments as + configuration. The configuration keywords generally follow the upstream + java client settings. + + Keyword Arguments: + bootstrap_servers (list of str): list of kafka brokers to use for + bootstrapping initial cluster metadata. broker strings should + be formatted as `host:port` + acks (int): number of acks required for each produce request + defaults to 1. See Kafka Protocol documentation for more + information. + buffer_memory_messages (int): number of unsent messages to buffer + internally before send() will block or raise. defaults to 50000 + compression_type (str): type of compression to apply to messages. + options are 'none', 'gzip', or 'snappy'; defaults to 'none' + retries (int): Number of times to retry a failed produce request. + defaults to 0. + client_id (str): a unique string identifier for this client, sent + to the kafka cluster for bookkeeping. Defaults to the module + name ('kafka.producer.kafka'). + timeout_ms (int): milliseconds to wait for the kafka broker to get + the required number of `acks`. defaults to 30000. + block_on_buffer_full (bool): configure whether send() should block + when there is insufficient buffer_memory to handle the next + message, or instead raise a Queue.Full exception. + defaults to True. + metadata_fetch_timeout_ms (int): milliseconds to wait before raising + when refreshing cluster metadata. defaults to 60000. + metadata_max_age_ms (int): milliseconds to retain cluster metadata + before requiring a refresh. (cluster metadata will also be + refreshed when a server error is received suggesting that the + metadata has changed). defaults to 300000. + retry_backoff_ms (int): milliseconds to wait between retries. only + used for produce requests if `retries` > 0, but also applies to + cluster metadata request retries. defaults to 100. + key_serializer (Serializer): an instance of a Serializer class used + to convert a message key, passed to `send(key=foo)`, into raw + bytes that will be encoded into the kafka message. Defaults to + no serialization [NoopSerializer] + value_serializer (Serializer): an instance of a Serializer class + used to convert a message value into raw bytes that will be + encoded into the kafka message. Defaults to no serialization + [NoopSerializer] + socket_timeout_ms (int): client socket TCP timeout, used by + KafkaClient instance. defaults to 30000. + producer_loop_idle_wait_ms (int): milliseconds to sleep when the + internal producer queue is empty. defaults to 1000. + batch_size_bytes (int): currently unused. + linger_ms (int): currently unusued. + max_request_size_bytes (int): currently unused. + receive_buffer_bytes (int): currently unused. + send_buffer_bytes (int): currently unused. + metric_reporters (list): currently unused. + metrics_num_samples (int): currently unused. + metrics_sample_window_ms (int): currently unused. + reconnect_backoff_ms (int): currently unused. 
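+
+        Example (an illustrative sketch only; JsonSerializer is provided by
+        kafka.serializer.json from the preceding serializer commit, and the
+        broker address below is a placeholder):
+
+            from kafka.serializer.json import JsonSerializer
+
+            producer = KafkaProducer(
+                bootstrap_servers=['localhost:9092'],
+                compression_type='gzip',
+                key_serializer=JsonSerializer(is_key=True),
+                value_serializer=JsonSerializer(),
+            )
+            producer.send(topic='events', key='user-1', value={'clicks': 3})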
+ + See Also + http://kafka.apache.org/documentation.html#newproducerconfigs + http://kafka.apache.org/082/javadoc/org/apache/kafka/clients/producer/ProducerConfig.html + + """ + self._thread_lock = threading.Lock() + self._stop_event = threading.Event() + self._producer_queue = None + self._producer_thread = None + self._local_data = threading.local() + self._client = None + self._configure(**configs) + + def _configure(self, **configs): + self._config = {} + for key in DEFAULT_PRODUCER_CONFIG: + self._config[key] = configs.pop(key, DEFAULT_PRODUCER_CONFIG[key]) + + if configs: + raise KafkaConfigurationError('Unknown configuration key(s): ' + + str(list(configs.keys()))) + + self._client = KafkaClient( + self._config['bootstrap_servers'], + client_id=self._config['client_id'], + timeout=(self._config['socket_timeout_ms'] / 1000.0) + ) + + self._producer_queue = Queue(self._config['buffer_memory_messages']) + + def _start_producer_thread(self): + with self._thread_lock: + if self._producer_thread_is_running(): + logger.warning("Attempted to start async Kafka producer " + "when it was already running.") + return False + + if self._producer_thread: + logger.warning("starting a new _producer_thread... " + "previous one crashed?") + + self._stop_event.clear() + self._producer_thread = threading.Thread(target=self._producer_loop) + self._producer_thread.daemon = True + self._producer_thread.start() + + def _producer_thread_is_running(self): + t = self._producer_thread + return t is not None and t.is_alive() + + def _producer_loop(self): + while not self._stop_event.is_set(): + try: + record = self._producer_queue.get(block=False) + except Empty: + wait_time = self._config['producer_loop_idle_wait_ms'] / 1000.0 + self._stop_event.wait(wait_time) + continue + + self._metadata_max_age_refresh() + produce_request = self._encode_produce_request(record) + + # send produce request to kafka, optionally w/ retries + retries = self._config['retries'] + while not self._stop_event.is_set(): + try: + responses = self._client.send_produce_request( + [produce_request], + acks=self._config['acks'], + timeout=self._config['timeout_ms'] + ) + + # kafka server does not respond if acks == 0 + if self._config['acks'] != 0: + + # Handle Callbacks -- swallow errors + self._handle_callback(record, responses[0]) + + # break marks the while loop as successful + # else block below will not run + break + + # Catch errors that indicate we need to refresh topic metadata + except (LeaderNotAvailableError, UnknownTopicOrPartitionError, + FailedPayloadsError, ConnectionError) as e: + logger.warning("_producer_loop caught exception type %s, " + "refreshing metadata", type(e)) + try: + self.refresh_topic_metadata() + # We re-encode the produce request, in case it was + # affected by changed metadata + produce_request = self._encode_produce_request(record) + except RequestTimedOutError: + self._handle_dropped_message(record) + break + + # If the whole cluster is unavailable, we should backoff + except KafkaUnavailableError: + logger.warning("KafkaUnavailableError error -- " + "will retry after retry_backoff_ms") + + # ProduceRequests set acks and a timeout value to wait for the + # required acks. 
If the server cant get acks before timeout, we + # get this error and need to decide whether to retry or skip + except RequestTimedOutError: + logger.warning("RequestTimedOut error attempting to produce " + "messages with %s acks required and %s ms " + "timeout" % (self._config['acks'], + self._config['timeout_ms'])) + + # If another exception is raised, there's a problem.... + # Please open an issue on github! + except BaseException: + logger.exception("_producer_loop unrecoverable exception" + " - please report on github!") + raise + + # If we aren't retrying then we log the message + # as (potentially) dropped, + # then break from the inner while loop + # and move to the next message in the queue + if not retries: + self._handle_dropped_message(record) + break + + # Otherwise, backoff before retrying + self._stop_event.wait(self._config['retry_backoff_ms'] / 1000.0) + retries -= 1 + + # If the loop did not successfully break before _stop_event was set + # we need to log the message as failed + # messages still on the queue will be handled separately + else: + self._handle_dropped_message(record) + + # Whether the message delivery was successful or not, + # we ack it in the queue to allow others to join() the queue + self._producer_queue.task_done() + + def _get_partition_for_record(self, record): + if record.partition is not None: + return record.partition + + else: + partitions = self.partitions_for_topic(record.topic) + if not partitions: + self.refresh_topic_metadata() + partitions = self.partitions_for_topic(record.topic) + if not partitions: raise UnknownTopicOrPartitionError(record.topic) + + size = len(partitions) + + # hash the index + if record.key is not None: + idx = hash(key) % size + + # or round-robin it, if no key available + else: + try: + indices = self._local_data.partition_indices + except AttributeError: + indices = dict() + self._local_data.partition_indices = indices + + idx = indices.get(record.topic) + + # if this is the first record of the round-robin + # we select a random index in order to smooth + # message distribution when there are many + # short-lived producer instances + if idx is None: + idx = random.randint(0, size - 1) + else: + idx = (idx + 1) % size + indices[record.topic] = idx + + partition = partitions[idx] + + def _encode_produce_request(self, record): + value_serializer = self._config['value_serializer'] + key_serializer = self._config['key_serializer'] + codec = self._get_codec() + produce_request = ProduceRequest( + kafka_bytestring(record.topic), + self._get_partition_for_record(record), + create_message_set( + [value_serializer.serialize(record.topic, record.value)], + codec, + key_serializer.serialize(record.topic, record.key) + ) + ) + return produce_request + + def _handle_dropped_message(self, record): + serializer = self._config['value_serializer'] + logger.error('DROPPED MESSAGE: %s', + serializer.serialize(record.topic, record.value)) + + def _handle_callback(self, record, response): + """ + Private method to handle message callbacks + """ + # If no callback registered, do nothing + if not record.callback: + return + + # wrap callback in blanket try / except to avoid crashing the thread + try: + record.callback.__call__(record, response) + except: + logger.exception('Caught exception during callback (ignoring)') + + def send(self, topic=None, partition=None, key=None, value=None, callback=None): + """Send a message to a kafka topic + + Messages are sent asynchronously via a background worker thread + optionally specify specific 
partition and/or key and/or callback + + Keyword Arguments: + topic (str): the topic to which the message should be sent. + partition (int, optional): a specific partition to which the message + should be routed. if no partition is specified for a record, + the partition will be determined by hashing the record key, or, + if no key is provided, by round-robin across all partitions for + the topic. + key (any, optional): a key to use for partitioning. keys are + serialized with the configured `key_serializer` and are stored + along with the value in encoded kafka message. + value (any): the value to put in the kafka message. values are + serialized with the configured `value_serializer`. + callback (callable, optional): a callback function that will be + called asynchronously for each produced message with the + partition and offset the message was routed to. the callback + signature should be `callback(record, response)`. Note that the + callback will not be called if the configured `acks` is 0. + + Returns: + Nothing, KafkaProducer always sends messages asynchronously + + """ + if not self._producer_thread_is_running(): + self._start_producer_thread() + + record = ProducerRecord(topic, partition, key, value, callback) + self._producer_queue.put(record, block=self._config['block_on_buffer_full']) + return None + + def partitions_for_topic(self, topic): + """Get a list of partition ids for a topic + + The method relies on cached cluster metadata and may return + stale values if the cluster state has changed. Metadata will + be automatically refreshed if the producer detects leadership changes + or other server errors suggesting the cached cluster state is out of + date. Configure `metadata_max_age_ms` to adjust how long cached + metadata is kept before refreshing without any detected changes. + + Parameters: + topic (str): the topic name + + Returns: + a list of partition ids (ints) that are available for the topic + based on the last fetched cluster metadata + + """ + return self._client.get_partition_ids_for_topic(topic) + + def refresh_topic_metadata(self): + """Refresh kafka broker / topic metadata + + Will retry until _stop_event is set or `metadata_fetch_timeout_ms` + elapses. Retries use the `retry_backoff_ms` configuration + + Catches KafkaUnavailableError, but any other exception will raise + (and crash the thread!). 
+ + Returns: + True if successful, otherwise False + + """ + backoff = self._config['retry_backoff_ms'] / 1000.0 + timeout = self._config['metadata_fetch_timeout_ms'] / 1000.0 + start = time.time() + while not self._stop_event.is_set() and time.time() < (start + timeout): + try: + self._client.load_metadata_for_topics() + self._local_data.last_metadata_refresh = time.time() + return True + except KafkaUnavailableError: + logger.warning("KafkaUnavailableError attempting " + "to refresh topic metadata") + self._stop_event.wait(backoff) + + if time.time() >= (start + timeout): + raise RequestTimedOutError() + else: + return False + + def _metadata_max_age_refresh(self): + try: + last_refresh = self._local_data.last_metadata_refresh + except AttributeError: + return True + + refresh_interval = self._config['metadata_max_age_ms'] / 1000.0 + + if time.time() >= last_refresh + refresh_interval: + return self.refresh_topic_metadata() + return True + + def _get_codec(self): + compression = self._config['compression_type'] + if compression is None or compression == 'none': + return CODEC_NONE + elif compression == 'gzip': + return CODEC_GZIP + elif compression == 'snappy': + return CODEC_SNAPPY + raise KafkaConfigurationError('Unknown compression type: %s' % + compression) + + def close(self): + """Shutdown the KafkaProducer instance""" + with self._thread_lock: + if self._producer_thread_is_running(): + self._stop_event.set() + self._producer_thread.join() + self._producer_thread = None + else: + logger.warning("producer thread not running") + + if not self._producer_queue.empty(): + logger.warning("producer thread stopped " + "with %d unsent messages", + self._producer_queue.qsize()) + while self._producer_queue.qsize() > 0: + self._handle_dropped_message(self._producer_queue.get()) + self._producer_queue.task_done() + + return True + + def metrics(self): + raise NotImplemented() From 62f8070ba40e230c28f9afcd925b205fc8c7dbea Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Mon, 2 Mar 2015 09:47:02 -0800 Subject: [PATCH 03/11] Add some initial KafkaProducer unit and integration tests --- test/test_producer.py | 55 +++++++++++++++++++++++++++++-- test/test_producer_integration.py | 43 +++++++++++++++++++++++- 2 files changed, 95 insertions(+), 3 deletions(-) diff --git a/test/test_producer.py b/test/test_producer.py index f6b3d6a1b..d8985fbd5 100644 --- a/test/test_producer.py +++ b/test/test_producer.py @@ -2,13 +2,15 @@ import logging -from mock import MagicMock +from mock import MagicMock, Mock, patch from . 
import unittest +from kafka.common import ProduceResponse from kafka.producer.base import Producer +from kafka.producer.kafka import KafkaProducer -class TestKafkaProducer(unittest.TestCase): +class TestBaseProducer(unittest.TestCase): def test_producer_message_types(self): producer = Producer(MagicMock()) @@ -40,3 +42,52 @@ def partitions(topic): topic = b"test-topic" producer.send_messages(topic, b'hi') assert client.send_produce_request.called + +class TestKafkaProducer(unittest.TestCase): + + @patch('kafka.client.KafkaConnection') + @patch('kafka.client.KafkaProtocol') + def test_kafka_producer_client_configs(self, protocol, conn): + conn.recv.return_value = 'response' # anything but None + + producer = KafkaProducer( + bootstrap_servers=['foobar:1234', 'foobaz:4321'], + client_id='barbaz', + socket_timeout_ms=1234 + ) + self.assertEquals(producer._client.client_id, b'barbaz') + self.assertEquals(producer._client.timeout, 1.234) + self.assertEquals(sorted(producer._client.hosts), + [('foobar', 1234), ('foobaz', 4321)]) + + @patch('kafka.client.KafkaConnection') + @patch('kafka.client.KafkaProtocol') + def test_kafka_producer_callback(self, protocol, conn): + conn.recv.return_value = 'response' # anything but None + + producer = KafkaProducer( + bootstrap_servers=['foobar:1234', 'foobaz:4321'], + client_id='barbaz', + retries=5 + ) + producer.partitions_for_topic = lambda _: [0, 1] + producer._client._get_leader_for_partition = MagicMock() + producer._client.send_produce_request = MagicMock() + producer._client.send_produce_request.return_value = [ + ProduceResponse('test-topic', 11, 0, 30) + ] + + mock = Mock() + producer.send(topic='test-topic', value=u'你怎么样?', callback=mock.callback) + producer._producer_queue.join() + producer.close() + + self.assertTrue(mock.callback.called) + self.assertEquals(mock.callback.call_count, 1) + record = mock.callback.call_args[0][0] + response = mock.callback.call_args[0][1] + self.assertEquals(record.topic, 'test-topic') + self.assertEquals(record.value, u'你怎么样?') + self.assertEquals(response.topic, 'test-topic') + self.assertEquals(response.partition, 11) + self.assertEquals(response.offset, 30) diff --git a/test/test_producer_integration.py b/test/test_producer_integration.py index 38df69f2c..ad9da04e7 100644 --- a/test/test_producer_integration.py +++ b/test/test_producer_integration.py @@ -2,10 +2,11 @@ import time import uuid +from mock import MagicMock from six.moves import range from kafka import ( - SimpleProducer, KeyedProducer, + SimpleProducer, KeyedProducer, KafkaProducer, create_message, create_gzip_message, create_snappy_message, RoundRobinPartitioner, HashedPartitioner ) @@ -451,6 +452,46 @@ def test_acks_cluster_commit(self): producer.stop() + @kafka_versions("all") + def test_async_kafka_producer(self): + start_offset0 = self.current_offset(self.topic, 0) + + brokers = '%s:%d' % (self.server.host, self.server.port) + producer = KafkaProducer(bootstrap_servers=brokers, + producer_loop_idle_wait_ms=100) + + fake_callback = MagicMock() + resp = producer.send( + topic=self.topic, + partition=0, + key=self.key("key1"), + value=self.msg("one"), + callback=fake_callback + ) + + # response should be None + self.assertIsNone(resp) + + # Sleep for 1 producer loop idle + time.sleep(0.1) + self.assert_fetch_offset(0, start_offset0, [ self.msg("one") ]) + + # Verify callback worked + self.assertTrue(fake_callback.called) + self.assertEqual(fake_callback.call_count, 1) + + args, kwargs = fake_callback.call_args + self.assertEqual(len(args), 2) + + 
record, response = args + self.assertEqual(record.topic, self.topic) + self.assertEqual(record.partition, 0) + self.assertEqual(record.key, self.key("key1")) + self.assertEqual(record.value, self.msg("one")) + self.assertEqual(response.offset, start_offset0) + + producer.close() + def assert_produce_request(self, messages, initial_offset, message_ct, partition=0): produce = ProduceRequest(self.topic, partition, messages=messages) From e46dc49b4ac3dd121f6d3de4bcf32363b2a43d51 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Mon, 2 Mar 2015 17:39:09 -0800 Subject: [PATCH 04/11] Fix kafka.producer.kafka lint errors --- kafka/producer/kafka.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index c12fd2435..a869ba7e8 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -287,7 +287,7 @@ def _get_partition_for_record(self, record): # hash the index if record.key is not None: - idx = hash(key) % size + idx = hash(record.key) % size # or round-robin it, if no key available else: @@ -475,4 +475,4 @@ def close(self): return True def metrics(self): - raise NotImplemented() + raise NotImplementedError() From 6c4eb8954eb99005b7b89ff9ca4ef4d5fbc9264a Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Mon, 2 Mar 2015 17:43:57 -0800 Subject: [PATCH 05/11] Fix Queue/queue import for python 3 --- kafka/producer/kafka.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index a869ba7e8..69af13806 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -2,7 +2,10 @@ from collections import namedtuple import logging -from Queue import Empty, Full, Queue +try: + from Queue import Empty, Full, Queue +except ImportError: # python 3 + from queue import Empty, Full, Queue import random import threading import time From 04fb3c8cc670a79078812a035d654a63cd60e0ef Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Mon, 2 Mar 2015 17:51:27 -0800 Subject: [PATCH 06/11] Automatically shutdown kafka producer thread via __del__ -- see PR 324 --- kafka/producer/kafka.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index 69af13806..30c2c7e84 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -477,5 +477,8 @@ def close(self): return True + def __del__(self): + self.close() + def metrics(self): raise NotImplementedError() From 8647b0435cbf0c253e2629ac72501a20b2c92d0a Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 8 Mar 2015 17:40:58 -0700 Subject: [PATCH 07/11] Catch NotLeaderForPartitionError in KafkaProducer; add comments explaining each exception we handle --- kafka/producer/kafka.py | 27 +++++++++++++++++++++++---- 1 file changed, 23 insertions(+), 4 deletions(-) diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index 30c2c7e84..0fb48837f 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -15,8 +15,9 @@ from kafka.client import KafkaClient from kafka.common import ( ProduceRequest, ConnectionError, RequestTimedOutError, - LeaderNotAvailableError, UnknownTopicOrPartitionError, - FailedPayloadsError, KafkaUnavailableError, KafkaConfigurationError + NotLeaderForPartitionError, LeaderNotAvailableError, + UnknownTopicOrPartitionError, FailedPayloadsError, KafkaUnavailableError, + KafkaConfigurationError ) from kafka.protocol import ( CODEC_NONE, CODEC_GZIP, CODEC_SNAPPY, @@ -219,8 +220,26 @@ def _producer_loop(self): break # Catch errors that 
indicate we need to refresh topic metadata - except (LeaderNotAvailableError, UnknownTopicOrPartitionError, - FailedPayloadsError, ConnectionError) as e: + except ( + # We sent the produce request to a non-leader broker + NotLeaderForPartitionError, + + # Leader-election is underway -- topic has no leader yet + LeaderNotAvailableError, + + # We sent the produce request to a non-replica broker + # this also occurs when the topic does not exist at all + # and auto-topic-creation is not enabled + UnknownTopicOrPartitionError, + + # Network connection error -- broker may be offline + ConnectionError, + + # Internal kafka-python error + # currently means there was a ConnectionError + # FailedPayloadsError needs a refactor... + FailedPayloadsError + ) as e: logger.warning("_producer_loop caught exception type %s, " "refreshing metadata", type(e)) try: From 4714fd868a00fd17911230bf80b236cbac5f3707 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 8 Mar 2015 18:09:05 -0700 Subject: [PATCH 08/11] Remove unused imports from kafka.producer.kafka --- kafka/producer/kafka.py | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index 0fb48837f..759dd9ebf 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -3,15 +3,13 @@ from collections import namedtuple import logging try: - from Queue import Empty, Full, Queue + from Queue import Empty, Queue except ImportError: # python 3 - from queue import Empty, Full, Queue + from queue import Empty, Queue import random import threading import time -import six - from kafka.client import KafkaClient from kafka.common import ( ProduceRequest, ConnectionError, RequestTimedOutError, From e08581ffe56a73a4ad57a285b7af6e7a9806f2f2 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 8 Mar 2015 18:16:08 -0700 Subject: [PATCH 09/11] Fix kafka producer _get_partition_for_record --- kafka/producer/kafka.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index 759dd9ebf..f5c6c425f 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -329,7 +329,7 @@ def _get_partition_for_record(self, record): idx = (idx + 1) % size indices[record.topic] = idx - partition = partitions[idx] + return partitions[idx] def _encode_produce_request(self, record): value_serializer = self._config['value_serializer'] From 5570c10a4cb59f0a8a0a762a8278990f38889d59 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 8 Mar 2015 18:17:58 -0700 Subject: [PATCH 10/11] Deprecate InvalidFetchRequestError -- kafka protocol now calls it InvalidMessageSize --- kafka/common.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/kafka/common.py b/kafka/common.py index b7bb06c92..6012adfe2 100644 --- a/kafka/common.py +++ b/kafka/common.py @@ -102,9 +102,10 @@ class UnknownTopicOrPartitionError(BrokerResponseError): message = 'UNKNOWN_TOPIC_OR_PARTITON' -class InvalidFetchRequestError(BrokerResponseError): +# Formerly known (incorrectly) as InvalidFetchRequestError +class InvalidMessageSizeError(BrokerResponseError): errno = 4 - message = 'INVALID_FETCH_SIZE' + message = 'INVALID_MESSAGE_SIZE' class LeaderNotAvailableError(BrokerResponseError): From 7acbf8c7bcb1d634b03c786707de540c57460932 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 8 Mar 2015 18:33:00 -0700 Subject: [PATCH 11/11] KafkaProducer: Dont try to retry on message errors --- kafka/producer/kafka.py | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) 
diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index f5c6c425f..761439d4b 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -15,6 +15,7 @@ ProduceRequest, ConnectionError, RequestTimedOutError, NotLeaderForPartitionError, LeaderNotAvailableError, UnknownTopicOrPartitionError, FailedPayloadsError, KafkaUnavailableError, + InvalidMessageError, InvalidMessageSizeError, MessageSizeTooLargeError, KafkaConfigurationError ) from kafka.protocol import ( @@ -249,6 +250,22 @@ def _producer_loop(self): self._handle_dropped_message(record) break + # Message errors should not be retried + except ( + # Message contents does not match its CRC + InvalidMessageError, + + # Message has a negative size + InvalidMessageSizeError, + + # Message larger than server configured maximum message size + MessageSizeTooLargeError + + ) as e: + retries = 0 + logger.error("%s error -- dropping message without retry", + type(e)) + # If the whole cluster is unavailable, we should backoff except KafkaUnavailableError: logger.warning("KafkaUnavailableError error -- "