diff --git a/kafka/client_async.py b/kafka/client_async.py index bd34c3b2d..b72c05dac 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -19,7 +19,7 @@ from kafka.vendor import six from kafka.cluster import ClusterMetadata -from kafka.conn import BrokerConnection, ConnectionStates, collect_hosts, get_ip_port_afi +from kafka.conn import BrokerConnection, ConnectionStates, get_ip_port_afi from kafka import errors as Errors from kafka.future import Future from kafka.metrics import AnonMeasurable diff --git a/kafka/conn.py b/kafka/conn.py index 857b13a57..7287a4840 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -14,7 +14,6 @@ from kafka.vendor import selectors34 as selectors import socket -import struct import threading import time @@ -23,7 +22,6 @@ import kafka.errors as Errors from kafka.future import Future from kafka.metrics.stats import Avg, Count, Max, Rate -from kafka.oauth.abstract import AbstractTokenProvider from kafka.protocol.admin import DescribeAclsRequest, DescribeClientQuotasRequest, ListGroupsRequest from kafka.protocol.api_versions import ApiVersionsRequest from kafka.protocol.broker_api_versions import BROKER_API_VERSIONS @@ -36,7 +34,7 @@ from kafka.protocol.produce import ProduceRequest from kafka.protocol.sasl_authenticate import SaslAuthenticateRequest from kafka.protocol.sasl_handshake import SaslHandshakeRequest -from kafka.protocol.types import Int32, Int8 +from kafka.protocol.types import Int32 from kafka.sasl import get_sasl_mechanism from kafka.version import __version__ @@ -1151,7 +1149,8 @@ def timed_out_ifrs(self): def next_ifr_request_timeout_ms(self): with self._lock: if self.in_flight_requests: - get_timeout = lambda v: v[2] + def get_timeout(v): + return v[2] next_timeout = min(map(get_timeout, self.in_flight_requests.values())) return max(0, (next_timeout - time.time()) * 1000) @@ -1159,11 +1158,11 @@ def next_ifr_request_timeout_ms(self): return float('inf') def get_api_versions(self): - if self._api_versions is not None: - return self._api_versions - - version = self.check_version() - # _api_versions is set as a side effect of check_versions() + # _api_versions is set as a side effect of first connection + # which should typically be bootstrap, but call check_version + # if that hasn't happened yet + if self._api_versions is None: + self.check_version() return self._api_versions def _infer_broker_version_from_api_versions(self, api_versions): @@ -1201,11 +1200,11 @@ def _infer_broker_version_from_api_versions(self, api_versions): ] # Get the best match of test cases - for broker_version, struct in sorted(test_cases, reverse=True): - if struct.API_KEY not in api_versions: + for broker_version, proto_struct in sorted(test_cases, reverse=True): + if proto_struct.API_KEY not in api_versions: continue - min_version, max_version = api_versions[struct.API_KEY] - if min_version <= struct.API_VERSION <= max_version: + min_version, max_version = api_versions[proto_struct.API_KEY] + if min_version <= proto_struct.API_VERSION <= max_version: return broker_version # We know that ApiVersionsResponse is only supported in 0.10+ diff --git a/kafka/consumer/fetcher.py b/kafka/consumer/fetcher.py index eefac5ba7..a833a5b79 100644 --- a/kafka/consumer/fetcher.py +++ b/kafka/consumer/fetcher.py @@ -501,7 +501,7 @@ def _unpack_records(self, tp, records): # If unpacking raises StopIteration, it is erroneously # caught by the generator. We want all exceptions to be raised # back to the user. 
See Issue 545 - except StopIteration as e: + except StopIteration: log.exception('StopIteration raised unpacking messageset') raise RuntimeError('StopIteration raised unpacking messageset') @@ -1001,7 +1001,7 @@ def build_next(self, next_partitions): log.debug("Built full fetch %s for node %s with %s partition(s).", self.next_metadata, self.node_id, len(next_partitions)) self.session_partitions = next_partitions - return FetchRequestData(next_partitions, None, self.next_metadata); + return FetchRequestData(next_partitions, None, self.next_metadata) prev_tps = set(self.session_partitions.keys()) next_tps = set(next_partitions.keys()) diff --git a/kafka/coordinator/assignors/sticky/sticky_assignor.py b/kafka/coordinator/assignors/sticky/sticky_assignor.py index dce714f1a..6e79c597e 100644 --- a/kafka/coordinator/assignors/sticky/sticky_assignor.py +++ b/kafka/coordinator/assignors/sticky/sticky_assignor.py @@ -2,7 +2,6 @@ from collections import defaultdict, namedtuple from copy import deepcopy -from kafka.cluster import ClusterMetadata from kafka.coordinator.assignors.abstract import AbstractPartitionAssignor from kafka.coordinator.assignors.sticky.partition_movements import PartitionMovements from kafka.coordinator.assignors.sticky.sorted_set import SortedSet diff --git a/kafka/coordinator/consumer.py b/kafka/coordinator/consumer.py index 36c91ee42..9c662ce7f 100644 --- a/kafka/coordinator/consumer.py +++ b/kafka/coordinator/consumer.py @@ -796,7 +796,7 @@ def _handle_offset_fetch_response(self, future, response): elif error_type is Errors.GroupAuthorizationFailedError: future.failure(error) else: - log.error("Unknown error fetching offsets for %s: %s", tp, error) + log.error("Unknown error fetching offsets: %s", error) future.failure(error) return diff --git a/kafka/metrics/metric_name.py b/kafka/metrics/metric_name.py index b5acd1662..32a7e3a4b 100644 --- a/kafka/metrics/metric_name.py +++ b/kafka/metrics/metric_name.py @@ -93,7 +93,7 @@ def __eq__(self, other): return True if other is None: return False - return (type(self) == type(other) and + return (isinstance(self, type(other)) and self.group == other.group and self.name == other.name and self.tags == other.tags) diff --git a/kafka/metrics/quota.py b/kafka/metrics/quota.py index 4d1b0d6cb..237edf841 100644 --- a/kafka/metrics/quota.py +++ b/kafka/metrics/quota.py @@ -34,7 +34,7 @@ def __hash__(self): def __eq__(self, other): if self is other: return True - return (type(self) == type(other) and + return (isinstance(self, type(other)) and self.bound == other.bound and self.is_upper_bound() == other.is_upper_bound()) diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index 668387aac..2a70700c4 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -612,8 +612,8 @@ def send(self, topic, value=None, key=None, headers=None, partition=None, timest if headers is None: headers = [] - assert type(headers) == list - assert all(type(item) == tuple and len(item) == 2 and type(item[0]) == str and type(item[1]) == bytes for item in headers) + assert isinstance(headers, list) + assert all(isinstance(item, tuple) and len(item) == 2 and isinstance(item[0], str) and isinstance(item[1], bytes) for item in headers) message_size = self._estimate_size_in_bytes(key_bytes, value_bytes, headers) self._ensure_valid_record_size(message_size) diff --git a/kafka/protocol/api_versions.py b/kafka/protocol/api_versions.py index dc0aa588e..7e2e61251 100644 --- a/kafka/protocol/api_versions.py +++ b/kafka/protocol/api_versions.py @@ -1,5 
+1,7 @@ from __future__ import absolute_import +from io import BytesIO + from kafka.protocol.api import Request, Response from kafka.protocol.types import Array, Int16, Int32, Schema diff --git a/kafka/protocol/commit.py b/kafka/protocol/commit.py index 53c2466fe..a0439e7ef 100644 --- a/kafka/protocol/commit.py +++ b/kafka/protocol/commit.py @@ -1,7 +1,7 @@ from __future__ import absolute_import from kafka.protocol.api import Request, Response -from kafka.protocol.types import Array, Int8, Int16, Int32, Int64, Schema, String +from kafka.protocol.types import Array, Int16, Int32, Int64, Schema, String class OffsetCommitResponse_v0(Response): diff --git a/kafka/protocol/find_coordinator.py b/kafka/protocol/find_coordinator.py index a68a23902..be5b45ded 100644 --- a/kafka/protocol/find_coordinator.py +++ b/kafka/protocol/find_coordinator.py @@ -1,7 +1,7 @@ from __future__ import absolute_import from kafka.protocol.api import Request, Response -from kafka.protocol.types import Array, Int8, Int16, Int32, Int64, Schema, String +from kafka.protocol.types import Int8, Int16, Int32, Schema, String class FindCoordinatorResponse_v0(Response): diff --git a/kafka/protocol/offset_for_leader_epoch.py b/kafka/protocol/offset_for_leader_epoch.py index afe8284eb..8465588a3 100644 --- a/kafka/protocol/offset_for_leader_epoch.py +++ b/kafka/protocol/offset_for_leader_epoch.py @@ -4,7 +4,7 @@ from kafka.protocol.types import Array, CompactArray, CompactString, Int16, Int32, Int64, Schema, String, TaggedFields -class OffsetForLeaderEpochResponse_v0(Request): +class OffsetForLeaderEpochResponse_v0(Response): API_KEY = 23 API_VERSION = 0 SCHEMA = Schema( @@ -16,7 +16,7 @@ class OffsetForLeaderEpochResponse_v0(Request): ('end_offset', Int64)))))) -class OffsetForLeaderEpochResponse_v1(Request): +class OffsetForLeaderEpochResponse_v1(Response): API_KEY = 23 API_VERSION = 1 SCHEMA = Schema( @@ -29,7 +29,7 @@ class OffsetForLeaderEpochResponse_v1(Request): ('end_offset', Int64)))))) -class OffsetForLeaderEpochResponse_v2(Request): +class OffsetForLeaderEpochResponse_v2(Response): API_KEY = 23 API_VERSION = 2 SCHEMA = Schema( @@ -43,13 +43,13 @@ class OffsetForLeaderEpochResponse_v2(Request): ('end_offset', Int64)))))) -class OffsetForLeaderEpochResponse_v3(Request): +class OffsetForLeaderEpochResponse_v3(Response): API_KEY = 23 API_VERSION = 3 SCHEMA = OffsetForLeaderEpochResponse_v2.SCHEMA -class OffsetForLeaderEpochResponse_v4(Request): +class OffsetForLeaderEpochResponse_v4(Response): API_KEY = 23 API_VERSION = 4 SCHEMA = Schema( diff --git a/kafka/protocol/sasl_authenticate.py b/kafka/protocol/sasl_authenticate.py index 528bb3cc6..a2b9b1988 100644 --- a/kafka/protocol/sasl_authenticate.py +++ b/kafka/protocol/sasl_authenticate.py @@ -1,7 +1,7 @@ from __future__ import absolute_import from kafka.protocol.api import Request, Response -from kafka.protocol.types import Array, Bytes, Int16, Int64, Schema, String +from kafka.protocol.types import Bytes, Int16, Int64, Schema, String class SaslAuthenticateResponse_v0(Response): diff --git a/kafka/sasl/gssapi.py b/kafka/sasl/gssapi.py index 1be3de4a4..60b658c77 100644 --- a/kafka/sasl/gssapi.py +++ b/kafka/sasl/gssapi.py @@ -1,5 +1,7 @@ from __future__ import absolute_import +import struct + # needed for SASL_GSSAPI authentication: try: import gssapi @@ -66,7 +68,7 @@ def receive(self, auth_bytes): # simply set QoP to 'auth' only (first octet). 
We reuse the max message size proposed # by the server message_parts = [ - Int8.encode(self.SASL_QOP_AUTH & Int8.decode(io.BytesIO(msg[0:1]))), + struct.pack('>b', self.SASL_QOP_AUTH & struct.unpack('>b', msg[0:1])[0]), msg[:1], self.auth_id.encode(), ] diff --git a/test/test_admin.py b/test/test_admin.py index 279f85abf..cdb74242e 100644 --- a/test/test_admin.py +++ b/test/test_admin.py @@ -6,7 +6,7 @@ def test_config_resource(): with pytest.raises(KeyError): - bad_resource = kafka.admin.ConfigResource('something', 'foo') + _bad_resource = kafka.admin.ConfigResource('something', 'foo') good_resource = kafka.admin.ConfigResource('broker', 'bar') assert good_resource.resource_type == kafka.admin.ConfigResourceType.BROKER assert good_resource.name == 'bar' @@ -59,11 +59,11 @@ def test_acl_resource(): def test_new_topic(): with pytest.raises(IllegalArgumentError): - bad_topic = kafka.admin.NewTopic('foo', -1, -1) + _bad_topic = kafka.admin.NewTopic('foo', -1, -1) with pytest.raises(IllegalArgumentError): - bad_topic = kafka.admin.NewTopic('foo', 1, -1) + _bad_topic = kafka.admin.NewTopic('foo', 1, -1) with pytest.raises(IllegalArgumentError): - bad_topic = kafka.admin.NewTopic('foo', 1, 1, {1: [1, 1, 1]}) + _bad_topic = kafka.admin.NewTopic('foo', 1, 1, {1: [1, 1, 1]}) good_topic = kafka.admin.NewTopic('foo', 1, 2) assert good_topic.name == 'foo' assert good_topic.num_partitions == 1 diff --git a/test/test_admin_integration.py b/test/test_admin_integration.py index bd2fd216e..2f6b76598 100644 --- a/test/test_admin_integration.py +++ b/test/test_admin_integration.py @@ -140,7 +140,7 @@ def test_describe_configs_invalid_broker_id_raises(kafka_admin_client): broker_id = "str" with pytest.raises(ValueError): - configs = kafka_admin_client.describe_configs([ConfigResource(ConfigResourceType.BROKER, broker_id)]) + kafka_admin_client.describe_configs([ConfigResource(ConfigResourceType.BROKER, broker_id)]) @pytest.mark.skipif(env_kafka_version() < (0, 11), reason='Describe consumer group requires broker >=0.11') @@ -148,7 +148,7 @@ def test_describe_consumer_group_does_not_exist(kafka_admin_client): """Tests that the describe consumer group call fails if the group coordinator is not available """ with pytest.raises(GroupCoordinatorNotAvailableError): - group_description = kafka_admin_client.describe_consumer_groups(['test']) + kafka_admin_client.describe_consumer_groups(['test']) @pytest.mark.skipif(env_kafka_version() < (0, 11), reason='Describe consumer group requires broker >=0.11') diff --git a/test/test_cluster.py b/test/test_cluster.py index b55bdc5ad..f0a2f83d6 100644 --- a/test/test_cluster.py +++ b/test/test_cluster.py @@ -1,8 +1,6 @@ # pylint: skip-file from __future__ import absolute_import -import pytest - from kafka.cluster import ClusterMetadata from kafka.protocol.metadata import MetadataResponse diff --git a/test/test_fetcher.py b/test/test_fetcher.py index 7e948e3cb..479f6e22b 100644 --- a/test/test_fetcher.py +++ b/test/test_fetcher.py @@ -10,7 +10,7 @@ from kafka.client_async import KafkaClient from kafka.consumer.fetcher import ( - CompletedFetch, ConsumerRecord, Fetcher, NoOffsetForPartitionError + CompletedFetch, ConsumerRecord, Fetcher ) from kafka.consumer.subscription_state import SubscriptionState import kafka.errors as Errors diff --git a/test/test_object_conversion.py b/test/test_object_conversion.py index 9b1ff2131..a48eb0601 100644 --- a/test/test_object_conversion.py +++ b/test/test_object_conversion.py @@ -207,7 +207,7 @@ def test_with_metadata_response(): assert
len(obj['topics']) == 2 assert obj['topics'][0]['error_code'] == 0 assert obj['topics'][0]['topic'] == 'testtopic1' - assert obj['topics'][0]['is_internal'] == False + assert obj['topics'][0]['is_internal'] is False assert len(obj['topics'][0]['partitions']) == 2 assert obj['topics'][0]['partitions'][0]['error_code'] == 0 assert obj['topics'][0]['partitions'][0]['partition'] == 0 @@ -224,7 +224,7 @@ def test_with_metadata_response(): assert obj['topics'][1]['error_code'] == 0 assert obj['topics'][1]['topic'] == 'other-test-topic' - assert obj['topics'][1]['is_internal'] == True + assert obj['topics'][1]['is_internal'] is True assert len(obj['topics'][1]['partitions']) == 1 assert obj['topics'][1]['partitions'][0]['error_code'] == 0 assert obj['topics'][1]['partitions'][0]['partition'] == 0 diff --git a/test/test_protocol.py b/test/test_protocol.py index 6f94c74e1..d0cc7ed0a 100644 --- a/test/test_protocol.py +++ b/test/test_protocol.py @@ -2,8 +2,6 @@ import io import struct -import pytest - from kafka.protocol.api import RequestHeader from kafka.protocol.fetch import FetchRequest, FetchResponse from kafka.protocol.find_coordinator import FindCoordinatorRequest @@ -273,7 +271,7 @@ def test_decode_fetch_response_partial(): def test_struct_unrecognized_kwargs(): try: - mr = MetadataRequest[0](topicz='foo') + _mr = MetadataRequest[0](topicz='foo') assert False, 'Structs should not allow unrecognized kwargs' except ValueError: pass @@ -331,6 +329,6 @@ def test_compact_data_structs(): assert CompactBytes.decode(io.BytesIO(b'\x00')) is None enc = CompactBytes.encode(b'') assert enc == b'\x01' - assert CompactBytes.decode(io.BytesIO(b'\x01')) is b'' + assert CompactBytes.decode(io.BytesIO(b'\x01')) == b'' enc = CompactBytes.encode(b'foo') assert CompactBytes.decode(io.BytesIO(enc)) == b'foo' diff --git a/test/test_sender.py b/test/test_sender.py index 83a26cd39..3da1a9f42 100644 --- a/test/test_sender.py +++ b/test/test_sender.py @@ -5,7 +5,6 @@ import io from kafka.client_async import KafkaClient -from kafka.cluster import ClusterMetadata from kafka.metrics import Metrics from kafka.protocol.broker_api_versions import BROKER_API_VERSIONS from kafka.protocol.produce import ProduceRequest
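
Note on the fetcher hunk: only the unused `as e` binding is dropped; the handler itself exists because a StopIteration raised while unpacking records would otherwise be swallowed by the enclosing generator, ending iteration silently instead of surfacing the error (per the Issue 545 reference in the comment). A minimal sketch of that defensive pattern, using hypothetical names rather than the fetcher's internals:

    def unpack_all(parse, raw_items):
        # Guard a generator against a stray StopIteration from parse():
        # convert it into a RuntimeError so the failure reaches the caller
        # instead of silently terminating iteration.
        for raw in raw_items:
            try:
                yield parse(raw)
            except StopIteration:
                raise RuntimeError('StopIteration raised while unpacking items')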
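
The producer assertions keep the same contract while moving to isinstance(): headers must be a list of (str, bytes) two-tuples. A send call satisfying those checks might look like the following sketch (the broker address and topic name are placeholders):

    from kafka import KafkaProducer

    producer = KafkaProducer(bootstrap_servers='localhost:9092')  # placeholder address
    producer.send(
        'example-topic',                    # placeholder topic
        key=b'record-key',
        value=b'payload',
        headers=[('trace-id', b'abc123')],  # list of (str, bytes) tuples
    )
    producer.flush()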
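
On the SASL/GSSAPI hunk: struct.unpack() always returns a tuple, even for a single '>b' field, so the unpacked QoP octet has to be indexed out before it can be masked and re-packed as one signed byte. For reference:

    import struct

    first_octet = b'\x07'                        # QoP flags proposed by the server
    (flags,) = struct.unpack('>b', first_octet)  # unpack yields a 1-tuple
    print(struct.pack('>b', 0x01 & flags))       # b'\x01' -> QoP 'auth' only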