From 4a1da2a057d17bbe969d72b4f30b7d00704e7877 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 16 Mar 2025 09:21:31 -0700 Subject: [PATCH 1/4] Test api_versions_check failure --- test/test_conn.py | 24 ++++++++++++++++++++++++ 1 file changed, 24 insertions(+) diff --git a/test/test_conn.py b/test/test_conn.py index ea88fd04c..9d5f5640d 100644 --- a/test/test_conn.py +++ b/test/test_conn.py @@ -69,6 +69,30 @@ def test_connect(_socket, conn, states): assert conn.state is state +def test_api_versions_check(_socket): + conn = BrokerConnection('localhost', 9092, socket.AF_INET) + assert conn._api_versions_future is None + conn.connect() + assert conn._api_versions_future is not None + assert conn.connecting() is True + assert conn.state is ConnectionStates.API_VERSIONS_RECV + + assert conn._try_api_versions_check() is False + assert conn.connecting() is True + assert conn.state is ConnectionStates.API_VERSIONS_RECV + + conn._api_versions_future = None + conn._check_version_idx = 0 + assert conn._try_api_versions_check() is False + assert conn.connecting() is True + + conn._check_version_idx = len(conn.VERSION_CHECKS) + conn._api_versions_future = None + assert conn._try_api_versions_check() is False + assert conn.connecting() is False + assert conn.disconnected() is True + + def test_connect_timeout(_socket, conn): assert conn.state is ConnectionStates.DISCONNECTED From 28dbc411518eae3c792acabad5ad12d37133a0c0 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 16 Mar 2025 09:28:20 -0700 Subject: [PATCH 2/4] Improve connection error handling when try_api_versions_check fails all attempts --- kafka/conn.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/kafka/conn.py b/kafka/conn.py index b276d3d62..0851cd8c6 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -531,6 +531,9 @@ def _try_api_versions_check(self): if self._api_versions_future is None: if self.config['api_version'] is not None: self._api_version = self.config['api_version'] + # api_version will be normalized by KafkaClient, so this should not happen + if self._api_version not in BROKER_API_VERSIONS: + raise Errors.UnrecognizedBrokerVersion('api_version %s not found in kafka.protocol.broker_api_versions', self._api_version) self._api_versions = BROKER_API_VERSIONS[self._api_version] log.debug('%s: Using pre-configured api_version %s for ApiVersions', self, self._api_version) return True @@ -553,7 +556,8 @@ def _try_api_versions_check(self): self.state = ConnectionStates.API_VERSIONS_RECV self.config['state_change_callback'](self.node_id, self._sock, self) else: - raise 'Unable to determine broker version.' + self.close(Errors.KafkaConnectionError('Unable to determine broker version.')) + return False for r, f in self.recv(): f.success(r) From a7d060b721ba82ffca2a71c366ab4037187fc20b Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 16 Mar 2025 09:32:12 -0700 Subject: [PATCH 3/4] Test UnrecognizedBrokerVersion raise --- test/test_conn.py | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/test/test_conn.py b/test/test_conn.py index 9d5f5640d..6af01498f 100644 --- a/test/test_conn.py +++ b/test/test_conn.py @@ -93,6 +93,12 @@ def test_api_versions_check(_socket): assert conn.disconnected() is True +def test_api_versions_check_unrecognized(_socket): + conn = BrokerConnection('localhost', 9092, socket.AF_INET, api_version=(0, 0)) + with pytest.raises(Errors.UnrecognizedBrokerVersion): + conn.connect() + + def test_connect_timeout(_socket, conn): assert conn.state is ConnectionStates.DISCONNECTED From bf3a25bf27b7421b39b18cb603bd5186471fb6a6 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Sun, 16 Mar 2025 09:32:23 -0700 Subject: [PATCH 4/4] fixup UnrecognizedBrokerVersion message --- kafka/conn.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/conn.py b/kafka/conn.py index 0851cd8c6..c94154885 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -533,7 +533,7 @@ def _try_api_versions_check(self): self._api_version = self.config['api_version'] # api_version will be normalized by KafkaClient, so this should not happen if self._api_version not in BROKER_API_VERSIONS: - raise Errors.UnrecognizedBrokerVersion('api_version %s not found in kafka.protocol.broker_api_versions', self._api_version) + raise Errors.UnrecognizedBrokerVersion('api_version %s not found in kafka.protocol.broker_api_versions' % (self._api_version,)) self._api_versions = BROKER_API_VERSIONS[self._api_version] log.debug('%s: Using pre-configured api_version %s for ApiVersions', self, self._api_version) return True