Skip to content

Commit 351a3ad

Browse files
committed
Fixed compatible issues with tests
1 parent fd8c3e1 commit 351a3ad

File tree

2 files changed

+16
-15
lines changed

2 files changed

+16
-15
lines changed

kafka/producer/base.py

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -87,13 +87,15 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
8787
acks=req_acks,
8888
timeout=ack_timeout)
8989
except FailedPayloadsError as ex:
90-
log.exception("Failed payloads count %s" % len(ex.message))
90+
failed_reqs = ex.args[0]
91+
log.exception("Failed payloads count %s" % len(failed_reqs))
92+
9193
if retries_limit is None:
9294
# retry all failed messages until success
93-
reqs_to_retry = ex.message
95+
reqs_to_retry = failed_reqs
9496
elif not retries_limit < 0:
9597
#
96-
for req in ex.message:
98+
for req in failed_reqs:
9799
if retries_limit and req.retries < retries_limit:
98100
updated_req = req._replace(retries=req.retries+1)
99101
reqs_to_retry.append(updated_req)

test/test_producer.py

Lines changed: 11 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -106,11 +106,11 @@ def test_first_send_failed(self):
106106
for i in range(10):
107107
self.queue.put((TopicAndPartition("test", i), "msg %i", "key %i"))
108108

109-
flag = mp.Value('c', 'f')
109+
is_first_time = mp.Value('b', True)
110110
def send_side_effect(reqs, *args, **kwargs):
111111
self.send_calls_count.value += 1
112-
if flag.value == 'f':
113-
flag.value = 't'
112+
if is_first_time.value:
113+
is_first_time.value = False
114114
raise FailedPayloadsError(reqs)
115115

116116
self.client.send_produce_request.side_effect = send_side_effect
@@ -166,14 +166,13 @@ def send_side_effect(reqs, *args, **kwargs):
166166
# the queue should have 7 elements
167167
# 3 batches of 1 msg each were retried all this time
168168
self.assertEqual(self.queue.empty(), False)
169-
left = 0
170-
for i in range(10):
171-
try:
169+
try:
170+
for i in range(7):
172171
self.queue.get(timeout=0.01)
173-
left += 1
174-
except Empty:
175-
break
176-
self.assertEqual(left, 7)
172+
except Empty:
173+
self.fail("Should be 7 elems in the queue")
174+
self.assertEqual(self.queue.empty(), True)
177175

178-
# 1s / 50ms of backoff = 20 times
179-
self.assertEqual(self.send_calls_count.value, 20)
176+
# 1s / 50ms of backoff = 20 times max
177+
self.assertTrue(self.send_calls_count.value > 10)
178+
self.assertTrue(self.send_calls_count.value <= 20)

0 commit comments

Comments
 (0)