From 8d1792074b70e66dedbf0aec45705e90877cf7c0 Mon Sep 17 00:00:00 2001
From: "zach.lee"
Date: Mon, 21 Aug 2023 23:39:40 +0900
Subject: [PATCH 1/3] add is_supported_error() to retry

---
 redis/cluster.py    |  6 ++----
 redis/retry.py      |  3 +++
 tests/test_retry.py | 12 ++++++++++++
 3 files changed, 17 insertions(+), 4 deletions(-)

diff --git a/redis/cluster.py b/redis/cluster.py
index d3bfdb3e3d..d7a30cce9a 100644
--- a/redis/cluster.py
+++ b/redis/cluster.py
@@ -1103,7 +1103,7 @@ def execute_command(self, *args, **kwargs):
                     # The nodes and slots cache were reinitialized.
                     # Try again with the new cluster setup.
                     retry_attempts -= 1
-                    if self.retry and isinstance(e, self.retry._supported_errors):
+                    if self.retry and self.retry.is_supported_error(e):
                         backoff = self.retry._backoff.compute(
                             self.cluster_error_retry_attempts - retry_attempts
                         )
@@ -2039,9 +2039,7 @@ def _send_cluster_commands(
                         n.connection_pool.release(n.connection)
                         n.connection = None
                     nodes = {}
-                    if self.retry and isinstance(
-                        e, self.retry._supported_errors
-                    ):
+                    if self.retry and self.retry.is_supported_error(e):
                         backoff = self.retry._backoff.compute(attempts_count)
                         if backoff > 0:
                             time.sleep(backoff)
diff --git a/redis/retry.py b/redis/retry.py
index 606443053e..59e9d17998 100644
--- a/redis/retry.py
+++ b/redis/retry.py
@@ -32,6 +32,9 @@ def update_supported_errors(self, specified_errors: list):
             set(self._supported_errors + tuple(specified_errors))
         )
 
+    def is_supported_error(self, error):
+        return isinstance(error, self._supported_errors)
+
     def call_with_retry(self, do, fail):
         """
         Execute an operation that might fail and returns its result, or
diff --git a/tests/test_retry.py b/tests/test_retry.py
index 3cfea5c09e..9e23a29b41 100644
--- a/tests/test_retry.py
+++ b/tests/test_retry.py
@@ -9,6 +9,8 @@
     BusyLoadingError,
     ConnectionError,
     ReadOnlyError,
+    RedisClusterException,
+    RedisError,
     TimeoutError,
 )
 from redis.retry import Retry
@@ -122,6 +124,16 @@ def test_infinite_retry(self):
         assert self.actual_attempts == 5
         assert self.actual_failures == 5
 
+    @pytest.mark.parametrize("exception_class", [ConnectionError, TimeoutError])
+    def test_is_supported_error_true(self, exception_class):
+        retry = Retry(BackoffMock(), -1)
+        assert retry.is_supported_error(exception_class())
+
+    @pytest.mark.parametrize("exception_class", [RedisClusterException, RedisError])
+    def test_is_supported_error_false(self, exception_class):
+        retry = Retry(BackoffMock(), -1)
+        assert not retry.is_supported_error(exception_class())
+
 
 @pytest.mark.onlynoncluster
 class TestRedisClientRetry:
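
For reference, a minimal usage sketch of the helper introduced above (not part of the patch): is_supported_error() answers the same question the cluster code previously asked by reaching into the private Retry._supported_errors tuple. It assumes the default Retry error set, which includes ConnectionError and TimeoutError.

    from redis.backoff import ExponentialBackoff
    from redis.exceptions import ConnectionError, ReadOnlyError
    from redis.retry import Retry

    # Sketch only: check whether a caught exception is one the Retry object
    # would retry, without touching the private _supported_errors tuple.
    retry = Retry(ExponentialBackoff(), retries=3)
    assert retry.is_supported_error(ConnectionError())    # retryable by default
    assert not retry.is_supported_error(ReadOnlyError())  # not in the default set
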
From 71677a5a4ac1c42e80bb3a786dcd74306e27460d Mon Sep 17 00:00:00 2001
From: "zach.lee"
Date: Mon, 21 Aug 2023 23:42:12 +0900
Subject: [PATCH 2/3] release connections instead of disconnecting them on any
 error when fetching connections in cluster pipeline

---
 redis/cluster.py      |  9 ++++----
 tests/test_cluster.py | 49 +++++++++++++++++++++++++++++++++++++++----
 2 files changed, 50 insertions(+), 8 deletions(-)

diff --git a/redis/cluster.py b/redis/cluster.py
index d7a30cce9a..40e1546abc 100644
--- a/redis/cluster.py
+++ b/redis/cluster.py
@@ -2034,18 +2034,19 @@ def _send_cluster_commands(
                 redis_node = self.get_redis_connection(node)
                 try:
                     connection = get_connection(redis_node, c.args)
-                except (ConnectionError, TimeoutError) as e:
+                except BaseException as e:
                     for n in nodes.values():
                         n.connection_pool.release(n.connection)
                         n.connection = None
                     nodes = {}
                     if self.retry and self.retry.is_supported_error(e):
                         backoff = self.retry._backoff.compute(attempts_count)
                         if backoff > 0:
                             time.sleep(backoff)
-                    self.nodes_manager.initialize()
-                    if is_default_node:
-                        self.replace_default_node()
+                    if isinstance(e, (ConnectionError, TimeoutError)):
+                        self.nodes_manager.initialize()
+                        if is_default_node:
+                            self.replace_default_node()
                     raise
                 nodes[node_name] = NodeCommands(
                     redis_node.parse_response,
diff --git a/tests/test_cluster.py b/tests/test_cluster.py
index 70dc509f2f..625f194911 100644
--- a/tests/test_cluster.py
+++ b/tests/test_cluster.py
@@ -2805,8 +2805,10 @@ def raise_error():
 
             m.side_effect = raise_error
 
-            with pytest.raises(Exception, match="unexpected error"):
-                r.pipeline().get("a").execute()
+            with patch.object(Connection, "disconnect") as d:
+                with pytest.raises(Exception, match="unexpected error"):
+                    r.pipeline().get("a").execute()
+                assert d.call_count == 1
 
             for cluster_node in r.nodes_manager.nodes_cache.values():
                 connection_pool = cluster_node.redis_connection.connection_pool
@@ -3127,7 +3129,7 @@ def raise_ask_error():
         assert res == ["MOCK_OK"]
 
     @pytest.mark.parametrize("error", [ConnectionError, TimeoutError])
-    def test_return_previous_acquired_connections(self, r, error):
+    def test_return_previous_acquired_connections_with_retry(self, r, error):
         # in order to ensure that a pipeline will make use of connections
         # from different nodes
         assert r.keyslot("a") != r.keyslot("b")
@@ -3143,7 +3145,13 @@ def raise_error(target_node, *args, **kwargs):
 
             get_connection.side_effect = raise_error
 
-            r.pipeline().get("a").get("b").execute()
+            with patch.object(NodesManager, "initialize") as i:
+                # in order to remove the disconnect caused by initialize
+                i.side_effect = lambda: None
+
+                with patch.object(Connection, "disconnect") as d:
+                    r.pipeline().get("a").get("b").execute()
+                    assert d.call_count == 0
 
             # there should have been two get_connections per execution and
             # two executions due to exception raised in the first execution
@@ -3153,6 +3161,39 @@ def raise_error(target_node, *args, **kwargs):
                 num_of_conns = len(connection_pool._available_connections)
                 assert num_of_conns == connection_pool._created_connections
 
+    @pytest.mark.parametrize("error", [RedisClusterException, BaseException])
+    def test_return_previous_acquired_connections_without_retry(self, r, error):
+        # in order to ensure that a pipeline will make use of connections
+        # from different nodes
+        assert r.keyslot("a") != r.keyslot("b")
+
+        orig_func = redis.cluster.get_connection
+        with patch("redis.cluster.get_connection") as get_connection:
+
+            def raise_error(target_node, *args, **kwargs):
+                if get_connection.call_count == 2:
+                    raise error("mocked error")
+                else:
+                    return orig_func(target_node, *args, **kwargs)
+
+            get_connection.side_effect = raise_error
+
+            with patch.object(Connection, "disconnect") as d:
+                with pytest.raises(error):
+                    r.pipeline().get("a").get("b").execute()
+                assert d.call_count == 0
+
+            # only two get_connections are expected here, from a single
+            # execution, because these errors are not retried
+            assert get_connection.call_count == 2
+            for cluster_node in r.nodes_manager.nodes_cache.values():
+                connection_pool = cluster_node.redis_connection.connection_pool
+                num_of_conns = len(connection_pool._available_connections)
+                assert num_of_conns == connection_pool._created_connections
+                # connection must remain connected
+                for conn in connection_pool._available_connections:
+                    assert conn._sock is not None
+
     def test_empty_stack(self, r):
         """
         If pipeline is executed with no commands it should
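
Taken together, the cluster.py hunk above amounts to a three-part policy for errors raised while fetching a node connection: always release previously acquired connections, back off only for errors the Retry object supports, and reinitialize the node/slot cache only for ConnectionError or TimeoutError. A standalone sketch of that policy follows; the helper name classify_pipeline_error is made up for illustration, since in redis-py the logic lives inline in ClusterPipeline._send_cluster_commands.

    from redis.backoff import ExponentialBackoff
    from redis.exceptions import ConnectionError, RedisClusterException, TimeoutError
    from redis.retry import Retry


    def classify_pipeline_error(retry: Retry, error: BaseException) -> dict:
        # mirrors the patched except-block: release always, backoff only for
        # retry-supported errors, slot-cache reinitialization only for
        # connection-level errors
        return {
            "release_connections": True,
            "apply_backoff": retry.is_supported_error(error),
            "reinitialize_slots": isinstance(error, (ConnectionError, TimeoutError)),
        }


    retry = Retry(ExponentialBackoff(), retries=3)
    for exc in (ConnectionError(), TimeoutError(), RedisClusterException("boom")):
        print(type(exc).__name__, classify_pipeline_error(retry, exc))
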
From fa3864ffb31b780b0c200076e4e53360c8b0bf46 Mon Sep 17 00:00:00 2001
From: "zach.lee"
Date: Tue, 22 Aug 2023 01:09:16 +0900
Subject: [PATCH 3/3] add a default backoff after cluster pipeline disconnects
 its connections

---
 redis/cluster.py | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/redis/cluster.py b/redis/cluster.py
index 40e1546abc..483b8ac6d2 100644
--- a/redis/cluster.py
+++ b/redis/cluster.py
@@ -2162,6 +2162,8 @@ def _send_cluster_commands(
             if n.connection:
                 n.connection.disconnect()
                 n.connection_pool.release(n.connection)
+        if len(nodes) > 0:
+            time.sleep(0.25)
         raise
 
     def _fail_on_redirect(self, allow_redirections):
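
A minimal sketch of what the two added lines do in context; nodes and time are the same local mapping and module used by _send_cluster_commands, and reading the pause as a guard against an immediate follow-up attempt is an assumption, since the commit message only calls it a default backoff.

    import time

    nodes = {"127.0.0.1:7000": object()}  # stand-in for the NodeCommands mapping

    # ...at this point every connection in `nodes` has just been disconnected
    # and released because the write/read phase failed...
    if len(nodes) > 0:
        time.sleep(0.25)  # fixed default backoff before the error is re-raised
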