From 7ef18d4d796b4402c78ae9ed90a57eaaae955315 Mon Sep 17 00:00:00 2001 From: Matt Lee Date: Fri, 13 Jun 2025 10:50:38 +0900 Subject: [PATCH] Support read from replica on clsuter pubsub subscribe --- redis/cluster.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/redis/cluster.py b/redis/cluster.py index 3431fe06e6..4be80ee9d7 100644 --- a/redis/cluster.py +++ b/redis/cluster.py @@ -1646,7 +1646,7 @@ class ClusterPubSub(PubSub): https://redis-py-cluster.readthedocs.io/en/stable/pubsub.html """ - def __init__(self, redis_cluster, node=None, host=None, port=None, **kwargs): + def __init__(self, redis_cluster, node=None, host=None, port=None, replica=False, **kwargs): """ When a pubsub instance is created without specifying a node, a single node will be transparently chosen for the pubsub connection on the @@ -1661,6 +1661,7 @@ def __init__(self, redis_cluster, node=None, host=None, port=None, **kwargs): :type port: int """ self.node = None + self.replica = replica self.set_pubsub_node(redis_cluster, node, host, port) connection_pool = ( None @@ -1794,7 +1795,7 @@ def get_sharded_message( if message["channel"] in self.pending_unsubscribe_shard_channels: self.pending_unsubscribe_shard_channels.remove(message["channel"]) self.shard_channels.pop(message["channel"], None) - node = self.cluster.get_node_from_key(message["channel"]) + node = self.cluster.get_node_from_key(message["channel"], self.replica) if self.node_pubsub_mapping[node.name].subscribed is False: self.node_pubsub_mapping.pop(node.name) if not self.channels and not self.patterns and not self.shard_channels: @@ -1811,7 +1812,7 @@ def ssubscribe(self, *args, **kwargs): s_channels = dict.fromkeys(args) s_channels.update(kwargs) for s_channel, handler in s_channels.items(): - node = self.cluster.get_node_from_key(s_channel) + node = self.cluster.get_node_from_key(s_channel, self.replica) pubsub = self._get_node_pubsub(node) if handler: pubsub.ssubscribe(**{s_channel: handler}) @@ -1832,7 +1833,7 @@ def sunsubscribe(self, *args): args = self.shard_channels for s_channel in args: - node = self.cluster.get_node_from_key(s_channel) + node = self.cluster.get_node_from_key(s_channel, self.replica) p = self._get_node_pubsub(node) p.sunsubscribe(s_channel) self.pending_unsubscribe_shard_channels.update(