Skip to content

Commit b7ffec0

Browse files
authored
Improved RedisCluster's reinitialize_steps and documentation (#1765)
1 parent c2d4621 commit b7ffec0

File tree

2 files changed

+48
-8
lines changed

2 files changed

+48
-8
lines changed

redis/cluster.py

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -414,14 +414,25 @@ def __init__(
414414
stale data.
415415
When set to true, read commands will be assigned between the
416416
primary and its replications in a Round-Robin manner.
417-
:cluster_error_retry_attempts: 'int'
417+
:cluster_error_retry_attempts: 'int'
418418
Retry command execution attempts when encountering ClusterDownError
419419
or ConnectionError
420-
:retry_on_timeout: 'bool'
420+
:retry_on_timeout: 'bool'
421421
To specify a retry policy, first set `retry_on_timeout` to `True`
422422
then set `retry` to a valid `Retry` object
423-
:retry: 'Retry'
423+
:retry: 'Retry'
424424
a `Retry` object
425+
:reinitialize_steps: 'int'
426+
Specifies the number of MOVED errors that need to occur before
427+
reinitializing the whole cluster topology. If a MOVED error occurs
428+
and the cluster does not need to be reinitialized on this current
429+
error handling, only the MOVED slot will be patched with the
430+
redirected node.
431+
To reinitialize the cluster on every MOVED error, set
432+
reinitialize_steps to 1.
433+
To avoid reinitializing the cluster on moved errors, set
434+
reinitialize_steps to 0.
435+
425436
:**kwargs:
426437
Extra arguments that will be sent into Redis instance when created
427438
(See Official redis-py doc for supported kwargs
@@ -727,7 +738,9 @@ def _determine_nodes(self, *args, **kwargs):
727738
return [node]
728739

729740
def _should_reinitialized(self):
730-
# In order not to reinitialize the cluster, the user can set
741+
# To reinitialize the cluster on every MOVED error,
742+
# set reinitialize_steps to 1.
743+
# To avoid reinitializing the cluster on moved errors, set
731744
# reinitialize_steps to 0.
732745
if self.reinitialize_steps == 0:
733746
return False
@@ -958,8 +971,8 @@ def _execute_command(self, target_node, *args, **kwargs):
958971
# redirected node output and try again. If MovedError exceeds
959972
# 'reinitialize_steps' number of times, we will force
960973
# reinitializing the tables, and then try again.
961-
# 'reinitialize_steps' counter will increase faster when the
962-
# same client object is shared between multiple threads. To
974+
# 'reinitialize_steps' counter will increase faster when
975+
# the same client object is shared between multiple threads. To
963976
# reduce the frequency you can set this variable in the
964977
# RedisCluster constructor.
965978
log.exception("MovedError")
@@ -1055,6 +1068,10 @@ def __repr__(self):
10551068
def __eq__(self, obj):
10561069
return isinstance(obj, ClusterNode) and obj.name == self.name
10571070

1071+
def __del__(self):
1072+
if self.redis_connection is not None:
1073+
self.redis_connection.close()
1074+
10581075

10591076
class LoadBalancer:
10601077
"""
@@ -1300,6 +1317,11 @@ def initialize(self):
13001317
startup_node.host, startup_node.port, **copy_kwargs
13011318
)
13021319
self.startup_nodes[startup_node.name].redis_connection = r
1320+
# Make sure cluster mode is enabled on this node
1321+
if bool(r.info().get("cluster_enabled")) is False:
1322+
raise RedisClusterException(
1323+
"Cluster mode is not enabled on this node"
1324+
)
13031325
cluster_slots = r.execute_command("CLUSTER SLOTS")
13041326
startup_nodes_reachable = True
13051327
except (ConnectionError, TimeoutError) as e:
@@ -1327,7 +1349,7 @@ def initialize(self):
13271349
message = e.__str__()
13281350
raise RedisClusterException(
13291351
'ERROR sending "cluster slots" command to redis '
1330-
f"server: {startup_node}. error: {message}"
1352+
f"server {startup_node.name}. error: {message}"
13311353
)
13321354

13331355
# CLUSTER SLOTS command results in the following output:

tests/test_cluster.py

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -84,6 +84,7 @@ def get_mocked_redis_client(func=None, *args, **kwargs):
8484
"""
8585
cluster_slots = kwargs.pop("cluster_slots", default_cluster_slots)
8686
coverage_res = kwargs.pop("coverage_result", "yes")
87+
cluster_enabled = kwargs.pop("cluster_enabled", True)
8788
with patch.object(Redis, "execute_command") as execute_command_mock:
8889

8990
def execute_command(*_args, **_kwargs):
@@ -92,7 +93,9 @@ def execute_command(*_args, **_kwargs):
9293
return mock_cluster_slots
9394
elif _args[0] == "COMMAND":
9495
return {"get": [], "set": []}
95-
elif _args[1] == "cluster-require-full-coverage":
96+
elif _args[0] == "INFO":
97+
return {"cluster_enabled": cluster_enabled}
98+
elif len(_args) > 1 and _args[1] == "cluster-require-full-coverage":
9699
return {"cluster-require-full-coverage": coverage_res}
97100
elif func is not None:
98101
return func(*args, **kwargs)
@@ -1974,6 +1977,17 @@ def test_init_slots_cache(self):
19741977

19751978
assert len(n_manager.nodes_cache) == 6
19761979

1980+
def test_init_slots_cache_cluster_mode_disabled(self):
1981+
"""
1982+
Test that creating a RedisCluster failes if one of the startup nodes
1983+
has cluster mode disabled
1984+
"""
1985+
with pytest.raises(RedisClusterException) as e:
1986+
get_mocked_redis_client(
1987+
host=default_host, port=default_port, cluster_enabled=False
1988+
)
1989+
assert "Cluster mode is not enabled on this node" in str(e.value)
1990+
19771991
def test_empty_startup_nodes(self):
19781992
"""
19791993
It should not be possible to create a node manager with no nodes
@@ -2044,6 +2058,8 @@ def create_mocked_redis_node(host, port, **kwargs):
20442058
def execute_command(*args, **kwargs):
20452059
if args[0] == "CLUSTER SLOTS":
20462060
return result
2061+
elif args[0] == "INFO":
2062+
return {"cluster_enabled": True}
20472063
elif args[1] == "cluster-require-full-coverage":
20482064
return {"cluster-require-full-coverage": "yes"}
20492065
else:
@@ -2108,6 +2124,8 @@ def execute_command(*args, **kwargs):
21082124
["127.0.0.1", 7002, "node_2"],
21092125
],
21102126
]
2127+
elif args[0] == "INFO":
2128+
return {"cluster_enabled": True}
21112129
elif args[1] == "cluster-require-full-coverage":
21122130
return {"cluster-require-full-coverage": "yes"}
21132131

0 commit comments

Comments
 (0)