Skip to content

Client request response ordering #403

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 10, 2015
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 20 additions & 10 deletions kafka/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,8 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
Arguments:

payloads: list of object-like entities with a topic (str) and
partition (int) attribute
partition (int) attribute; payloads with duplicate topic-partitions
are not supported.

encode_fn: a method to encode the list of payloads to a request body,
must accept client_id, correlation_id, and payloads as
Expand All @@ -152,6 +153,10 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):

List of response objects in the same order as the supplied payloads
"""
# encoders / decoders do not maintain ordering currently
# so we need to keep this so we can rebuild order before returning
original_ordering = [(p.topic, p.partition) for p in payloads]

# Group the requests by topic+partition
brokers_for_payloads = []
payloads_by_broker = collections.defaultdict(list)
Expand All @@ -165,7 +170,7 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):

# For each broker, send the list of request payloads
# and collect the responses and errors
responses_by_broker = collections.defaultdict(list)
responses = {}
broker_failures = []
for broker, payloads in payloads_by_broker.items():
requestId = self._next_id()
Expand All @@ -184,7 +189,8 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
'to server %s: %s', requestId, broker, e)

for payload in payloads:
responses_by_broker[broker].append(FailedPayloadsError(payload))
topic_partition = (payload.topic, payload.partition)
responses[topic_partition] = FailedPayloadsError(payload)

# No exception, try to get response
else:
Expand All @@ -196,7 +202,8 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
log.debug('Request %s does not expect a response '
'(skipping conn.recv)', requestId)
for payload in payloads:
responses_by_broker[broker].append(None)
topic_partition = (payload.topic, payload.partition)
responses[topic_partition] = None
continue

try:
Expand All @@ -208,12 +215,17 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
requestId, broker, e)

for payload in payloads:
responses_by_broker[broker].append(FailedPayloadsError(payload))
topic_partition = (payload.topic, payload.partition)
responses[topic_partition] = FailedPayloadsError(payload)

else:
_resps = []
for payload_response in decoder_fn(response):
responses_by_broker[broker].append(payload_response)
log.debug('Response %s: %s', requestId, responses_by_broker[broker])
topic_partition = (payload_response.topic,
payload_response.partition)
responses[topic_partition] = payload_response
_resps.append(payload_response)
log.debug('Response %s: %s', requestId, _resps)

# Connection errors generally mean stale metadata
# although sometimes it means incorrect api request
Expand All @@ -223,9 +235,7 @@ def _send_broker_aware_request(self, payloads, encoder_fn, decoder_fn):
self.reset_all_metadata()

# Return responses in the same order as provided
responses_by_payload = [responses_by_broker[broker].pop(0)
for broker in brokers_for_payloads]
return responses_by_payload
return [responses[tp] for tp in original_ordering]

def __repr__(self):
return '<KafkaClient client_id=%s>' % (self.client_id)
Expand Down
32 changes: 31 additions & 1 deletion test/test_client_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@

from kafka.common import (
FetchRequest, OffsetCommitRequest, OffsetFetchRequest,
KafkaTimeoutError
KafkaTimeoutError, ProduceRequest
)
from kafka.protocol import create_message

from test.fixtures import ZookeeperFixture, KafkaFixture
from test.testutil import KafkaIntegrationTestCase, kafka_versions
Expand Down Expand Up @@ -49,6 +50,35 @@ def test_ensure_topic_exists(self):
with self.assertRaises(KafkaTimeoutError):
self.client.ensure_topic_exists(b"this_topic_doesnt_exist", timeout=0)

@kafka_versions('all')
def test_send_produce_request_maintains_request_response_order(self):

self.client.ensure_topic_exists(b'foo', timeout=1)
self.client.ensure_topic_exists(b'bar', timeout=1)

requests = [
ProduceRequest(
b'foo', 0,
[create_message(b'a'), create_message(b'b')]),
ProduceRequest(
b'bar', 1,
[create_message(b'a'), create_message(b'b')]),
ProduceRequest(
b'foo', 1,
[create_message(b'a'), create_message(b'b')]),
ProduceRequest(
b'bar', 0,
[create_message(b'a'), create_message(b'b')]),
]

responses = self.client.send_produce_request(requests)
while len(responses):
request = requests.pop()
response = responses.pop()
self.assertEqual(request.topic, response.topic)
self.assertEqual(request.partition, response.partition)


####################
# Offset Tests #
####################
Expand Down