diff --git a/CHANGES b/CHANGES index 2abf62766f..d2033ca085 100644 --- a/CHANGES +++ b/CHANGES @@ -16,6 +16,8 @@ * Added dynaminc_startup_nodes configuration to RedisCluster * Fix reusing the old nodes' connections when cluster topology refresh is being done * Fix RedisCluster to immediately raise AuthenticationError without a retry + * ClusterPipeline Doesn't Handle ConnectionError for Dead Hosts (#2225) + * 4.1.3 (Feb 8, 2022) * Fix flushdb and flushall (#1926) * Add redis5 and redis4 dockers (#1871) diff --git a/redis/cluster.py b/redis/cluster.py index f5844fd62a..cee578b075 100644 --- a/redis/cluster.py +++ b/redis/cluster.py @@ -1897,34 +1897,53 @@ def _send_cluster_commands( # we figure out the slot number that command maps to, then from # the slot determine the node. for c in attempt: - # refer to our internal node -> slot table that - # tells us where a given - # command should route to. - passed_targets = c.options.pop("target_nodes", None) - if passed_targets and not self._is_nodes_flag(passed_targets): - target_nodes = self._parse_target_nodes(passed_targets) - else: - target_nodes = self._determine_nodes(*c.args, node_flag=passed_targets) - if not target_nodes: + connection_error_retry_counter = 0 + while True: + # refer to our internal node -> slot table that + # tells us where a given command should route to. + # (it might be possible we have a cached node that no longer + # exists in the cluster, which is why we do this in a loop) + passed_targets = c.options.pop("target_nodes", None) + if passed_targets and not self._is_nodes_flag(passed_targets): + target_nodes = self._parse_target_nodes(passed_targets) + else: + target_nodes = self._determine_nodes( + *c.args, node_flag=passed_targets + ) + if not target_nodes: + raise RedisClusterException( + f"No targets were found to execute {c.args} command on" + ) + if len(target_nodes) > 1: raise RedisClusterException( - f"No targets were found to execute {c.args} command on" + f"Too many targets for command {c.args}" ) - if len(target_nodes) > 1: - raise RedisClusterException(f"Too many targets for command {c.args}") - - node = target_nodes[0] - # now that we know the name of the node - # ( it's just a string in the form of host:port ) - # we can build a list of commands for each node. - node_name = node.name - if node_name not in nodes: - redis_node = self.get_redis_connection(node) - connection = get_connection(redis_node, c.args) - nodes[node_name] = NodeCommands( - redis_node.parse_response, redis_node.connection_pool, connection - ) - nodes[node_name].append(c) + node = target_nodes[0] + + # now that we know the name of the node + # ( it's just a string in the form of host:port ) + # we can build a list of commands for each node. + node_name = node.name + if node_name not in nodes: + redis_node = self.get_redis_connection(node) + try: + connection = get_connection(redis_node, c.args) + except ConnectionError: + connection_error_retry_counter += 1 + if connection_error_retry_counter < 5: + # reinitialize the node -> slot table + self.nodes_manager.initialize() + continue + else: + raise + nodes[node_name] = NodeCommands( + redis_node.parse_response, + redis_node.connection_pool, + connection, + ) + nodes[node_name].append(c) + break # send the commands in sequence. # we write to all the open sockets for each node first,