diff --git a/kafka/conn.py b/kafka/conn.py index b276d3d62..c94154885 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -531,6 +531,9 @@ def _try_api_versions_check(self): if self._api_versions_future is None: if self.config['api_version'] is not None: self._api_version = self.config['api_version'] + # api_version will be normalized by KafkaClient, so this should not happen + if self._api_version not in BROKER_API_VERSIONS: + raise Errors.UnrecognizedBrokerVersion('api_version %s not found in kafka.protocol.broker_api_versions' % (self._api_version,)) self._api_versions = BROKER_API_VERSIONS[self._api_version] log.debug('%s: Using pre-configured api_version %s for ApiVersions', self, self._api_version) return True @@ -553,7 +556,8 @@ def _try_api_versions_check(self): self.state = ConnectionStates.API_VERSIONS_RECV self.config['state_change_callback'](self.node_id, self._sock, self) else: - raise 'Unable to determine broker version.' + self.close(Errors.KafkaConnectionError('Unable to determine broker version.')) + return False for r, f in self.recv(): f.success(r) diff --git a/test/test_conn.py b/test/test_conn.py index ea88fd04c..6af01498f 100644 --- a/test/test_conn.py +++ b/test/test_conn.py @@ -69,6 +69,36 @@ def test_connect(_socket, conn, states): assert conn.state is state +def test_api_versions_check(_socket): + conn = BrokerConnection('localhost', 9092, socket.AF_INET) + assert conn._api_versions_future is None + conn.connect() + assert conn._api_versions_future is not None + assert conn.connecting() is True + assert conn.state is ConnectionStates.API_VERSIONS_RECV + + assert conn._try_api_versions_check() is False + assert conn.connecting() is True + assert conn.state is ConnectionStates.API_VERSIONS_RECV + + conn._api_versions_future = None + conn._check_version_idx = 0 + assert conn._try_api_versions_check() is False + assert conn.connecting() is True + + conn._check_version_idx = len(conn.VERSION_CHECKS) + conn._api_versions_future = None + assert conn._try_api_versions_check() is False + assert conn.connecting() is False + assert conn.disconnected() is True + + +def test_api_versions_check_unrecognized(_socket): + conn = BrokerConnection('localhost', 9092, socket.AF_INET, api_version=(0, 0)) + with pytest.raises(Errors.UnrecognizedBrokerVersion): + conn.connect() + + def test_connect_timeout(_socket, conn): assert conn.state is ConnectionStates.DISCONNECTED