diff --git a/kafka/admin/client.py b/kafka/admin/client.py index 82aaa68e9..8490fdb46 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -17,6 +17,7 @@ from kafka.errors import ( IncompatibleBrokerVersion, KafkaConfigurationError, UnknownTopicOrPartitionError, UnrecognizedBrokerVersion, IllegalArgumentError) +from kafka.future import Future from kafka.metrics import MetricConfig, Metrics from kafka.protocol.admin import ( CreateTopicsRequest, DeleteTopicsRequest, DescribeConfigsRequest, AlterConfigsRequest, CreatePartitionsRequest, @@ -358,14 +359,11 @@ def _send_request_to_node(self, node_id, request, wakeup=True): Returns: A future object that may be polled for status and results. - - Raises: - The exception if the message could not be sent. """ - while not self._client.ready(node_id): - # poll until the connection to broker is ready, otherwise send() - # will fail with NodeNotReadyError - self._client.poll(timeout_ms=200) + try: + self._client.await_ready(node_id) + except Errors.KafkaConnectionError as e: + return Future().failure(e) return self._client.send(node_id, request, wakeup) def _send_request_to_controller(self, request):