Skip to content

Commit 8214728

Browse files
committed
bugfix: race among _connecting and cluster metadata
A call to `maybe_connect` can be performed while the cluster metadata is being updated. If that happens, the assumption that every entry in `_connecting` has metadata won't hold. The existing assert will then raise on every subsequent call to `poll` driving the client instance unusable. This fixes the issue by ignoring connetion request to nodes that do not have the metadata available anymore.
1 parent 9feeb79 commit 8214728

File tree

1 file changed

+19
-12
lines changed

1 file changed

+19
-12
lines changed

kafka/client_async.py

Lines changed: 19 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -368,18 +368,25 @@ def _maybe_connect(self, node_id):
368368
conn = self._conns.get(node_id)
369369

370370
if conn is None:
371-
broker = self.cluster.broker_metadata(node_id)
372-
assert broker, 'Broker id %s not in current metadata' % (node_id,)
373-
374-
log.debug("Initiating connection to node %s at %s:%s",
375-
node_id, broker.host, broker.port)
376-
host, port, afi = get_ip_port_afi(broker.host)
377-
cb = WeakMethod(self._conn_state_change)
378-
conn = BrokerConnection(host, broker.port, afi,
379-
state_change_callback=cb,
380-
node_id=node_id,
381-
**self.config)
382-
self._conns[node_id] = conn
371+
broker_metadata = self.cluster.broker_metadata(node_id)
372+
373+
# The broker may have been removed from the cluster after the
374+
# call to `maybe_connect`. At this point there is no way to
375+
# recover, so just ignore the connection
376+
if broker_metadata is None:
377+
log.debug("Node %s is not available anymore, discarding connection", node_id)
378+
self._connecting.remove(node_id)
379+
return False
380+
else:
381+
log.debug("Initiating connection to node %s at %s:%s",
382+
node_id, broker_metadata.host, broker_metadata.port)
383+
host, port, afi = get_ip_port_afi(broker_metadata.host)
384+
cb = WeakMethod(self._conn_state_change)
385+
conn = BrokerConnection(host, broker_metadata.port, afi,
386+
state_change_callback=cb,
387+
node_id=node_id,
388+
**self.config)
389+
self._conns[node_id] = conn
383390

384391
# Check if existing connection should be recreated because host/port changed
385392
elif self._should_recycle_connection(conn):

0 commit comments

Comments
 (0)