Skip to content

Commit 4609d1d

Browse files
committed
bugfix: infinite loop when send msgs to controller (dpkp#2194)
If the value `_controller_id` is out-of-date and the node was removed from the cluster, `_send_request_to_node` would enter an infinite loop.
1 parent 7260664 commit 4609d1d

File tree

1 file changed

+14
-1
lines changed

1 file changed

+14
-1
lines changed

kafka/admin/client.py

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -409,10 +409,23 @@ def _send_request_to_controller(self, request):
409409
tries = 2 # in case our cached self._controller_id is outdated
410410
while tries:
411411
tries -= 1
412-
future = self._send_request_to_node(self._controller_id, request)
412+
future = self._client.send(self._controller_id, request)
413413

414414
self._wait_for_futures([future])
415415

416+
if future.exception is not None:
417+
log.error(
418+
"Sending request to controller_id %s failed with %s",
419+
self._controller_id,
420+
future.exception,
421+
)
422+
is_outdated_controler = (
423+
self._client.cluster.broker_metadata(self._controller_id) is None
424+
)
425+
if is_outdated_controler:
426+
self._refresh_controller_id()
427+
continue
428+
416429
response = future.value
417430
# In Java, the error field name is inconsistent:
418431
# - CreateTopicsResponse / CreatePartitionsResponse uses topic_errors

0 commit comments

Comments
 (0)