@@ -1897,34 +1897,53 @@ def _send_cluster_commands(
1897
1897
# we figure out the slot number that command maps to, then from
1898
1898
# the slot determine the node.
1899
1899
for c in attempt :
1900
- # refer to our internal node -> slot table that
1901
- # tells us where a given
1902
- # command should route to.
1903
- passed_targets = c .options .pop ("target_nodes" , None )
1904
- if passed_targets and not self ._is_nodes_flag (passed_targets ):
1905
- target_nodes = self ._parse_target_nodes (passed_targets )
1906
- else :
1907
- target_nodes = self ._determine_nodes (* c .args , node_flag = passed_targets )
1908
- if not target_nodes :
1900
+ connection_error_retry_counter = 0
1901
+ while True :
1902
+ # refer to our internal node -> slot table that
1903
+ # tells us where a given command should route to.
1904
+ # (it might be possible we have a cached node that no longer
1905
+ # exists in the cluster, which is why we do this in a loop)
1906
+ passed_targets = c .options .pop ("target_nodes" , None )
1907
+ if passed_targets and not self ._is_nodes_flag (passed_targets ):
1908
+ target_nodes = self ._parse_target_nodes (passed_targets )
1909
+ else :
1910
+ target_nodes = self ._determine_nodes (
1911
+ * c .args , node_flag = passed_targets
1912
+ )
1913
+ if not target_nodes :
1914
+ raise RedisClusterException (
1915
+ f"No targets were found to execute { c .args } command on"
1916
+ )
1917
+ if len (target_nodes ) > 1 :
1909
1918
raise RedisClusterException (
1910
- f"No targets were found to execute { c .args } command on "
1919
+ f"Too many targets for command { c .args } "
1911
1920
)
1912
- if len (target_nodes ) > 1 :
1913
- raise RedisClusterException (f"Too many targets for command { c .args } " )
1914
-
1915
- node = target_nodes [0 ]
1916
- # now that we know the name of the node
1917
- # ( it's just a string in the form of host:port )
1918
- # we can build a list of commands for each node.
1919
- node_name = node .name
1920
- if node_name not in nodes :
1921
- redis_node = self .get_redis_connection (node )
1922
- connection = get_connection (redis_node , c .args )
1923
- nodes [node_name ] = NodeCommands (
1924
- redis_node .parse_response , redis_node .connection_pool , connection
1925
- )
1926
1921
1927
- nodes [node_name ].append (c )
1922
+ node = target_nodes [0 ]
1923
+
1924
+ # now that we know the name of the node
1925
+ # ( it's just a string in the form of host:port )
1926
+ # we can build a list of commands for each node.
1927
+ node_name = node .name
1928
+ if node_name not in nodes :
1929
+ redis_node = self .get_redis_connection (node )
1930
+ try :
1931
+ connection = get_connection (redis_node , c .args )
1932
+ except ConnectionError :
1933
+ connection_error_retry_counter += 1
1934
+ if connection_error_retry_counter < 5 :
1935
+ # reinitialize the node -> slot table
1936
+ self .nodes_manager .initialize ()
1937
+ continue
1938
+ else :
1939
+ raise
1940
+ nodes [node_name ] = NodeCommands (
1941
+ redis_node .parse_response ,
1942
+ redis_node .connection_pool ,
1943
+ connection ,
1944
+ )
1945
+ nodes [node_name ].append (c )
1946
+ break
1928
1947
1929
1948
# send the commands in sequence.
1930
1949
# we write to all the open sockets for each node first,
0 commit comments