Skip to content

Correct message keys for async batching mode #326

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions kafka/producer/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
# Adjust the timeout to match the remaining period
count -= 1
timeout = send_at - time.time()
msgset[topic_partition].append(msg)
msgset[topic_partition].append((msg, key))

# Send collected requests upstream
reqs = []
Expand Down Expand Up @@ -191,7 +191,7 @@ def _send_messages(self, topic, partition, *msg, **kwargs):
self.queue.put((TopicAndPartition(topic, partition), m, key))
resp = []
else:
messages = create_message_set(msg, self.codec, key)
messages = create_message_set([(m, key) for m in msg], self.codec, key)
req = ProduceRequest(topic, partition, messages)
try:
resp = self.client.send_produce_request([req], acks=self.req_acks,
Expand Down
6 changes: 3 additions & 3 deletions kafka/protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -559,7 +559,7 @@ def create_gzip_message(payloads, key=None):

"""
message_set = KafkaProtocol._encode_message_set(
[create_message(payload, key) for payload in payloads])
[create_message(payload, pl_key) for payload, pl_key in payloads])

gzipped = gzip_encode(message_set)
codec = ATTRIBUTE_CODEC_MASK & CODEC_GZIP
Expand All @@ -580,7 +580,7 @@ def create_snappy_message(payloads, key=None):

"""
message_set = KafkaProtocol._encode_message_set(
[create_message(payload, key) for payload in payloads])
[create_message(payload, pl_key) for payload, pl_key in payloads])

snapped = snappy_encode(message_set)
codec = ATTRIBUTE_CODEC_MASK & CODEC_SNAPPY
Expand All @@ -595,7 +595,7 @@ def create_message_set(messages, codec=CODEC_NONE, key=None):
return a list containing a single codec-encoded message.
"""
if codec == CODEC_NONE:
return [create_message(m, key) for m in messages]
return [create_message(m, k) for m, k in messages]
elif codec == CODEC_GZIP:
return [create_gzip_message(messages, key)]
elif codec == CODEC_SNAPPY:
Expand Down
16 changes: 8 additions & 8 deletions test/test_producer_integration.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,9 +71,9 @@ def test_produce_many_gzip(self):
start_offset = self.current_offset(self.topic, 0)

message1 = create_gzip_message([
("Gzipped 1 %d" % i).encode('utf-8') for i in range(100)])
(("Gzipped 1 %d" % i).encode('utf-8'), None) for i in range(100)])
message2 = create_gzip_message([
("Gzipped 2 %d" % i).encode('utf-8') for i in range(100)])
(("Gzipped 2 %d" % i).encode('utf-8'), None) for i in range(100)])

self.assert_produce_request(
[ message1, message2 ],
Expand All @@ -87,8 +87,8 @@ def test_produce_many_snappy(self):
start_offset = self.current_offset(self.topic, 0)

self.assert_produce_request([
create_snappy_message(["Snappy 1 %d" % i for i in range(100)]),
create_snappy_message(["Snappy 2 %d" % i for i in range(100)]),
create_snappy_message([("Snappy 1 %d" % i, None) for i in range(100)]),
create_snappy_message([("Snappy 2 %d" % i, None) for i in range(100)]),
],
start_offset,
200,
Expand All @@ -102,13 +102,13 @@ def test_produce_mixed(self):
messages = [
create_message(b"Just a plain message"),
create_gzip_message([
("Gzipped %d" % i).encode('utf-8') for i in range(100)]),
(("Gzipped %d" % i).encode('utf-8'), None) for i in range(100)]),
]

# All snappy integration tests fail with nosnappyjava
if False and has_snappy():
msg_count += 100
messages.append(create_snappy_message(["Snappy %d" % i for i in range(100)]))
messages.append(create_snappy_message([("Snappy %d" % i, None) for i in range(100)]))

self.assert_produce_request(messages, start_offset, msg_count)

Expand All @@ -118,7 +118,7 @@ def test_produce_100k_gzipped(self):

self.assert_produce_request([
create_gzip_message([
("Gzipped batch 1, message %d" % i).encode('utf-8')
(("Gzipped batch 1, message %d" % i).encode('utf-8'), None)
for i in range(50000)])
],
start_offset,
Expand All @@ -127,7 +127,7 @@ def test_produce_100k_gzipped(self):

self.assert_produce_request([
create_gzip_message([
("Gzipped batch 1, message %d" % i).encode('utf-8')
(("Gzipped batch 1, message %d" % i).encode('utf-8'), None)
for i in range(50000)])
],
start_offset+50000,
Expand Down
66 changes: 63 additions & 3 deletions test/test_protocol.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def test_create_message(self):
self.assertEqual(msg.value, payload)

def test_create_gzip(self):
payloads = [b"v1", b"v2"]
payloads = [(b"v1", None), (b"v2", None)]
msg = create_gzip_message(payloads)
self.assertEqual(msg.magic, 0)
self.assertEqual(msg.attributes, ATTRIBUTE_CODEC_MASK & CODEC_GZIP)
Expand All @@ -59,9 +59,39 @@ def test_create_gzip(self):

self.assertEqual(decoded, expect)

def test_create_gzip_keyed(self):
    # Each payload is a (value, key) pair; the keys must survive inside
    # the compressed inner message set.
    keyed_payloads = [(b"v1", b"k1"), (b"v2", b"k2")]
    wrapper = create_gzip_message(keyed_payloads)

    # The outer wrapper message carries the codec flag and no key.
    self.assertEqual(wrapper.magic, 0)
    self.assertEqual(wrapper.attributes, ATTRIBUTE_CODEC_MASK & CODEC_GZIP)
    self.assertEqual(wrapper.key, None)

    # Gzip output is non-deterministic, so compare the decompressed bytes.
    inner = gzip_decode(wrapper.value)

    # Expected CRC of each inner message, in payload order.
    checksums = (1474775406, -16383415)

    pieces = []
    for (value, key), crc in zip(keyed_payloads, checksums):
        pieces.extend([
            struct.pack(">q", 0),            # MsgSet Offset
            struct.pack(">i", 18),           # Msg Size
            struct.pack(">i", crc),          # CRC
            struct.pack(">bb", 0, 0),        # Magic, flags
            struct.pack(">i", len(key)),     # Length of key
            key,                             # Key
            struct.pack(">i", len(value)),   # Length of value
            value,                           # Value
        ])
    expected = b"".join(pieces)

    self.assertEqual(inner, expected)

@unittest.skipUnless(has_snappy(), "Snappy not available")
def test_create_snappy(self):
payloads = [b"v1", b"v2"]
payloads = [(b"v1", None), (b"v2", None)]
msg = create_snappy_message(payloads)
self.assertEqual(msg.magic, 0)
self.assertEqual(msg.attributes, ATTRIBUTE_CODEC_MASK & CODEC_SNAPPY)
Expand All @@ -87,6 +117,36 @@ def test_create_snappy(self):

self.assertEqual(decoded, expect)

@unittest.skipUnless(has_snappy(), "Snappy not available")
def test_create_snappy_keyed(self):
    # Each payload is a (value, key) pair; the keys must survive inside
    # the compressed inner message set.
    keyed_payloads = [(b"v1", b"k1"), (b"v2", b"k2")]
    wrapper = create_snappy_message(keyed_payloads)

    # The outer wrapper message carries the codec flag and no key.
    self.assertEqual(wrapper.magic, 0)
    self.assertEqual(wrapper.attributes, ATTRIBUTE_CODEC_MASK & CODEC_SNAPPY)
    self.assertEqual(wrapper.key, None)

    # Compare against the decompressed inner message set.
    inner = snappy_decode(wrapper.value)

    # Expected CRC of each inner message, in payload order.
    checksums = (1474775406, -16383415)

    pieces = []
    for (value, key), crc in zip(keyed_payloads, checksums):
        pieces.extend([
            struct.pack(">q", 0),            # MsgSet Offset
            struct.pack(">i", 18),           # Msg Size
            struct.pack(">i", crc),          # CRC
            struct.pack(">bb", 0, 0),        # Magic, flags
            struct.pack(">i", len(key)),     # Length of key
            key,                             # Key
            struct.pack(">i", len(value)),   # Length of value
            value,                           # Value
        ])
    expected = b"".join(pieces)

    self.assertEqual(inner, expected)

def test_encode_message_header(self):
expect = b"".join([
struct.pack(">h", 10), # API Key
Expand Down Expand Up @@ -701,7 +761,7 @@ def mock_create_message_fns(self):
yield

def test_create_message_set(self):
messages = [1, 2, 3]
messages = [(1, "k1"), (2, "k2"), (3, "k3")]

# Default codec is CODEC_NONE. Expect list of regular messages.
expect = [sentinel.message] * len(messages)
Expand Down