diff --git a/kafka/client_async.py b/kafka/client_async.py index 8df4566e6..8c071104e 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -978,8 +978,10 @@ def _maybe_refresh_metadata(self, wakeup=False): topics = list(self.config['bootstrap_topics_filter']) api_version = self.api_version(MetadataRequest, max_version=7) - if self.cluster.need_all_topic_metadata or not topics: + if self.cluster.need_all_topic_metadata: topics = MetadataRequest[api_version].ALL_TOPICS + elif not topics: + topics = MetadataRequest[api_version].NO_TOPICS if api_version >= 4: request = MetadataRequest[api_version](topics, self.config['allow_auto_create_topics']) else: diff --git a/kafka/protocol/metadata.py b/kafka/protocol/metadata.py index 3291be82d..bb22ba997 100644 --- a/kafka/protocol/metadata.py +++ b/kafka/protocol/metadata.py @@ -172,6 +172,7 @@ class MetadataRequest_v0(Request): ('topics', Array(String('utf-8'))) ) ALL_TOPICS = [] # Empty Array (len 0) for topics returns all topics + NO_TOPICS = [] # v0 does not support a 'no topics' request, so we'll just ask for ALL class MetadataRequest_v1(Request): diff --git a/test/test_client_async.py b/test/test_client_async.py index 8582d8fb7..276926116 100644 --- a/test/test_client_async.py +++ b/test/test_client_async.py @@ -32,7 +32,7 @@ def cli(mocker, conn): def test_bootstrap(mocker, conn): conn.state = ConnectionStates.CONNECTED - cli = KafkaClient(api_version=(0, 9)) + cli = KafkaClient(api_version=(2, 1)) mocker.patch.object(cli, '_selector') future = cli.cluster.request_update() cli.poll(future=future) @@ -43,7 +43,7 @@ def test_bootstrap(mocker, conn): kwargs.pop('state_change_callback') kwargs.pop('node_id') assert kwargs == cli.config - conn.send.assert_called_once_with(MetadataRequest[0]([]), blocking=False, request_timeout_ms=None) + conn.send.assert_called_once_with(MetadataRequest[7]([], True), blocking=False, request_timeout_ms=None) assert cli._bootstrap_fails == 0 assert cli.cluster.brokers() == set([BrokerMetadata(0, 'foo', 12, None), BrokerMetadata(1, 'bar', 34, None)]) @@ -330,6 +330,7 @@ def test_maybe_refresh_metadata_update(mocker, client): mocker.patch.object(client, 'least_loaded_node', return_value='foobar') mocker.patch.object(client, '_can_send_request', return_value=True) send = mocker.patch.object(client, 'send') + client.cluster.need_all_topic_metadata = True client.poll(timeout_ms=12345678) client._poll.assert_called_with(9999.999) # request_timeout_ms