Skip to content

Commit 1a5cb03

Browse files
committed
Merge pull request #329 from vshlapakov/feature-batch-msg-keys
Correct message keys for async batching mode
2 parents a5b1c8d + 25ad88c commit 1a5cb03

File tree

4 files changed

+76
-16
lines changed

4 files changed

+76
-16
lines changed

kafka/producer/base.py

Lines changed: 2 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -58,7 +58,7 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
5858
# Adjust the timeout to match the remaining period
5959
count -= 1
6060
timeout = send_at - time.time()
61-
msgset[topic_partition].append(msg)
61+
msgset[topic_partition].append((msg, key))
6262

6363
# Send collected requests upstream
6464
reqs = []
@@ -192,7 +192,7 @@ def _send_messages(self, topic, partition, *msg, **kwargs):
192192
self.queue.put((TopicAndPartition(topic, partition), m, key))
193193
resp = []
194194
else:
195-
messages = create_message_set(msg, self.codec, key)
195+
messages = create_message_set([(m, key) for m in msg], self.codec, key)
196196
req = ProduceRequest(topic, partition, messages)
197197
try:
198198
resp = self.client.send_produce_request([req], acks=self.req_acks,

kafka/protocol.py

Lines changed: 3 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -559,7 +559,7 @@ def create_gzip_message(payloads, key=None):
559559
560560
"""
561561
message_set = KafkaProtocol._encode_message_set(
562-
[create_message(payload, key) for payload in payloads])
562+
[create_message(payload, pl_key) for payload, pl_key in payloads])
563563

564564
gzipped = gzip_encode(message_set)
565565
codec = ATTRIBUTE_CODEC_MASK & CODEC_GZIP
@@ -580,7 +580,7 @@ def create_snappy_message(payloads, key=None):
580580
581581
"""
582582
message_set = KafkaProtocol._encode_message_set(
583-
[create_message(payload, key) for payload in payloads])
583+
[create_message(payload, pl_key) for payload, pl_key in payloads])
584584

585585
snapped = snappy_encode(message_set)
586586
codec = ATTRIBUTE_CODEC_MASK & CODEC_SNAPPY
@@ -595,7 +595,7 @@ def create_message_set(messages, codec=CODEC_NONE, key=None):
595595
return a list containing a single codec-encoded message.
596596
"""
597597
if codec == CODEC_NONE:
598-
return [create_message(m, key) for m in messages]
598+
return [create_message(m, k) for m, k in messages]
599599
elif codec == CODEC_GZIP:
600600
return [create_gzip_message(messages, key)]
601601
elif codec == CODEC_SNAPPY:

test/test_producer_integration.py

Lines changed: 8 additions & 8 deletions
Original file line number | Diff line number | Diff line change
@@ -71,9 +71,9 @@ def test_produce_many_gzip(self):
7171
start_offset = self.current_offset(self.topic, 0)
7272

7373
message1 = create_gzip_message([
74-
("Gzipped 1 %d" % i).encode('utf-8') for i in range(100)])
74+
(("Gzipped 1 %d" % i).encode('utf-8'), None) for i in range(100)])
7575
message2 = create_gzip_message([
76-
("Gzipped 2 %d" % i).encode('utf-8') for i in range(100)])
76+
(("Gzipped 2 %d" % i).encode('utf-8'), None) for i in range(100)])
7777

7878
self.assert_produce_request(
7979
[ message1, message2 ],
@@ -87,8 +87,8 @@ def test_produce_many_snappy(self):
8787
start_offset = self.current_offset(self.topic, 0)
8888

8989
self.assert_produce_request([
90-
create_snappy_message(["Snappy 1 %d" % i for i in range(100)]),
91-
create_snappy_message(["Snappy 2 %d" % i for i in range(100)]),
90+
create_snappy_message([("Snappy 1 %d" % i, None) for i in range(100)]),
91+
create_snappy_message([("Snappy 2 %d" % i, None) for i in range(100)]),
9292
],
9393
start_offset,
9494
200,
@@ -102,13 +102,13 @@ def test_produce_mixed(self):
102102
messages = [
103103
create_message(b"Just a plain message"),
104104
create_gzip_message([
105-
("Gzipped %d" % i).encode('utf-8') for i in range(100)]),
105+
(("Gzipped %d" % i).encode('utf-8'), None) for i in range(100)]),
106106
]
107107

108108
# All snappy integration tests fail with nosnappyjava
109109
if False and has_snappy():
110110
msg_count += 100
111-
messages.append(create_snappy_message(["Snappy %d" % i for i in range(100)]))
111+
messages.append(create_snappy_message([("Snappy %d" % i, None) for i in range(100)]))
112112

113113
self.assert_produce_request(messages, start_offset, msg_count)
114114

@@ -118,7 +118,7 @@ def test_produce_100k_gzipped(self):
118118

119119
self.assert_produce_request([
120120
create_gzip_message([
121-
("Gzipped batch 1, message %d" % i).encode('utf-8')
121+
(("Gzipped batch 1, message %d" % i).encode('utf-8'), None)
122122
for i in range(50000)])
123123
],
124124
start_offset,
@@ -127,7 +127,7 @@ def test_produce_100k_gzipped(self):
127127

128128
self.assert_produce_request([
129129
create_gzip_message([
130-
("Gzipped batch 1, message %d" % i).encode('utf-8')
130+
(("Gzipped batch 1, message %d" % i).encode('utf-8'), None)
131131
for i in range(50000)])
132132
],
133133
start_offset+50000,

test/test_protocol.py

Lines changed: 63 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -32,7 +32,7 @@ def test_create_message(self):
3232
self.assertEqual(msg.value, payload)
3333

3434
def test_create_gzip(self):
35-
payloads = [b"v1", b"v2"]
35+
payloads = [(b"v1", None), (b"v2", None)]
3636
msg = create_gzip_message(payloads)
3737
self.assertEqual(msg.magic, 0)
3838
self.assertEqual(msg.attributes, ATTRIBUTE_CODEC_MASK & CODEC_GZIP)
@@ -59,9 +59,39 @@ def test_create_gzip(self):
5959

6060
self.assertEqual(decoded, expect)
6161

62+
def test_create_gzip_keyed(self):
63+
payloads = [(b"v1", b"k1"), (b"v2", b"k2")]
64+
msg = create_gzip_message(payloads)
65+
self.assertEqual(msg.magic, 0)
66+
self.assertEqual(msg.attributes, ATTRIBUTE_CODEC_MASK & CODEC_GZIP)
67+
self.assertEqual(msg.key, None)
68+
# Need to decode to check since gzipped payload is non-deterministic
69+
decoded = gzip_decode(msg.value)
70+
expect = b"".join([
71+
struct.pack(">q", 0), # MsgSet Offset
72+
struct.pack(">i", 18), # Msg Size
73+
struct.pack(">i", 1474775406), # CRC
74+
struct.pack(">bb", 0, 0), # Magic, flags
75+
struct.pack(">i", 2), # Length of key
76+
b"k1", # Key
77+
struct.pack(">i", 2), # Length of value
78+
b"v1", # Value
79+
80+
struct.pack(">q", 0), # MsgSet Offset
81+
struct.pack(">i", 18), # Msg Size
82+
struct.pack(">i", -16383415), # CRC
83+
struct.pack(">bb", 0, 0), # Magic, flags
84+
struct.pack(">i", 2), # Length of key
85+
b"k2", # Key
86+
struct.pack(">i", 2), # Length of value
87+
b"v2", # Value
88+
])
89+
90+
self.assertEqual(decoded, expect)
91+
6292
@unittest.skipUnless(has_snappy(), "Snappy not available")
6393
def test_create_snappy(self):
64-
payloads = [b"v1", b"v2"]
94+
payloads = [(b"v1", None), (b"v2", None)]
6595
msg = create_snappy_message(payloads)
6696
self.assertEqual(msg.magic, 0)
6797
self.assertEqual(msg.attributes, ATTRIBUTE_CODEC_MASK & CODEC_SNAPPY)
@@ -87,6 +117,36 @@ def test_create_snappy(self):
87117

88118
self.assertEqual(decoded, expect)
89119

120+
@unittest.skipUnless(has_snappy(), "Snappy not available")
121+
def test_create_snappy_keyed(self):
122+
payloads = [(b"v1", b"k1"), (b"v2", b"k2")]
123+
msg = create_snappy_message(payloads)
124+
self.assertEqual(msg.magic, 0)
125+
self.assertEqual(msg.attributes, ATTRIBUTE_CODEC_MASK & CODEC_SNAPPY)
126+
self.assertEqual(msg.key, None)
127+
decoded = snappy_decode(msg.value)
128+
expect = b"".join([
129+
struct.pack(">q", 0), # MsgSet Offset
130+
struct.pack(">i", 18), # Msg Size
131+
struct.pack(">i", 1474775406), # CRC
132+
struct.pack(">bb", 0, 0), # Magic, flags
133+
struct.pack(">i", 2), # Length of key
134+
b"k1", # Key
135+
struct.pack(">i", 2), # Length of value
136+
b"v1", # Value
137+
138+
struct.pack(">q", 0), # MsgSet Offset
139+
struct.pack(">i", 18), # Msg Size
140+
struct.pack(">i", -16383415), # CRC
141+
struct.pack(">bb", 0, 0), # Magic, flags
142+
struct.pack(">i", 2), # Length of key
143+
b"k2", # Key
144+
struct.pack(">i", 2), # Length of value
145+
b"v2", # Value
146+
])
147+
148+
self.assertEqual(decoded, expect)
149+
90150
def test_encode_message_header(self):
91151
expect = b"".join([
92152
struct.pack(">h", 10), # API Key
@@ -701,7 +761,7 @@ def mock_create_message_fns(self):
701761
yield
702762

703763
def test_create_message_set(self):
704-
messages = [1, 2, 3]
764+
messages = [(1, "k1"), (2, "k2"), (3, "k3")]
705765

706766
# Default codec is CODEC_NONE. Expect list of regular messages.
707767
expect = [sentinel.message] * len(messages)

0 commit comments

Comments
 (0)