Attempt to fix metadata race condition when partitioning in producer.send #2523

Merged: 1 commit, Mar 12, 2025
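Not part of the diff below: a minimal caller-side sketch of the behaviour the new retry loop in send() is aiming for. The broker address, topic name, and timeout values are placeholder assumptions; the point is only that a producer blocked on missing topic metadata should keep retrying partition assignment until max_block_ms elapses and then raise KafkaTimeoutError, rather than failing on a partially updated metadata snapshot.

from kafka import KafkaProducer
from kafka.errors import KafkaTimeoutError

# Placeholder broker and topic; adjust for a real environment.
producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    max_block_ms=10000,  # upper bound on the metadata/partition wait inside send()
)

try:
    # If topic metadata is not available yet, send() blocks and retries
    # partition assignment within the max_block_ms window.
    future = producer.send('example-topic', value=b'hello')
    record = future.get(timeout=30)
    print(record.topic, record.partition, record.offset)
except KafkaTimeoutError:
    # Metadata/partition assignment did not complete within max_block_ms.
    print('timed out waiting for topic metadata / partition assignment')
finally:
    producer.close()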
kafka/producer/kafka.py (58 changes: 37 additions, 21 deletions)
@@ -1,4 +1,4 @@
-from __future__ import absolute_import
+from __future__ import absolute_import, division
 
 import atexit
 import copy
@@ -538,7 +538,7 @@ def close(self, timeout=None):
 
     def partitions_for(self, topic):
         """Returns set of all known partitions for the topic."""
-        max_wait = self.config['max_block_ms'] / 1000.0
+        max_wait = self.config['max_block_ms'] / 1000
         return self._wait_on_metadata(topic, max_wait)
 
     def _max_usable_produce_magic(self):
@@ -596,19 +596,29 @@ def send(self, topic, value=None, key=None, headers=None, partition=None, timest
         assert not (value is None and key is None), 'Need at least one: key or value'
         key_bytes = value_bytes = None
         try:
-            self._wait_on_metadata(topic, self.config['max_block_ms'] / 1000.0)
-
-            key_bytes = self._serialize(
-                self.config['key_serializer'],
-                topic, key)
-            value_bytes = self._serialize(
-                self.config['value_serializer'],
-                topic, value)
-            assert type(key_bytes) in (bytes, bytearray, memoryview, type(None))
-            assert type(value_bytes) in (bytes, bytearray, memoryview, type(None))
-
-            partition = self._partition(topic, partition, key, value,
-                                        key_bytes, value_bytes)
+            assigned_partition = None
+            elapsed = 0.0
+            begin = time.time()
+            timeout = self.config['max_block_ms'] / 1000
+            while assigned_partition is None and elapsed < timeout:
+                elapsed = time.time() - begin
+                self._wait_on_metadata(topic, timeout - elapsed)
+
+                key_bytes = self._serialize(
+                    self.config['key_serializer'],
+                    topic, key)
+                value_bytes = self._serialize(
+                    self.config['value_serializer'],
+                    topic, value)
+                assert type(key_bytes) in (bytes, bytearray, memoryview, type(None))
+                assert type(value_bytes) in (bytes, bytearray, memoryview, type(None))
+
+                assigned_partition = self._partition(topic, partition, key, value,
+                                                     key_bytes, value_bytes)
+            if assigned_partition is None:
+                raise Errors.KafkaTimeoutError("Failed to assign partition for message after %s secs." % timeout)
+            else:
+                partition = assigned_partition
 
             if headers is None:
                 headers = []
@@ -710,6 +720,10 @@ def _wait_on_metadata(self, topic, max_wait):
             if partitions is not None:
                 return partitions
 
+            if elapsed >= max_wait:
+                raise Errors.KafkaTimeoutError(
+                    "Failed to update metadata after %.1f secs." % (max_wait,))
+
             if not metadata_event:
                 metadata_event = threading.Event()
 
@@ -720,13 +734,13 @@ def _wait_on_metadata(self, topic, max_wait):
             future.add_both(lambda e, *args: e.set(), metadata_event)
             self._sender.wakeup()
             metadata_event.wait(max_wait - elapsed)
-            elapsed = time.time() - begin
             if not metadata_event.is_set():
                 raise Errors.KafkaTimeoutError(
                     "Failed to update metadata after %.1f secs." % (max_wait,))
             elif topic in self._metadata.unauthorized_topics:
                 raise Errors.TopicAuthorizationFailedError(topic)
             else:
+                elapsed = time.time() - begin
                 log.debug("_wait_on_metadata woke after %s secs.", elapsed)
 
     def _serialize(self, f, topic, data):
@@ -738,16 +752,18 @@ def _serialize(self, f, topic, data):
 
     def _partition(self, topic, partition, key, value,
                    serialized_key, serialized_value):
+        all_partitions = self._metadata.partitions_for_topic(topic)
+        available = self._metadata.available_partitions_for_topic(topic)
+        if all_partitions is None or available is None:
+            return None
         if partition is not None:
             assert partition >= 0
-            assert partition in self._metadata.partitions_for_topic(topic), 'Unrecognized partition'
+            assert partition in all_partitions, 'Unrecognized partition'
             return partition
 
-        all_partitions = sorted(self._metadata.partitions_for_topic(topic))
-        available = list(self._metadata.available_partitions_for_topic(topic))
         return self.config['partitioner'](serialized_key,
-                                          all_partitions,
-                                          available)
+                                          sorted(all_partitions),
+                                          list(available))
 
     def metrics(self, raw=False):
         """Get metrics on producer performance.
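A note on the / 1000.0 to / 1000 change: because the first hunk also adds division to the __future__ import, / is true division even on Python 2, so both spellings produce float seconds. A quick sanity check, assuming an illustrative integer max_block_ms of 1500:

from __future__ import division

# With true division enabled, integer millisecond values convert to float
# seconds the same way with or without the trailing .0 on the divisor.
max_block_ms = 1500
assert max_block_ms / 1000 == max_block_ms / 1000.0 == 1.5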