From ae60892d2c3a7c339114714742394202a00a6c10 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Fri, 14 Feb 2025 09:03:31 -0800 Subject: [PATCH 1/9] conn: no connection delays between dns entries; merge blacked_out w/ connection_delay --- kafka/conn.py | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/kafka/conn.py b/kafka/conn.py index 4fd8bc759..3b32d4e6d 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -848,8 +848,7 @@ def blacked_out(self): re-establish a connection yet """ if self.state is ConnectionStates.DISCONNECTED: - if time.time() < self.last_attempt + self._reconnect_backoff: - return True + return self.connection_delay() > 0 return False def connection_delay(self): @@ -859,9 +858,12 @@ def connection_delay(self): the reconnect backoff time. When connecting or connected, returns a very large number to handle slow/stalled connections. """ - time_waited = time.time() - (self.last_attempt or 0) if self.state is ConnectionStates.DISCONNECTED: - return max(self._reconnect_backoff - time_waited, 0) * 1000 + if len(self._gai) > 0: + return 0 + else: + time_waited = time.time() - self.last_attempt + return max(self._reconnect_backoff - time_waited, 0) * 1000 else: # When connecting or connected, we should be able to delay # indefinitely since other events (connection or data acked) will From b0655fcafeb7c8a3b895bf80fec0969950ed10da Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Fri, 14 Feb 2025 09:05:19 -0800 Subject: [PATCH 2/9] conn: next_ifr_request_timeout_ms() returns delay until next req timeout --- kafka/client_async.py | 10 +++++++++- kafka/conn.py | 11 +++++++---- test/conftest.py | 1 + test/test_client_async.py | 9 ++++++--- 4 files changed, 23 insertions(+), 8 deletions(-) diff --git a/kafka/client_async.py b/kafka/client_async.py index 58f22d4ec..c74fa4e6d 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -589,11 +589,13 @@ def poll(self, timeout_ms=None, future=None): timeout = 0 else: idle_connection_timeout_ms = self._idle_expiry_manager.next_check_ms() + request_timeout_ms = self._next_ifr_request_timeout_ms() + log.debug("Timeouts: user %f, metadata %f, idle connection %f, request %f", timeout_ms, metadata_timeout_ms, idle_connection_timeout_ms, request_timeout_ms) timeout = min( timeout_ms, metadata_timeout_ms, idle_connection_timeout_ms, - self.config['request_timeout_ms']) + request_timeout_ms) # if there are no requests in flight, do not block longer than the retry backoff if self.in_flight_request_count() == 0: timeout = min(timeout, self.config['retry_backoff_ms']) @@ -803,6 +805,12 @@ def add_topic(self, topic): self._topics.add(topic) return self.cluster.request_update() + def _next_ifr_request_timeout_ms(self): + if self._conns: + return min([conn.next_ifr_request_timeout_ms() for conn in six.itervalues(self._conns)]) + else: + return float('inf') + # This method should be locked when running multi-threaded def _maybe_refresh_metadata(self, wakeup=False): """Send a metadata request if needed. diff --git a/kafka/conn.py b/kafka/conn.py index 3b32d4e6d..f5c092dab 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -1138,15 +1138,18 @@ def _recv(self): return () def requests_timed_out(self): + return self.next_ifr_request_timeout_ms() == 0 + + def next_ifr_request_timeout_ms(self): with self._lock: if self.in_flight_requests: get_timestamp = lambda v: v[1] oldest_at = min(map(get_timestamp, self.in_flight_requests.values())) - timeout = self.config['request_timeout_ms'] / 1000.0 - if time.time() >= oldest_at + timeout: - return True - return False + next_timeout = oldest_at + self.config['request_timeout_ms'] / 1000.0 + return max(0, (next_timeout - time.time()) * 1000) + else: + return float('inf') def _handle_api_version_response(self, response): error_type = Errors.for_code(response.error_code) diff --git a/test/conftest.py b/test/conftest.py index 3fa0262fd..0179c5620 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -138,6 +138,7 @@ def conn(mocker): [(0, 'foo', 12), (1, 'bar', 34)], # brokers [])) # topics conn.blacked_out.return_value = False + conn.next_ifr_request_timeout_ms.return_value = float('inf') def _set_conn_state(state): conn.state = state return state diff --git a/test/test_client_async.py b/test/test_client_async.py index 66b227aa9..9ebdde2a9 100644 --- a/test/test_client_async.py +++ b/test/test_client_async.py @@ -230,24 +230,27 @@ def test_send(cli, conn): def test_poll(mocker): metadata = mocker.patch.object(KafkaClient, '_maybe_refresh_metadata') + ifr_request_timeout = mocker.patch.object(KafkaClient, '_next_ifr_request_timeout_ms') _poll = mocker.patch.object(KafkaClient, '_poll') ifrs = mocker.patch.object(KafkaClient, 'in_flight_request_count') ifrs.return_value = 1 cli = KafkaClient(api_version=(0, 9)) # metadata timeout wins + ifr_request_timeout.return_value = float('inf') metadata.return_value = 1000 cli.poll() _poll.assert_called_with(1.0) # user timeout wins - cli.poll(250) + cli.poll(timeout_ms=250) _poll.assert_called_with(0.25) - # default is request_timeout_ms + # ifr request timeout wins + ifr_request_timeout.return_value = 30000 metadata.return_value = 1000000 cli.poll() - _poll.assert_called_with(cli.config['request_timeout_ms'] / 1000.0) + _poll.assert_called_with(30.0) # If no in-flight-requests, drop timeout to retry_backoff_ms ifrs.return_value = 0 From ecf6fb209ce0121a1c299133b4cc9c3c90c1d396 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Fri, 14 Feb 2025 09:12:10 -0800 Subject: [PATCH 3/9] Drop poll timeout reset when no in flight requests --- kafka/client_async.py | 3 --- test/test_client_async.py | 12 ------------ 2 files changed, 15 deletions(-) diff --git a/kafka/client_async.py b/kafka/client_async.py index c74fa4e6d..896e18ff1 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -596,9 +596,6 @@ def poll(self, timeout_ms=None, future=None): metadata_timeout_ms, idle_connection_timeout_ms, request_timeout_ms) - # if there are no requests in flight, do not block longer than the retry backoff - if self.in_flight_request_count() == 0: - timeout = min(timeout, self.config['retry_backoff_ms']) timeout = max(0, timeout) # avoid negative timeouts self._poll(timeout / 1000) diff --git a/test/test_client_async.py b/test/test_client_async.py index 9ebdde2a9..50c9158d3 100644 --- a/test/test_client_async.py +++ b/test/test_client_async.py @@ -232,8 +232,6 @@ def test_poll(mocker): metadata = mocker.patch.object(KafkaClient, '_maybe_refresh_metadata') ifr_request_timeout = mocker.patch.object(KafkaClient, '_next_ifr_request_timeout_ms') _poll = mocker.patch.object(KafkaClient, '_poll') - ifrs = mocker.patch.object(KafkaClient, 'in_flight_request_count') - ifrs.return_value = 1 cli = KafkaClient(api_version=(0, 9)) # metadata timeout wins @@ -252,11 +250,6 @@ def test_poll(mocker): cli.poll() _poll.assert_called_with(30.0) - # If no in-flight-requests, drop timeout to retry_backoff_ms - ifrs.return_value = 0 - cli.poll() - _poll.assert_called_with(cli.config['retry_backoff_ms'] / 1000.0) - def test__poll(): pass @@ -312,14 +305,12 @@ def client(mocker): def test_maybe_refresh_metadata_ttl(mocker, client): client.cluster.ttl.return_value = 1234 - mocker.patch.object(KafkaClient, 'in_flight_request_count', return_value=1) client.poll(timeout_ms=12345678) client._poll.assert_called_with(1.234) def test_maybe_refresh_metadata_backoff(mocker, client): - mocker.patch.object(KafkaClient, 'in_flight_request_count', return_value=1) now = time.time() t = mocker.patch('time.time') t.return_value = now @@ -330,7 +321,6 @@ def test_maybe_refresh_metadata_backoff(mocker, client): def test_maybe_refresh_metadata_in_progress(mocker, client): client._metadata_refresh_in_progress = True - mocker.patch.object(KafkaClient, 'in_flight_request_count', return_value=1) client.poll(timeout_ms=12345678) client._poll.assert_called_with(9999.999) # request_timeout_ms @@ -339,7 +329,6 @@ def test_maybe_refresh_metadata_in_progress(mocker, client): def test_maybe_refresh_metadata_update(mocker, client): mocker.patch.object(client, 'least_loaded_node', return_value='foobar') mocker.patch.object(client, '_can_send_request', return_value=True) - mocker.patch.object(KafkaClient, 'in_flight_request_count', return_value=1) send = mocker.patch.object(client, 'send') client.poll(timeout_ms=12345678) @@ -354,7 +343,6 @@ def test_maybe_refresh_metadata_cant_send(mocker, client): mocker.patch.object(client, '_can_connect', return_value=True) mocker.patch.object(client, '_maybe_connect', return_value=True) mocker.patch.object(client, 'maybe_connect', return_value=True) - mocker.patch.object(KafkaClient, 'in_flight_request_count', return_value=1) now = time.time() t = mocker.patch('time.time') From 9144c1ffd4fe08a9eeb7f6733b66f9c73fa5f542 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Fri, 14 Feb 2025 09:14:38 -0800 Subject: [PATCH 4/9] Do not mark conn as sending if future immediately resolves (error) --- kafka/client_async.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/kafka/client_async.py b/kafka/client_async.py index 896e18ff1..9883f8005 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -537,7 +537,8 @@ def send(self, node_id, request, wakeup=True): # we will need to call send_pending_requests() # to trigger network I/O future = conn.send(request, blocking=False) - self._sending.add(conn) + if not future.is_done: + self._sending.add(conn) # Wakeup signal is useful in case another thread is # blocked waiting for incoming network traffic while holding @@ -614,6 +615,8 @@ def poll(self, timeout_ms=None, future=None): def _register_send_sockets(self): while self._sending: conn = self._sending.pop() + if conn._sock is None: + continue try: key = self._selector.get_key(conn._sock) events = key.events | selectors.EVENT_WRITE From f01d20346eb78ec263c66c0e720a6b78446db1a7 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Fri, 14 Feb 2025 09:17:53 -0800 Subject: [PATCH 5/9] client poll: do not set 100ms timeout for unfinished futures --- kafka/client_async.py | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/kafka/client_async.py b/kafka/client_async.py index 9883f8005..545a61878 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -564,9 +564,7 @@ def poll(self, timeout_ms=None, future=None): Returns: list: responses received (can be empty) """ - if future is not None: - timeout_ms = 100 - elif timeout_ms is None: + if timeout_ms is None: timeout_ms = self.config['request_timeout_ms'] elif not isinstance(timeout_ms, (int, float)): raise TypeError('Invalid type for timeout: %s' % type(timeout_ms)) From 99658775012063d8f5ebded04cf1d2ebcbf49ab1 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Fri, 14 Feb 2025 09:21:59 -0800 Subject: [PATCH 6/9] Improve metadata refresh backoff/retry -- respect connection delays --- kafka/client_async.py | 28 ++++++++++++++++++++-------- test/conftest.py | 1 + test/test_client_async.py | 9 ++++++--- 3 files changed, 27 insertions(+), 11 deletions(-) diff --git a/kafka/client_async.py b/kafka/client_async.py index 545a61878..caeea0159 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -576,13 +576,13 @@ def poll(self, timeout_ms=None, future=None): if self._closed: break + # Send a metadata request if needed (or initiate new connection) + metadata_timeout_ms = self._maybe_refresh_metadata() + # Attempt to complete pending connections for node_id in list(self._connecting): self._maybe_connect(node_id) - # Send a metadata request if needed - metadata_timeout_ms = self._maybe_refresh_metadata() - # If we got a future that is already done, don't block in _poll if future is not None and future.is_done: timeout = 0 @@ -772,6 +772,17 @@ def least_loaded_node(self): return found + def least_loaded_node_refresh_ms(self): + """Return connection delay in milliseconds for next available node. + + This method is used primarily for retry/backoff during metadata refresh + during / after a cluster outage, in which there are no available nodes. + + Returns: + float: delay_ms + """ + return min([self.connection_delay(broker.nodeId) for broker in self.cluster.brokers()]) + def set_topics(self, topics): """Set specific topics to track for metadata. @@ -814,7 +825,7 @@ def _maybe_refresh_metadata(self, wakeup=False): """Send a metadata request if needed. Returns: - int: milliseconds until next refresh + float: milliseconds until next refresh """ ttl = self.cluster.ttl() wait_for_in_progress_ms = self.config['request_timeout_ms'] if self._metadata_refresh_in_progress else 0 @@ -828,8 +839,9 @@ def _maybe_refresh_metadata(self, wakeup=False): # least_loaded_node() node_id = self.least_loaded_node() if node_id is None: - log.debug("Give up sending metadata request since no node is available"); - return self.config['reconnect_backoff_ms'] + next_connect_ms = self.least_loaded_node_refresh_ms() + log.debug("Give up sending metadata request since no node is available. (reconnect delay %d ms)", next_connect_ms) + return next_connect_ms if self._can_send_request(node_id): topics = list(self._topics) @@ -856,11 +868,11 @@ def refresh_done(val_or_error): # the client from unnecessarily connecting to additional nodes while a previous connection # attempt has not been completed. if self._connecting: - return self.config['reconnect_backoff_ms'] + return float('inf') if self.maybe_connect(node_id, wakeup=wakeup): log.debug("Initializing connection to node %s for metadata request", node_id) - return self.config['reconnect_backoff_ms'] + return float('inf') # connected but can't send more, OR connecting # In either case we just need to wait for a network event diff --git a/test/conftest.py b/test/conftest.py index 0179c5620..d54a91243 100644 --- a/test/conftest.py +++ b/test/conftest.py @@ -137,6 +137,7 @@ def conn(mocker): MetadataResponse[0]( [(0, 'foo', 12), (1, 'bar', 34)], # brokers [])) # topics + conn.connection_delay.return_value = 0 conn.blacked_out.return_value = False conn.next_ifr_request_timeout_ms.return_value = float('inf') def _set_conn_state(state): diff --git a/test/test_client_async.py b/test/test_client_async.py index 50c9158d3..ec5e2c0ae 100644 --- a/test/test_client_async.py +++ b/test/test_client_async.py @@ -311,12 +311,14 @@ def test_maybe_refresh_metadata_ttl(mocker, client): def test_maybe_refresh_metadata_backoff(mocker, client): + mocker.patch.object(client, 'least_loaded_node', return_value=None) + mocker.patch.object(client, 'least_loaded_node_refresh_ms', return_value=4321) now = time.time() t = mocker.patch('time.time') t.return_value = now client.poll(timeout_ms=12345678) - client._poll.assert_called_with(2.222) # reconnect backoff + client._poll.assert_called_with(4.321) def test_maybe_refresh_metadata_in_progress(mocker, client): @@ -340,6 +342,7 @@ def test_maybe_refresh_metadata_update(mocker, client): def test_maybe_refresh_metadata_cant_send(mocker, client): mocker.patch.object(client, 'least_loaded_node', return_value='foobar') + mocker.patch.object(client, '_can_send_request', return_value=False) mocker.patch.object(client, '_can_connect', return_value=True) mocker.patch.object(client, '_maybe_connect', return_value=True) mocker.patch.object(client, 'maybe_connect', return_value=True) @@ -350,14 +353,14 @@ def test_maybe_refresh_metadata_cant_send(mocker, client): # first poll attempts connection client.poll(timeout_ms=12345678) - client._poll.assert_called_with(2.222) # reconnect backoff + client._poll.assert_called_with(12345.678) client.maybe_connect.assert_called_once_with('foobar', wakeup=False) # poll while connecting should not attempt a new connection client._connecting.add('foobar') client._can_connect.reset_mock() client.poll(timeout_ms=12345678) - client._poll.assert_called_with(2.222) # connection timeout (reconnect timeout) + client._poll.assert_called_with(12345.678) assert not client._can_connect.called assert not client._metadata_refresh_in_progress From 31f3cbaee89452004b96bafc17099b8f825f0d95 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Fri, 14 Feb 2025 11:20:05 -0800 Subject: [PATCH 7/9] conn: honor reconnect backoff in connection_delay when connecting --- kafka/client_async.py | 5 ++--- kafka/conn.py | 11 +++++++---- test/test_conn.py | 24 ++++++++++++++++++++++-- 3 files changed, 31 insertions(+), 9 deletions(-) diff --git a/kafka/client_async.py b/kafka/client_async.py index caeea0159..9d9e795e6 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -464,9 +464,8 @@ def is_disconnected(self, node_id): def connection_delay(self, node_id): """ Return the number of milliseconds to wait, based on the connection - state, before attempting to send data. When disconnected, this respects - the reconnect backoff time. When connecting, returns 0 to allow - non-blocking connect to finish. When connected, returns a very large + state, before attempting to send data. When connecting or disconnected, + this respects the reconnect backoff time. When connected, returns a very large number to handle slow/stalled connections. Arguments: diff --git a/kafka/conn.py b/kafka/conn.py index f5c092dab..d53c8892d 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -854,11 +854,11 @@ def blacked_out(self): def connection_delay(self): """ Return the number of milliseconds to wait, based on the connection - state, before attempting to send data. When disconnected, this respects - the reconnect backoff time. When connecting or connected, returns a very + state, before attempting to send data. When connecting or disconnected, + this respects the reconnect backoff time. When connected, returns a very large number to handle slow/stalled connections. """ - if self.state is ConnectionStates.DISCONNECTED: + if self.disconnected() or self.connecting(): if len(self._gai) > 0: return 0 else: @@ -889,6 +889,9 @@ def _reset_reconnect_backoff(self): self._failures = 0 self._reconnect_backoff = self.config['reconnect_backoff_ms'] / 1000.0 + def _reconnect_jitter_pct(self): + return uniform(0.8, 1.2) + def _update_reconnect_backoff(self): # Do not mark as failure if there are more dns entries available to try if len(self._gai) > 0: @@ -897,7 +900,7 @@ def _update_reconnect_backoff(self): self._failures += 1 self._reconnect_backoff = self.config['reconnect_backoff_ms'] * 2 ** (self._failures - 1) self._reconnect_backoff = min(self._reconnect_backoff, self.config['reconnect_backoff_max_ms']) - self._reconnect_backoff *= uniform(0.8, 1.2) + self._reconnect_backoff *= self._reconnect_jitter_pct() self._reconnect_backoff /= 1000.0 log.debug('%s: reconnect backoff %s after %s failures', self, self._reconnect_backoff, self._failures) diff --git a/test/test_conn.py b/test/test_conn.py index 966f7b34d..3afa9422d 100644 --- a/test/test_conn.py +++ b/test/test_conn.py @@ -80,15 +80,35 @@ def test_blacked_out(conn): assert conn.blacked_out() is True -def test_connection_delay(conn): +def test_connection_delay(conn, mocker): + mocker.patch.object(conn, '_reconnect_jitter_pct', return_value=1.0) with mock.patch("time.time", return_value=1000): conn.last_attempt = 1000 assert conn.connection_delay() == conn.config['reconnect_backoff_ms'] conn.state = ConnectionStates.CONNECTING - assert conn.connection_delay() == float('inf') + assert conn.connection_delay() == conn.config['reconnect_backoff_ms'] conn.state = ConnectionStates.CONNECTED assert conn.connection_delay() == float('inf') + conn._gai.clear() + conn._update_reconnect_backoff() + conn.state = ConnectionStates.DISCONNECTED + assert conn.connection_delay() == 1.0 * conn.config['reconnect_backoff_ms'] + conn.state = ConnectionStates.CONNECTING + assert conn.connection_delay() == 1.0 * conn.config['reconnect_backoff_ms'] + + conn._update_reconnect_backoff() + conn.state = ConnectionStates.DISCONNECTED + assert conn.connection_delay() == 2.0 * conn.config['reconnect_backoff_ms'] + conn.state = ConnectionStates.CONNECTING + assert conn.connection_delay() == 2.0 * conn.config['reconnect_backoff_ms'] + + conn._update_reconnect_backoff() + conn.state = ConnectionStates.DISCONNECTED + assert conn.connection_delay() == 4.0 * conn.config['reconnect_backoff_ms'] + conn.state = ConnectionStates.CONNECTING + assert conn.connection_delay() == 4.0 * conn.config['reconnect_backoff_ms'] + def test_connected(conn): assert conn.connected() is False From 4eb8e36dd8822ce27c9d25ae032987b682b8e0e0 Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Fri, 14 Feb 2025 11:21:13 -0800 Subject: [PATCH 8/9] Log connection delay for not-ready nodes in producer sender loop --- kafka/producer/sender.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/kafka/producer/sender.py b/kafka/producer/sender.py index 35688d3f1..581064ca5 100644 --- a/kafka/producer/sender.py +++ b/kafka/producer/sender.py @@ -103,14 +103,14 @@ def run_once(self): self._metadata.request_update() # remove any nodes we aren't ready to send to - not_ready_timeout = float('inf') + not_ready_timeout_ms = float('inf') for node in list(ready_nodes): if not self._client.is_ready(node): - log.debug('Node %s not ready; delaying produce of accumulated batch', node) + node_delay_ms = self._client.connection_delay(node) + log.debug('Node %s not ready; delaying produce of accumulated batch (%f ms)', node, node_delay_ms) self._client.maybe_connect(node, wakeup=False) ready_nodes.remove(node) - not_ready_timeout = min(not_ready_timeout, - self._client.connection_delay(node)) + not_ready_timeout_ms = min(not_ready_timeout_ms, node_delay_ms) # create produce requests batches_by_node = self._accumulator.drain( @@ -136,7 +136,7 @@ def run_once(self): # off). Note that this specifically does not include nodes with # sendable data that aren't ready to send since they would cause busy # looping. - poll_timeout_ms = min(next_ready_check_delay * 1000, not_ready_timeout) + poll_timeout_ms = min(next_ready_check_delay * 1000, not_ready_timeout_ms) if ready_nodes: log.debug("Nodes with data ready to send: %s", ready_nodes) # trace log.debug("Created %d produce requests: %s", len(requests), requests) # trace From 7235d6c119a9400194195330ed512d514a19d52b Mon Sep 17 00:00:00 2001 From: Dana Powers Date: Fri, 14 Feb 2025 11:35:53 -0800 Subject: [PATCH 9/9] Increase default reconnect_backoff_max_ms to 30000 (30 secs) --- kafka/admin/client.py | 4 ++-- kafka/client_async.py | 4 ++-- kafka/conn.py | 4 ++-- kafka/consumer/group.py | 4 ++-- kafka/producer/kafka.py | 4 ++-- 5 files changed, 10 insertions(+), 10 deletions(-) diff --git a/kafka/admin/client.py b/kafka/admin/client.py index 22c29878d..62527838f 100644 --- a/kafka/admin/client.py +++ b/kafka/admin/client.py @@ -72,7 +72,7 @@ class KafkaAdminClient(object): reconnection attempts will continue periodically with this fixed rate. To avoid connection storms, a randomization factor of 0.2 will be applied to the backoff resulting in a random range between - 20% below and 20% above the computed value. Default: 1000. + 20% below and 20% above the computed value. Default: 30000. request_timeout_ms (int): Client request timeout in milliseconds. Default: 30000. connections_max_idle_ms: Close idle connections after the number of @@ -156,7 +156,7 @@ class KafkaAdminClient(object): 'request_timeout_ms': 30000, 'connections_max_idle_ms': 9 * 60 * 1000, 'reconnect_backoff_ms': 50, - 'reconnect_backoff_max_ms': 1000, + 'reconnect_backoff_max_ms': 30000, 'max_in_flight_requests_per_connection': 5, 'receive_buffer_bytes': None, 'send_buffer_bytes': None, diff --git a/kafka/client_async.py b/kafka/client_async.py index 9d9e795e6..ea5e606cb 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -75,7 +75,7 @@ class KafkaClient(object): reconnection attempts will continue periodically with this fixed rate. To avoid connection storms, a randomization factor of 0.2 will be applied to the backoff resulting in a random range between - 20% below and 20% above the computed value. Default: 1000. + 20% below and 20% above the computed value. Default: 30000. request_timeout_ms (int): Client request timeout in milliseconds. Default: 30000. connections_max_idle_ms: Close idle connections after the number of @@ -164,7 +164,7 @@ class KafkaClient(object): 'wakeup_timeout_ms': 3000, 'connections_max_idle_ms': 9 * 60 * 1000, 'reconnect_backoff_ms': 50, - 'reconnect_backoff_max_ms': 1000, + 'reconnect_backoff_max_ms': 30000, 'max_in_flight_requests_per_connection': 5, 'receive_buffer_bytes': None, 'send_buffer_bytes': None, diff --git a/kafka/conn.py b/kafka/conn.py index d53c8892d..7dab7995c 100644 --- a/kafka/conn.py +++ b/kafka/conn.py @@ -120,7 +120,7 @@ class BrokerConnection(object): reconnection attempts will continue periodically with this fixed rate. To avoid connection storms, a randomization factor of 0.2 will be applied to the backoff resulting in a random range between - 20% below and 20% above the computed value. Default: 1000. + 20% below and 20% above the computed value. Default: 30000. request_timeout_ms (int): Client request timeout in milliseconds. Default: 30000. max_in_flight_requests_per_connection (int): Requests are pipelined @@ -198,7 +198,7 @@ class BrokerConnection(object): 'node_id': 0, 'request_timeout_ms': 30000, 'reconnect_backoff_ms': 50, - 'reconnect_backoff_max_ms': 1000, + 'reconnect_backoff_max_ms': 30000, 'max_in_flight_requests_per_connection': 5, 'receive_buffer_bytes': None, 'send_buffer_bytes': None, diff --git a/kafka/consumer/group.py b/kafka/consumer/group.py index a1d1dfa37..2d7571d1b 100644 --- a/kafka/consumer/group.py +++ b/kafka/consumer/group.py @@ -98,7 +98,7 @@ class KafkaConsumer(six.Iterator): reconnection attempts will continue periodically with this fixed rate. To avoid connection storms, a randomization factor of 0.2 will be applied to the backoff resulting in a random range between - 20% below and 20% above the computed value. Default: 1000. + 20% below and 20% above the computed value. Default: 30000. max_in_flight_requests_per_connection (int): Requests are pipelined to kafka brokers up to this number of maximum requests per broker connection. Default: 5. @@ -263,7 +263,7 @@ class KafkaConsumer(six.Iterator): 'request_timeout_ms': 305000, # chosen to be higher than the default of max_poll_interval_ms 'retry_backoff_ms': 100, 'reconnect_backoff_ms': 50, - 'reconnect_backoff_max_ms': 1000, + 'reconnect_backoff_max_ms': 30000, 'max_in_flight_requests_per_connection': 5, 'auto_offset_reset': 'latest', 'enable_auto_commit': True, diff --git a/kafka/producer/kafka.py b/kafka/producer/kafka.py index dd1cc508c..eb6e91961 100644 --- a/kafka/producer/kafka.py +++ b/kafka/producer/kafka.py @@ -216,7 +216,7 @@ class KafkaProducer(object): reconnection attempts will continue periodically with this fixed rate. To avoid connection storms, a randomization factor of 0.2 will be applied to the backoff resulting in a random range between - 20% below and 20% above the computed value. Default: 1000. + 20% below and 20% above the computed value. Default: 30000. max_in_flight_requests_per_connection (int): Requests are pipelined to kafka brokers up to this number of maximum requests per broker connection. Note that if this setting is set to be greater @@ -311,7 +311,7 @@ class KafkaProducer(object): 'sock_chunk_bytes': 4096, # undocumented experimental option 'sock_chunk_buffer_count': 1000, # undocumented experimental option 'reconnect_backoff_ms': 50, - 'reconnect_backoff_max_ms': 1000, + 'reconnect_backoff_max_ms': 30000, 'max_in_flight_requests_per_connection': 5, 'security_protocol': 'PLAINTEXT', 'ssl_context': None,