1
- from __future__ import absolute_import , division
2
-
3
1
import collections
4
2
import copy
5
3
import logging
32
30
from kafka .vendor import socketpair
33
31
from kafka .version import __version__
34
32
35
- if six .PY2 :
36
- ConnectionError = None
37
-
38
-
39
33
log = logging .getLogger ('kafka.client' )
40
34
41
35
42
- class KafkaClient ( object ) :
36
+ class KafkaClient :
43
37
"""
44
38
A network client for asynchronous request/response network I/O.
45
39
@@ -374,7 +368,7 @@ def _maybe_connect(self, node_id):
374
368
375
369
if conn is None :
376
370
broker = self .cluster .broker_metadata (node_id )
377
- assert broker , 'Broker id %s not in current metadata' % (node_id , )
371
+ assert broker , 'Broker id {} not in current metadata' . format (node_id )
378
372
379
373
log .debug ("Initiating connection to node %s at %s:%s" ,
380
374
node_id , broker .host , broker .port )
@@ -686,7 +680,7 @@ def _poll(self, timeout):
686
680
unexpected_data = key .fileobj .recv (1 )
687
681
if unexpected_data : # anything other than a 0-byte read means protocol issues
688
682
log .warning ('Protocol out of sync on %r, closing' , conn )
689
- except socket . error :
683
+ except OSError :
690
684
pass
691
685
conn .close (Errors .KafkaConnectionError ('Socket EVENT_READ without in-flight-requests' ))
692
686
continue
@@ -701,7 +695,7 @@ def _poll(self, timeout):
701
695
if conn not in processed and conn .connected () and conn ._sock .pending ():
702
696
self ._pending_completion .extend (conn .recv ())
703
697
704
- for conn in six . itervalues ( self ._conns ):
698
+ for conn in self ._conns . values ( ):
705
699
if conn .requests_timed_out ():
706
700
log .warning ('%s timed out after %s ms. Closing connection.' ,
707
701
conn , conn .config ['request_timeout_ms' ])
@@ -941,7 +935,7 @@ def wakeup(self):
941
935
except socket .timeout :
942
936
log .warning ('Timeout to send to wakeup socket!' )
943
937
raise Errors .KafkaTimeoutError ()
944
- except socket . error as e :
938
+ except OSError as e :
945
939
log .warning ('Unable to send to wakeup socket!' )
946
940
if self ._raise_upon_socket_err_during_wakeup :
947
941
raise e
@@ -951,7 +945,7 @@ def _clear_wake_fd(self):
951
945
while True :
952
946
try :
953
947
self ._wake_r .recv (1024 )
954
- except socket . error :
948
+ except OSError :
955
949
break
956
950
957
951
def _maybe_close_oldest_connection (self ):
@@ -981,7 +975,7 @@ def bootstrap_connected(self):
981
975
OrderedDict = dict
982
976
983
977
984
- class IdleConnectionManager ( object ) :
978
+ class IdleConnectionManager :
985
979
def __init__ (self , connections_max_idle_ms ):
986
980
if connections_max_idle_ms > 0 :
987
981
self .connections_max_idle = connections_max_idle_ms / 1000
@@ -1043,7 +1037,7 @@ def poll_expired_connection(self):
1043
1037
return None
1044
1038
1045
1039
1046
- class KafkaClientMetrics ( object ) :
1040
+ class KafkaClientMetrics :
1047
1041
def __init__ (self , metrics , metric_group_prefix , conns ):
1048
1042
self .metrics = metrics
1049
1043
self .metric_group_name = metric_group_prefix + '-metrics'
0 commit comments