diff --git a/kafka/client_async.py b/kafka/client_async.py index 8df4566e6..210283590 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -365,7 +365,7 @@ def _conn_state_change(self, node_id, sock, conn): self._connecting.remove(node_id) try: self._selector.unregister(sock) - except KeyError: + except (KeyError, ValueError): pass if self._sensors: diff --git a/kafka/conn.py b/kafka/conn.py index 6963a8b08..ec516b0f4 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -319,8 +319,8 @@ def _init_sasl_mechanism(self): def _dns_lookup(self): self._gai = dns_lookup(self.host, self.port, self.afi) if not self._gai: - log.error('DNS lookup failed for %s:%i (%s)', - self.host, self.port, self.afi) + log.error('%s: DNS lookup failed for %s:%i (%s)', + self, self.host, self.port, self.afi) return False return True @@ -366,6 +366,7 @@ def connect_blocking(self, timeout=float('inf')): def connect(self): """Attempt to connect and return ConnectionState""" if self.state is ConnectionStates.DISCONNECTED and not self.blacked_out(): + self.state = ConnectionStates.CONNECTING self.last_attempt = time.time() next_lookup = self._next_afi_sockaddr() if not next_lookup: @@ -390,7 +391,6 @@ def connect(self): self._sock.setsockopt(*option) self._sock.setblocking(False) - self.state = ConnectionStates.CONNECTING self.config['state_change_callback'](self.node_id, self._sock, self) log.info('%s: connecting to %s:%d [%s %s]', self, self.host, self.port, self._sock_addr, AFI_NAMES[self._sock_afi]) @@ -412,20 +412,20 @@ def connect(self): log.debug('%s: established TCP connection', self) if self.config['security_protocol'] in ('SSL', 'SASL_SSL'): - log.debug('%s: initiating SSL handshake', self) self.state = ConnectionStates.HANDSHAKE + log.debug('%s: initiating SSL handshake', self) self.config['state_change_callback'](self.node_id, self._sock, self) # _wrap_ssl can alter the connection state -- disconnects on failure self._wrap_ssl() else: - log.debug('%s: checking broker Api Versions', self) self.state = ConnectionStates.API_VERSIONS_SEND + log.debug('%s: checking broker Api Versions', self) self.config['state_change_callback'](self.node_id, self._sock, self) # Connection failed # WSAEINVAL == 10022, but errno.WSAEINVAL is not available on non-win systems elif ret not in (errno.EINPROGRESS, errno.EALREADY, errno.EWOULDBLOCK, 10022): - log.error('Connect attempt to %s returned error %s.' + log.error('%s: Connect attempt returned error %s.' ' Disconnecting.', self, ret) errstr = errno.errorcode.get(ret, 'UNKNOWN') self.close(Errors.KafkaConnectionError('{} {}'.format(ret, errstr))) @@ -438,8 +438,8 @@ def connect(self): if self.state is ConnectionStates.HANDSHAKE: if self._try_handshake(): log.debug('%s: completed SSL handshake.', self) - log.debug('%s: checking broker Api Versions', self) self.state = ConnectionStates.API_VERSIONS_SEND + log.debug('%s: checking broker Api Versions', self) self.config['state_change_callback'](self.node_id, self._sock, self) if self.state in (ConnectionStates.API_VERSIONS_SEND, ConnectionStates.API_VERSIONS_RECV): @@ -447,13 +447,13 @@ def connect(self): # _try_api_versions_check has side-effects: possibly disconnected on socket errors if self.state in (ConnectionStates.API_VERSIONS_SEND, ConnectionStates.API_VERSIONS_RECV): if self.config['security_protocol'] in ('SASL_PLAINTEXT', 'SASL_SSL'): - log.debug('%s: initiating SASL authentication', self) self.state = ConnectionStates.AUTHENTICATING + log.debug('%s: initiating SASL authentication', self) self.config['state_change_callback'](self.node_id, self._sock, self) else: # security_protocol PLAINTEXT - log.info('%s: Connection complete.', self) self.state = ConnectionStates.CONNECTED + log.info('%s: Connection complete.', self) self._reset_reconnect_backoff() self.config['state_change_callback'](self.node_id, self._sock, self) @@ -462,8 +462,8 @@ def connect(self): if self._try_authenticate(): # _try_authenticate has side-effects: possibly disconnected on socket errors if self.state is ConnectionStates.AUTHENTICATING: - log.info('%s: Connection complete.', self) self.state = ConnectionStates.CONNECTED + log.info('%s: Connection complete.', self) self._reset_reconnect_backoff() self.config['state_change_callback'](self.node_id, self._sock, self) @@ -472,7 +472,7 @@ def connect(self): # Connection timed out request_timeout = self.config['request_timeout_ms'] / 1000.0 if time.time() > request_timeout + self.last_attempt: - log.error('Connection attempt to %s timed out', self) + log.error('%s: Connection attempt timed out', self) self.close(Errors.KafkaConnectionError('timeout')) return self.state @@ -531,7 +531,7 @@ def _try_handshake(self): except (SSLWantReadError, SSLWantWriteError): pass except (SSLZeroReturnError, ConnectionError, TimeoutError, SSLEOFError): - log.warning('SSL connection closed by server during handshake.') + log.warning('%s: SSL connection closed by server during handshake.', self) self.close(Errors.KafkaConnectionError('SSL connection closed by server during handshake')) # Other SSLErrors will be raised to user @@ -611,7 +611,7 @@ def _handle_api_versions_response(self, future, response): for api_key, min_version, max_version, *rest in response.api_versions ]) self._api_version = self._infer_broker_version_from_api_versions(self._api_versions) - log.info('Broker version identified as %s', '.'.join(map(str, self._api_version))) + log.info('%s: Broker version identified as %s', self, '.'.join(map(str, self._api_version))) future.success(self._api_version) self.connect() @@ -621,7 +621,7 @@ def _handle_api_versions_failure(self, future, ex): # after failure connection is closed, so state should already be DISCONNECTED def _handle_check_version_response(self, future, version, _response): - log.info('Broker version identified as %s', '.'.join(map(str, version))) + log.info('%s: Broker version identified as %s', self, '.'.join(map(str, version))) log.info('Set configuration api_version=%s to skip auto' ' check_version requests on startup', version) self._api_versions = BROKER_API_VERSIONS[version] @@ -751,7 +751,7 @@ def _send_sasl_authenticate(self, sasl_auth_bytes): request = SaslAuthenticateRequest[0](sasl_auth_bytes) self._send(request, blocking=True) else: - log.debug('Sending %d raw sasl auth bytes to server', len(sasl_auth_bytes)) + log.debug('%s: Sending %d raw sasl auth bytes to server', self, len(sasl_auth_bytes)) try: self._send_bytes_blocking(Int32.encode(len(sasl_auth_bytes)) + sasl_auth_bytes) except (ConnectionError, TimeoutError) as e: @@ -781,7 +781,7 @@ def _recv_sasl_authenticate(self): latency_ms = (time.time() - timestamp) * 1000 if self._sensors: self._sensors.request_time.record(latency_ms) - log.debug('%s Response %d (%s ms): %s', self, correlation_id, latency_ms, response) + log.debug('%s: Response %d (%s ms): %s', self, correlation_id, latency_ms, response) error_type = Errors.for_code(response.error_code) if error_type is not Errors.NoError: @@ -792,7 +792,7 @@ def _recv_sasl_authenticate(self): return response.auth_bytes else: # unframed bytes w/ SaslHandhake v0 - log.debug('Received %d raw sasl auth bytes from server', nbytes) + log.debug('%s: Received %d raw sasl auth bytes from server', self, nbytes) return data[4:] def _sasl_authenticate(self, future): @@ -956,7 +956,8 @@ def close(self, error=None): # drop lock before state change callback and processing futures self.config['state_change_callback'](self.node_id, sock, self) - sock.close() + if sock: + sock.close() for (_correlation_id, (future, _timestamp, _timeout)) in ifrs: future.failure(error) @@ -1002,7 +1003,7 @@ def _send(self, request, blocking=True, request_timeout_ms=None): correlation_id = self._protocol.send_request(request) - log.debug('%s Request %d (timeout_ms %s): %s', self, correlation_id, request_timeout_ms, request) + log.debug('%s: Request %d (timeout_ms %s): %s', self, correlation_id, request_timeout_ms, request) if request.expect_response(): assert correlation_id not in self.in_flight_requests, 'Correlation ID already in-flight!' sent_time = time.time() @@ -1036,7 +1037,7 @@ def send_pending_requests(self): return True except (ConnectionError, TimeoutError) as e: - log.exception("Error sending request data to %s", self) + log.exception("%s: Error sending request data", self) error = Errors.KafkaConnectionError("%s: %s" % (self, e)) self.close(error=error) return False @@ -1069,7 +1070,7 @@ def send_pending_requests_v2(self): return len(self._send_buffer) == 0 except (ConnectionError, TimeoutError, Exception) as e: - log.exception("Error sending request data to %s", self) + log.exception("%s: Error sending request data", self) error = Errors.KafkaConnectionError("%s: %s" % (self, e)) self.close(error=error) return False @@ -1106,7 +1107,7 @@ def recv(self): if not responses and self.requests_timed_out(): timed_out = self.timed_out_ifrs() timeout_ms = (timed_out[0][2] - timed_out[0][1]) * 1000 - log.warning('%s timed out after %s ms. Closing connection.', + log.warning('%s: timed out after %s ms. Closing connection.', self, timeout_ms) self.close(error=Errors.RequestTimedOutError( 'Request timed out after %s ms' % @@ -1125,7 +1126,7 @@ def recv(self): if self._sensors: self._sensors.request_time.record(latency_ms) - log.debug('%s Response %d (%s ms): %s', self, correlation_id, latency_ms, response) + log.debug('%s: Response %d (%s ms): %s', self, correlation_id, latency_ms, response) self._maybe_throttle(response) responses[i] = (response, future) @@ -1137,7 +1138,7 @@ def _recv(self): err = None with self._lock: if not self._can_send_recv(): - log.warning('%s cannot recv: socket not connected', self) + log.warning('%s: cannot recv: socket not connected', self) return () while len(recvd) < self.config['sock_chunk_buffer_count']: