Skip to content

Commit 57f1782

Browse files
authored
Fix lint issues via ruff check (#2522)
1 parent 02dd98f commit 57f1782

21 files changed

+44
-47
lines changed

kafka/client_async.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
from kafka.vendor import six
2020

2121
from kafka.cluster import ClusterMetadata
22-
from kafka.conn import BrokerConnection, ConnectionStates, collect_hosts, get_ip_port_afi
22+
from kafka.conn import BrokerConnection, ConnectionStates, get_ip_port_afi
2323
from kafka import errors as Errors
2424
from kafka.future import Future
2525
from kafka.metrics import AnonMeasurable

kafka/conn.py

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@
1414
from kafka.vendor import selectors34 as selectors
1515

1616
import socket
17-
import struct
1817
import threading
1918
import time
2019

@@ -23,7 +22,6 @@
2322
import kafka.errors as Errors
2423
from kafka.future import Future
2524
from kafka.metrics.stats import Avg, Count, Max, Rate
26-
from kafka.oauth.abstract import AbstractTokenProvider
2725
from kafka.protocol.admin import DescribeAclsRequest, DescribeClientQuotasRequest, ListGroupsRequest
2826
from kafka.protocol.api_versions import ApiVersionsRequest
2927
from kafka.protocol.broker_api_versions import BROKER_API_VERSIONS
@@ -36,7 +34,7 @@
3634
from kafka.protocol.produce import ProduceRequest
3735
from kafka.protocol.sasl_authenticate import SaslAuthenticateRequest
3836
from kafka.protocol.sasl_handshake import SaslHandshakeRequest
39-
from kafka.protocol.types import Int32, Int8
37+
from kafka.protocol.types import Int32
4038
from kafka.sasl import get_sasl_mechanism
4139
from kafka.version import __version__
4240

@@ -1151,19 +1149,20 @@ def timed_out_ifrs(self):
11511149
def next_ifr_request_timeout_ms(self):
11521150
with self._lock:
11531151
if self.in_flight_requests:
1154-
get_timeout = lambda v: v[2]
1152+
def get_timeout(v):
1153+
return v[2]
11551154
next_timeout = min(map(get_timeout,
11561155
self.in_flight_requests.values()))
11571156
return max(0, (next_timeout - time.time()) * 1000)
11581157
else:
11591158
return float('inf')
11601159

11611160
def get_api_versions(self):
1162-
if self._api_versions is not None:
1163-
return self._api_versions
1164-
1165-
version = self.check_version()
1166-
# _api_versions is set as a side effect of check_versions()
1161+
# _api_versions is set as a side effect of first connection
1162+
# which should typically be bootstrap, but call check_version
1163+
# if that hasn't happened yet
1164+
if self._api_versions is None:
1165+
self.check_version()
11671166
return self._api_versions
11681167

11691168
def _infer_broker_version_from_api_versions(self, api_versions):
@@ -1201,11 +1200,11 @@ def _infer_broker_version_from_api_versions(self, api_versions):
12011200
]
12021201

12031202
# Get the best match of test cases
1204-
for broker_version, struct in sorted(test_cases, reverse=True):
1205-
if struct.API_KEY not in api_versions:
1203+
for broker_version, proto_struct in sorted(test_cases, reverse=True):
1204+
if proto_struct.API_KEY not in api_versions:
12061205
continue
1207-
min_version, max_version = api_versions[struct.API_KEY]
1208-
if min_version <= struct.API_VERSION <= max_version:
1206+
min_version, max_version = api_versions[proto_struct.API_KEY]
1207+
if min_version <= proto_struct.API_VERSION <= max_version:
12091208
return broker_version
12101209

12111210
# We know that ApiVersionsResponse is only supported in 0.10+

kafka/consumer/fetcher.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -501,7 +501,7 @@ def _unpack_records(self, tp, records):
501501
# If unpacking raises StopIteration, it is erroneously
502502
# caught by the generator. We want all exceptions to be raised
503503
# back to the user. See Issue 545
504-
except StopIteration as e:
504+
except StopIteration:
505505
log.exception('StopIteration raised unpacking messageset')
506506
raise RuntimeError('StopIteration raised unpacking messageset')
507507

@@ -1001,7 +1001,7 @@ def build_next(self, next_partitions):
10011001
log.debug("Built full fetch %s for node %s with %s partition(s).",
10021002
self.next_metadata, self.node_id, len(next_partitions))
10031003
self.session_partitions = next_partitions
1004-
return FetchRequestData(next_partitions, None, self.next_metadata);
1004+
return FetchRequestData(next_partitions, None, self.next_metadata)
10051005

10061006
prev_tps = set(self.session_partitions.keys())
10071007
next_tps = set(next_partitions.keys())

kafka/coordinator/assignors/sticky/sticky_assignor.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
from collections import defaultdict, namedtuple
33
from copy import deepcopy
44

5-
from kafka.cluster import ClusterMetadata
65
from kafka.coordinator.assignors.abstract import AbstractPartitionAssignor
76
from kafka.coordinator.assignors.sticky.partition_movements import PartitionMovements
87
from kafka.coordinator.assignors.sticky.sorted_set import SortedSet

kafka/coordinator/consumer.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -796,7 +796,7 @@ def _handle_offset_fetch_response(self, future, response):
796796
elif error_type is Errors.GroupAuthorizationFailedError:
797797
future.failure(error)
798798
else:
799-
log.error("Unknown error fetching offsets for %s: %s", tp, error)
799+
log.error("Unknown error fetching offsets: %s", error)
800800
future.failure(error)
801801
return
802802

kafka/metrics/metric_name.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ def __eq__(self, other):
9393
return True
9494
if other is None:
9595
return False
96-
return (type(self) == type(other) and
96+
return (isinstance(self, type(other)) and
9797
self.group == other.group and
9898
self.name == other.name and
9999
self.tags == other.tags)

kafka/metrics/quota.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ def __hash__(self):
3434
def __eq__(self, other):
3535
if self is other:
3636
return True
37-
return (type(self) == type(other) and
37+
return (isinstance(self, type(other)) and
3838
self.bound == other.bound and
3939
self.is_upper_bound() == other.is_upper_bound())
4040

kafka/producer/kafka.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -612,8 +612,8 @@ def send(self, topic, value=None, key=None, headers=None, partition=None, timest
612612

613613
if headers is None:
614614
headers = []
615-
assert type(headers) == list
616-
assert all(type(item) == tuple and len(item) == 2 and type(item[0]) == str and type(item[1]) == bytes for item in headers)
615+
assert isinstance(headers, list)
616+
assert all(isinstance(item, tuple) and len(item) == 2 and isinstance(item[0], str) and isinstance(item[1], bytes) for item in headers)
617617

618618
message_size = self._estimate_size_in_bytes(key_bytes, value_bytes, headers)
619619
self._ensure_valid_record_size(message_size)

kafka/protocol/api_versions.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,7 @@
11
from __future__ import absolute_import
22

3+
from io import BytesIO
4+
35
from kafka.protocol.api import Request, Response
46
from kafka.protocol.types import Array, Int16, Int32, Schema
57

kafka/protocol/commit.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
from __future__ import absolute_import
22

33
from kafka.protocol.api import Request, Response
4-
from kafka.protocol.types import Array, Int8, Int16, Int32, Int64, Schema, String
4+
from kafka.protocol.types import Array, Int16, Int32, Int64, Schema, String
55

66

77
class OffsetCommitResponse_v0(Response):

0 commit comments

Comments (0)