 ASYNC_QUEUE_MAXSIZE = 0
 ASYNC_QUEUE_PUT_TIMEOUT = 0
 # no retries by default
-ASYNC_RETRY_OPTIONS = RetryOptions(
-    limit=0, backoff_ms=0, retry_on_timeouts=False)
+ASYNC_RETRY_LIMIT = 0
+ASYNC_RETRY_BACKOFF_MS = 0
+ASYNC_RETRY_ON_TIMEOUTS = False
+
 STOP_ASYNC_PRODUCER = -1

@@ -46,7 +48,7 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
     a specified timeout and send them upstream to the brokers in one
     request
     """
-    reqs = []
+    reqs = {}
     client.reinit()

     while not stop_event.is_set():
@@ -81,36 +83,38 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
             messages = create_message_set(msg, codec, key)
             req = ProduceRequest(topic_partition.topic,
                                  topic_partition.partition,
-                                 messages)
-            reqs.append(req)
+                                 tuple(messages))
+            reqs[req] = 0

         if not reqs:
             continue

         reqs_to_retry, error_type = [], None
-        try:
-            client.send_produce_request(reqs,
-                                        acks=req_acks,
-                                        timeout=ack_timeout)

-        except FailedPayloadsError as ex:
-            error_type = FailedPayloadsError
-            reqs_to_retry = ex.failed_payloads
+        try:
+            reply = client.send_produce_request(reqs.keys(),
+                                                acks=req_acks,
+                                                timeout=ack_timeout,
+                                                fail_on_error=False)
+            reqs_to_retry = [req for broker_responses in reply
+                             for response in broker_responses
+                             for req in response.failed_payloads
+                             if isinstance(response, FailedPayloadsError)]
+            if reqs_to_retry:
+                error_type = FailedPayloadsError

         except RequestTimedOutError:
             error_type = RequestTimedOutError
             if retry_options.retry_on_timeouts:
-                reqs_to_retry = reqs
+                reqs_to_retry = reqs.keys()

         except Exception as ex:
             error_type = type(ex)
             if type(ex) in RETRY_ERROR_TYPES:
-                reqs_to_retry = reqs
-
-        finally:
-            reqs = []
+                reqs_to_retry = reqs.keys()

-        if not reqs_to_retry or retry_options.limit == 0:
+        if not reqs_to_retry:
+            reqs = {}
             continue

         # doing backoff before next retry
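Note on the new failure handling above: with fail_on_error=False the produce call returns the broker responses instead of raising, and the nested comprehension gathers the payloads attached to FailedPayloadsError entries. Written out as explicit loops (checking the response type before touching failed_payloads), the idea is roughly:

    reqs_to_retry = []
    for broker_responses in reply:          # reply assumed to be grouped per broker
        for response in broker_responses:
            if isinstance(response, FailedPayloadsError):
                # only failed responses carry payloads worth retrying
                reqs_to_retry.extend(response.failed_payloads)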
@@ -122,10 +126,8 @@ def _send_upstream(queue, client, codec, batch_time, batch_size,
         if error_type in RETRY_REFRESH_ERROR_TYPES:
             client.load_metadata_for_topics()

-        reqs = [req._replace(retries=req.retries+1)
-                for req in reqs_to_retry
-                if not retry_options.limit or
-                (retry_options.limit and req.retries < retry_options.limit)]
+        reqs = {key: count + 1 for key, count in reqs.items()
+                if key in reqs_to_retry and count < retry_options.limit}


 class Producer(object):
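The retry bookkeeping now lives in the reqs dict itself: each ProduceRequest maps to the number of attempts made so far, and the dict comprehension above keeps only the requests that failed this round and are still under retry_options.limit, bumping their counts. A minimal standalone sketch of that pattern (the request names are illustrative, not from the module):

    reqs = {'req-a': 0, 'req-b': 2, 'req-c': 1}   # request -> attempts so far
    reqs_to_retry = ['req-a', 'req-b']            # requests that failed this round
    retry_limit = 2

    # keep only failed requests still under the limit, incrementing their counts
    reqs = {req: count + 1 for req, count in reqs.items()
            if req in reqs_to_retry and count < retry_limit}
    # -> {'req-a': 1}: 'req-b' hit the limit and 'req-c' succeeded, so both drop out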
@@ -161,7 +163,9 @@ def __init__(self, client, async=False,
                  batch_send=False,
                  batch_send_every_n=BATCH_SEND_MSG_COUNT,
                  batch_send_every_t=BATCH_SEND_DEFAULT_INTERVAL,
-                 async_retry_options=ASYNC_RETRY_OPTIONS,
+                 async_retry_limit=ASYNC_RETRY_LIMIT,
+                 async_retry_backoff_ms=ASYNC_RETRY_BACKOFF_MS,
+                 async_retry_on_timeouts=ASYNC_RETRY_ON_TIMEOUTS,
                  async_queue_maxsize=ASYNC_QUEUE_MAXSIZE,
                  async_queue_put_timeout=ASYNC_QUEUE_PUT_TIMEOUT):

@@ -191,6 +195,10 @@ def __init__(self, client, async=False,
             # Messages are sent through this queue
             self.queue = Queue(async_queue_maxsize)
             self.async_queue_put_timeout = async_queue_put_timeout
+            async_retry_options = RetryOptions(
+                limit=async_retry_limit,
+                backoff_ms=async_retry_backoff_ms,
+                retry_on_timeouts=async_retry_on_timeouts)
             self.thread_stop_event = Event()
             self.thread = Thread(target=_send_upstream,
                                  args=(self.queue,
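With the RetryOptions object assembled inside __init__, callers configure retries through plain keyword arguments. A minimal usage sketch, assuming an already-reachable broker (the address and import path are illustrative, not part of this change):

    from kafka import KafkaClient
    from kafka.producer.base import Producer   # path assumed from the repo layout

    client = KafkaClient('localhost:9092')
    producer = Producer(client, async=True,
                        async_retry_limit=3,           # retry each failed request up to 3 times
                        async_retry_backoff_ms=100,    # wait 100 ms between attempts
                        async_retry_on_timeouts=True)  # treat request timeouts as retriable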
@@ -252,7 +260,7 @@ def _send_messages(self, topic, partition, *msg, **kwargs):
             raise TypeError("the key must be type bytes")

         if self.async:
-            for m in msg:
+            for idx, m in enumerate(msg):
                 try:
                     item = (TopicAndPartition(topic, partition), m, key)
                     if self.async_queue_put_timeout == 0:
@@ -261,6 +269,7 @@ def _send_messages(self, topic, partition, *msg, **kwargs):
                         self.queue.put(item, True, self.async_queue_put_timeout)
                 except Full:
                     raise AsyncProducerQueueFull(
+                        msg[idx:],
                         'Producer async queue overfilled. '
                         'Current queue size %d.' % self.queue.qsize())
         resp = []
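Because the enumerate index is captured, AsyncProducerQueueFull now receives the messages that never made it into the queue (msg[idx:]) as its first argument, so a caller can resend just the unsent tail. A hedged sketch of that recovery path, assuming the list is reachable through the exception's first argument and that the helper below is our own, not part of the library:

    import time
    from kafka.common import AsyncProducerQueueFull   # import path assumed

    def send_with_backpressure(producer, topic, partition, messages, attempts=3):
        """Resend only the messages the async queue could not accept."""
        pending = list(messages)
        for _ in range(attempts):
            try:
                producer.send_messages(topic, partition, *pending)
                return
            except AsyncProducerQueueFull as e:
                # the unsent tail (msg[idx:]) arrives as the exception's first argument
                pending = list(e.args[0])
                time.sleep(0.1)   # give the upstream thread a moment to drain the queue
        raise AsyncProducerQueueFull(pending, 'queue still full after %d attempts' % attempts)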