diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index 2a70700c4..f7e2b5fa4 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -1,4 +1,4 @@ -from __future__ import absolute_import +from __future__ import absolute_import, division import atexit import copy @@ -538,7 +538,7 @@ def close(self, timeout=None): def partitions_for(self, topic): """Returns set of all known partitions for the topic.""" - max_wait = self.config['max_block_ms'] / 1000.0 + max_wait = self.config['max_block_ms'] / 1000 return self._wait_on_metadata(topic, max_wait) def _max_usable_produce_magic(self): @@ -596,19 +596,29 @@ def send(self, topic, value=None, key=None, headers=None, partition=None, timest assert not (value is None and key is None), 'Need at least one: key or value' key_bytes = value_bytes = None try: - self._wait_on_metadata(topic, self.config['max_block_ms'] / 1000.0) - - key_bytes = self._serialize( - self.config['key_serializer'], - topic, key) - value_bytes = self._serialize( - self.config['value_serializer'], - topic, value) - assert type(key_bytes) in (bytes, bytearray, memoryview, type(None)) - assert type(value_bytes) in (bytes, bytearray, memoryview, type(None)) - - partition = self._partition(topic, partition, key, value, - key_bytes, value_bytes) + assigned_partition = None + elapsed = 0.0 + begin = time.time() + timeout = self.config['max_block_ms'] / 1000 + while assigned_partition is None and elapsed < timeout: + elapsed = time.time() - begin + self._wait_on_metadata(topic, timeout - elapsed) + + key_bytes = self._serialize( + self.config['key_serializer'], + topic, key) + value_bytes = self._serialize( + self.config['value_serializer'], + topic, value) + assert type(key_bytes) in (bytes, bytearray, memoryview, type(None)) + assert type(value_bytes) in (bytes, bytearray, memoryview, type(None)) + + assigned_partition = self._partition(topic, partition, key, value, + key_bytes, value_bytes) + if assigned_partition is None: + raise Errors.KafkaTimeoutError("Failed to assign partition for message after %s secs." % timeout) + else: + partition = assigned_partition if headers is None: headers = [] @@ -710,6 +720,10 @@ def _wait_on_metadata(self, topic, max_wait): if partitions is not None: return partitions + if elapsed >= max_wait: + raise Errors.KafkaTimeoutError( + "Failed to update metadata after %.1f secs." % (max_wait,)) + if not metadata_event: metadata_event = threading.Event() @@ -720,13 +734,13 @@ def _wait_on_metadata(self, topic, max_wait): future.add_both(lambda e, *args: e.set(), metadata_event) self._sender.wakeup() metadata_event.wait(max_wait - elapsed) - elapsed = time.time() - begin if not metadata_event.is_set(): raise Errors.KafkaTimeoutError( "Failed to update metadata after %.1f secs." % (max_wait,)) elif topic in self._metadata.unauthorized_topics: raise Errors.TopicAuthorizationFailedError(topic) else: + elapsed = time.time() - begin log.debug("_wait_on_metadata woke after %s secs.", elapsed) def _serialize(self, f, topic, data): @@ -738,16 +752,18 @@ def _serialize(self, f, topic, data): def _partition(self, topic, partition, key, value, serialized_key, serialized_value): + all_partitions = self._metadata.partitions_for_topic(topic) + available = self._metadata.available_partitions_for_topic(topic) + if all_partitions is None or available is None: + return None if partition is not None: assert partition >= 0 - assert partition in self._metadata.partitions_for_topic(topic), 'Unrecognized partition' + assert partition in all_partitions, 'Unrecognized partition' return partition - all_partitions = sorted(self._metadata.partitions_for_topic(topic)) - available = list(self._metadata.available_partitions_for_topic(topic)) return self.config['partitioner'](serialized_key, - all_partitions, - available) + sorted(all_partitions), + list(available)) def metrics(self, raw=False): """Get metrics on producer performance.