@@ -195,7 +195,7 @@ def _handle_produce_response(self, node_id, send_time, batches, response):
195
195
196
196
for topic , partitions in response .topics :
197
197
for partition_info in partitions :
198
- error_message = None
198
+ global_error = None
199
199
if response .API_VERSION < 2 :
200
200
partition , error_code , offset = partition_info
201
201
ts = None
@@ -204,11 +204,11 @@ def _handle_produce_response(self, node_id, send_time, batches, response):
204
204
elif 5 <= response .API_VERSION <= 7 :
205
205
partition , error_code , offset , ts , log_start_offset = partition_info
206
206
else :
207
- partition , error_code , offset , ts , log_start_offset , _ , error_message = partition_info
207
+ partition , error_code , offset , ts , log_start_offset , _ , global_error = partition_info
208
208
tp = TopicPartition (topic , partition )
209
- error = error_message or Errors .for_code (error_code )
209
+ error = Errors .for_code (error_code )
210
210
batch = batches_by_partition [tp ]
211
- self ._complete_batch (batch , error , offset , ts )
211
+ self ._complete_batch (batch , error , offset , ts , global_error )
212
212
213
213
if response .API_VERSION > 0 :
214
214
self ._sensors .record_throttle_time (response .throttle_time_ms , node = node_id )
@@ -218,7 +218,7 @@ def _handle_produce_response(self, node_id, send_time, batches, response):
218
218
for batch in batches :
219
219
self ._complete_batch (batch , None , - 1 , None )
220
220
221
- def _complete_batch (self , batch , error , base_offset , timestamp_ms = None , log_start_offset = None ):
221
+ def _complete_batch (self , batch , error , base_offset , timestamp_ms = None , log_start_offset = None , global_error = None ):
222
222
"""Complete or retry the given batch of records.
223
223
224
224
Arguments:
@@ -238,15 +238,15 @@ def _complete_batch(self, batch, error, base_offset, timestamp_ms=None, log_star
238
238
" retrying (%d attempts left). Error: %s" ,
239
239
batch .topic_partition ,
240
240
self .config ['retries' ] - batch .attempts - 1 ,
241
- error )
241
+ global_error or error )
242
242
self ._accumulator .reenqueue (batch )
243
243
self ._sensors .record_retries (batch .topic_partition .topic , batch .record_count )
244
244
else :
245
245
if error is Errors .TopicAuthorizationFailedError :
246
246
error = error (batch .topic_partition .topic )
247
247
248
248
# tell the user the result of their request
249
- batch .done (base_offset , timestamp_ms , error , log_start_offset )
249
+ batch .done (base_offset , timestamp_ms , error , log_start_offset , global_error )
250
250
self ._accumulator .deallocate (batch )
251
251
if error is not None :
252
252
self ._sensors .record_errors (batch .topic_partition .topic , batch .record_count )
0 commit comments