From 7efd4a71ccb0522d7456535fca45f9ccd7fe4ec5 Mon Sep 17 00:00:00 2001 From: Shahar Lev Date: Sun, 21 May 2023 11:47:30 +0300 Subject: [PATCH 1/2] Fix dead weakref in sentinel conn (#2767) --- CHANGES | 1 + redis/sentinel.py | 88 ++++++++++++++++++++++++++++++------------ tests/test_sentinel.py | 9 +++++ 3 files changed, 73 insertions(+), 25 deletions(-) diff --git a/CHANGES b/CHANGES index 1e03453cc3..89983d9fa8 100644 --- a/CHANGES +++ b/CHANGES @@ -1,3 +1,4 @@ + * Fix dead weakref in sentinel conn (#2767) * Fix #2754, adding a missing argument to SentinelManagedConnection * Fix `xadd` command to accept non-negative `maxlen` including 0 * Revert #2104, #2673, add `disconnect_on_error` option to `read_response()` (issues #2506, #2624) diff --git a/redis/sentinel.py b/redis/sentinel.py index f9f8f1c3ce..0ba179b9ca 100644 --- a/redis/sentinel.py +++ b/redis/sentinel.py @@ -78,6 +78,54 @@ class SentinelManagedSSLConnection(SentinelManagedConnection, SSLConnection): pass +class SentinelConnectionPoolProxy: + def __init__( + self, + connection_pool, + is_master, + check_connection, + service_name, + sentinel_manager, + ): + self.connection_pool_ref = weakref.ref(connection_pool) + self.is_master = is_master + self.check_connection = check_connection + self.service_name = service_name + self.sentinel_manager = sentinel_manager + self.reset() + + def reset(self): + self.master_address = None + self.slave_rr_counter = None + + def get_master_address(self): + master_address = self.sentinel_manager.discover_master(self.service_name) + if self.is_master and self.master_address != master_address: + self.master_address = master_address + # disconnect any idle connections so that they reconnect + # to the new master the next time that they are used. + connection_pool = self.connection_pool_ref() + if connection_pool is not None: + connection_pool.disconnect(inuse_connections=False) + return master_address + + def rotate_slaves(self): + slaves = self.sentinel_manager.discover_slaves(self.service_name) + if slaves: + if self.slave_rr_counter is None: + self.slave_rr_counter = random.randint(0, len(slaves) - 1) + for _ in range(len(slaves)): + self.slave_rr_counter = (self.slave_rr_counter + 1) % len(slaves) + slave = slaves[self.slave_rr_counter] + yield slave + # Fallback to the master connection + try: + yield self.get_master_address() + except MasterNotFoundError: + pass + raise SlaveNotFoundError(f"No slave found for {self.service_name!r}") + + class SentinelConnectionPool(ConnectionPool): """ Sentinel backed connection pool. @@ -95,8 +143,15 @@ def __init__(self, service_name, sentinel_manager, **kwargs): ) self.is_master = kwargs.pop("is_master", True) self.check_connection = kwargs.pop("check_connection", False) + self.proxy = SentinelConnectionPoolProxy( + connection_pool=self, + is_master=self.is_master, + check_connection=self.check_connection, + service_name=service_name, + sentinel_manager=sentinel_manager, + ) super().__init__(**kwargs) - self.connection_kwargs["connection_pool"] = weakref.proxy(self) + self.connection_kwargs["connection_pool"] = self.proxy self.service_name = service_name self.sentinel_manager = sentinel_manager @@ -106,8 +161,11 @@ def __repr__(self): def reset(self): super().reset() - self.master_address = None - self.slave_rr_counter = None + self.proxy.reset() + + @property + def master_address(self): + return self.proxy.master_address def owns_connection(self, connection): check = not self.is_master or ( @@ -117,31 +175,11 @@ def owns_connection(self, connection): return check and parent.owns_connection(connection) def get_master_address(self): - master_address = self.sentinel_manager.discover_master(self.service_name) - if self.is_master: - if self.master_address != master_address: - self.master_address = master_address - # disconnect any idle connections so that they reconnect - # to the new master the next time that they are used. - self.disconnect(inuse_connections=False) - return master_address + return self.proxy.get_master_address() def rotate_slaves(self): "Round-robin slave balancer" - slaves = self.sentinel_manager.discover_slaves(self.service_name) - if slaves: - if self.slave_rr_counter is None: - self.slave_rr_counter = random.randint(0, len(slaves) - 1) - for _ in range(len(slaves)): - self.slave_rr_counter = (self.slave_rr_counter + 1) % len(slaves) - slave = slaves[self.slave_rr_counter] - yield slave - # Fallback to the master connection - try: - yield self.get_master_address() - except MasterNotFoundError: - pass - raise SlaveNotFoundError(f"No slave found for {self.service_name!r}") + return self.proxy.rotate_slaves() class Sentinel(SentinelCommands): diff --git a/tests/test_sentinel.py b/tests/test_sentinel.py index 8542a0bfc3..e5e3d26fa7 100644 --- a/tests/test_sentinel.py +++ b/tests/test_sentinel.py @@ -98,6 +98,15 @@ def test_discover_master_error(sentinel): sentinel.discover_master("xxx") +@pytest.mark.onlynoncluster +def test_dead_pool(sentinel): + master = sentinel.master_for("mymaster", db=9) + conn = master.connection_pool.get_connection("_") + conn.disconnect() + del master + conn.connect() + + @pytest.mark.onlynoncluster def test_discover_master_sentinel_down(cluster, sentinel, master_ip): # Put first sentinel 'foo' down From 5ece278773b4622beca12da2d3b61a1bb21b843f Mon Sep 17 00:00:00 2001 From: Igor Malinovskiy Date: Thu, 15 Jun 2023 11:39:58 +0200 Subject: [PATCH 2/2] Update CHANGES --- CHANGES | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/CHANGES b/CHANGES index 204c65bd37..7dab1f19d9 100644 --- a/CHANGES +++ b/CHANGES @@ -1,4 +1,4 @@ - * Fix dead weakref in sentinel conn (#2767) + * Fix dead weakref in sentinel connection causing ReferenceError (#2767) * Fix #2749, remove unnecessary __del__ logic to close connections. * Fix #2754, adding a missing argument to SentinelManagedConnection * Fix `xadd` command to accept non-negative `maxlen` including 0