Skip to content

Commit 22506e2

Browse files
async_cluster: add concurrent test & use read_response/_update_moved_slots without lock
1 parent 4344145 commit 22506e2

File tree

3 files changed

+70
-30
lines changed

3 files changed

+70
-30
lines changed

redis/asyncio/cluster.py

+9-12
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,6 @@
22
import collections
33
import random
44
import socket
5-
import threading
65
import warnings
76
from typing import Any, Callable, Dict, List, Optional, Type, TypeVar, Union
87

@@ -339,7 +338,7 @@ async def on_connect(self, connection: Connection) -> None:
339338
# regardless of the server type. If this is a primary connection,
340339
# READONLY would not affect executing write commands.
341340
await connection.send_command("READONLY")
342-
if str_if_bytes(await connection.read_response()) != "OK":
341+
if str_if_bytes(await connection.read_response_without_lock()) != "OK":
343342
raise ConnectionError("READONLY command failed")
344343

345344
def get_node(
@@ -789,10 +788,10 @@ async def execute_command(self, *args, **kwargs) -> Any:
789788
connection = None
790789
if self._free:
791790
for _ in range(len(self._free)):
792-
if self._free[0].is_connected:
793-
connection = self._free.popleft()
791+
connection = self._free.popleft()
792+
if connection.is_connected:
794793
break
795-
self._free.rotate(-1)
794+
self._free.append(connection)
796795
else:
797796
connection = self._free.popleft()
798797
else:
@@ -807,9 +806,11 @@ async def execute_command(self, *args, **kwargs) -> Any:
807806
await connection.send_packed_command(command, False)
808807
try:
809808
if NEVER_DECODE in kwargs:
810-
response = await connection.read_response(disable_decoding=True)
809+
response = await connection.read_response_without_lock(
810+
disable_decoding=True
811+
)
811812
else:
812-
response = await connection.read_response()
813+
response = await connection.read_response_without_lock()
813814
except ResponseError:
814815
if EMPTY_RESPONSE in kwargs:
815816
return kwargs[EMPTY_RESPONSE]
@@ -827,7 +828,6 @@ async def execute_command(self, *args, **kwargs) -> Any:
827828

828829
class NodesManager:
829830
__slots__ = (
830-
"_lock",
831831
"_moved_exception",
832832
"_require_full_coverage",
833833
"connection_kwargs",
@@ -852,7 +852,6 @@ def __init__(
852852
self._moved_exception = None
853853
self.connection_kwargs = kwargs
854854
self.read_load_balancer = LoadBalancer()
855-
self._lock = threading.Lock()
856855

857856
def get_node(
858857
self,
@@ -936,9 +935,7 @@ def get_node_from_slot(
936935
self, slot: int, read_from_replicas: bool = False
937936
) -> "ClusterNode":
938937
if self._moved_exception:
939-
with self._lock:
940-
if self._moved_exception:
941-
self._update_moved_slots()
938+
self._update_moved_slots()
942939

943940
try:
944941
if read_from_replicas:

redis/asyncio/connection.py

+35
Original file line numberDiff line numberDiff line change
@@ -939,6 +939,41 @@ async def read_response(self, disable_decoding: bool = False):
939939
raise response from None
940940
return response
941941

942+
async def read_response_without_lock(self, disable_decoding: bool = False):
943+
"""Read the response from a previously sent command"""
944+
try:
945+
if self.socket_timeout:
946+
async with async_timeout.timeout(self.socket_timeout):
947+
response = await self._parser.read_response(
948+
disable_decoding=disable_decoding
949+
)
950+
else:
951+
response = await self._parser.read_response(
952+
disable_decoding=disable_decoding
953+
)
954+
except asyncio.TimeoutError:
955+
await self.disconnect()
956+
raise TimeoutError(f"Timeout reading from {self.host}:{self.port}")
957+
except OSError as e:
958+
await self.disconnect()
959+
raise ConnectionError(
960+
f"Error while reading from {self.host}:{self.port} : {e.args}"
961+
)
962+
except BaseException:
963+
await self.disconnect()
964+
raise
965+
966+
if self.health_check_interval:
967+
if sys.version_info[0:2] == (3, 6):
968+
func = asyncio.get_event_loop
969+
else:
970+
func = asyncio.get_running_loop
971+
self.next_health_check = func().time() + self.health_check_interval
972+
973+
if isinstance(response, ResponseError):
974+
raise response from None
975+
return response
976+
942977
def pack_command(self, *args: EncodableT) -> List[bytes]:
943978
"""Pack a series of arguments into the Redis protocol"""
944979
output = []

tests/test_asyncio/test_cluster.py

+26-18
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ def mock_node_resp(
134134
) -> ClusterNode:
135135
connection = mock.AsyncMock()
136136
connection.is_connected = True
137-
connection.read_response.return_value = response
137+
connection.read_response_without_lock.return_value = response
138138
while node._free:
139139
node._free.pop()
140140
node._free.append(connection)
@@ -288,10 +288,10 @@ async def test_execute_command_node_flag_primaries(self, r: RedisCluster) -> Non
288288
assert await r.ping(target_nodes=RedisCluster.PRIMARIES) is True
289289
for primary in primaries:
290290
conn = primary._free.pop()
291-
assert conn.read_response.called is True
291+
assert conn.read_response_without_lock.called is True
292292
for replica in replicas:
293293
conn = replica._free.pop()
294-
assert conn.read_response.called is not True
294+
assert conn.read_response_without_lock.called is not True
295295

296296
async def test_execute_command_node_flag_replicas(self, r: RedisCluster) -> None:
297297
"""
@@ -305,10 +305,10 @@ async def test_execute_command_node_flag_replicas(self, r: RedisCluster) -> None
305305
assert await r.ping(target_nodes=RedisCluster.REPLICAS) is True
306306
for replica in replicas:
307307
conn = replica._free.pop()
308-
assert conn.read_response.called is True
308+
assert conn.read_response_without_lock.called is True
309309
for primary in primaries:
310310
conn = primary._free.pop()
311-
assert conn.read_response.called is not True
311+
assert conn.read_response_without_lock.called is not True
312312

313313
await r.close()
314314

@@ -320,7 +320,7 @@ async def test_execute_command_node_flag_all_nodes(self, r: RedisCluster) -> Non
320320
assert await r.ping(target_nodes=RedisCluster.ALL_NODES) is True
321321
for node in r.get_nodes():
322322
conn = node._free.pop()
323-
assert conn.read_response.called is True
323+
assert conn.read_response_without_lock.called is True
324324

325325
async def test_execute_command_node_flag_random(self, r: RedisCluster) -> None:
326326
"""
@@ -331,7 +331,7 @@ async def test_execute_command_node_flag_random(self, r: RedisCluster) -> None:
331331
called_count = 0
332332
for node in r.get_nodes():
333333
conn = node._free.pop()
334-
if conn.read_response.called is True:
334+
if conn.read_response_without_lock.called is True:
335335
called_count += 1
336336
assert called_count == 1
337337

@@ -344,7 +344,7 @@ async def test_execute_command_default_node(self, r: RedisCluster) -> None:
344344
mock_node_resp(def_node, "PONG")
345345
assert await r.ping() is True
346346
conn = def_node._free.pop()
347-
assert conn.read_response.called
347+
assert conn.read_response_without_lock.called
348348

349349
async def test_ask_redirection(self, r: RedisCluster) -> None:
350350
"""
@@ -488,7 +488,7 @@ async def test_reading_from_replicas_in_round_robin(self) -> None:
488488
with mock.patch.multiple(
489489
Connection,
490490
send_command=mock.DEFAULT,
491-
read_response=mock.DEFAULT,
491+
read_response_without_lock=mock.DEFAULT,
492492
_connect=mock.DEFAULT,
493493
can_read=mock.DEFAULT,
494494
on_connect=mock.DEFAULT,
@@ -520,7 +520,7 @@ def execute_command_mock_third(self, *args, **options):
520520
# so we'll mock some of the Connection's functions to allow it
521521
execute_command.side_effect = execute_command_mock_first
522522
mocks["send_command"].return_value = True
523-
mocks["read_response"].return_value = "OK"
523+
mocks["read_response_without_lock"].return_value = "OK"
524524
mocks["_connect"].return_value = True
525525
mocks["can_read"].return_value = False
526526
mocks["on_connect"].return_value = True
@@ -682,6 +682,14 @@ async def test_not_require_full_coverage_cluster_down_error(
682682
else:
683683
raise e
684684

685+
async def test_can_run_concurrent_commands(self, r: RedisCluster) -> None:
686+
assert await r.ping(target_nodes=RedisCluster.ALL_NODES) is True
687+
assert all(
688+
await asyncio.gather(
689+
*(r.ping(target_nodes=RedisCluster.ALL_NODES) for _ in range(100))
690+
)
691+
)
692+
685693

686694
@pytest.mark.onlycluster
687695
class TestClusterRedisCommands:
@@ -792,7 +800,7 @@ async def test_cluster_addslots(self, r: RedisCluster) -> None:
792800

793801
@skip_if_server_version_lt("7.0.0")
794802
@skip_if_redis_enterprise()
795-
async def test_cluster_addslotsrange(self, r):
803+
async def test_cluster_addslotsrange(self, r: RedisCluster):
796804
node = r.get_random_node()
797805
mock_node_resp(node, "OK")
798806
assert await r.cluster_addslotsrange(node, 1, 5)
@@ -820,14 +828,14 @@ async def test_cluster_delslots(self) -> None:
820828
node0 = r.get_node(default_host, 7000)
821829
node1 = r.get_node(default_host, 7001)
822830
assert await r.cluster_delslots(0, 8192) == [True, True]
823-
assert node0._free.pop().read_response.called
824-
assert node1._free.pop().read_response.called
831+
assert node0._free.pop().read_response_without_lock.called
832+
assert node1._free.pop().read_response_without_lock.called
825833

826834
await r.close()
827835

828836
@skip_if_server_version_lt("7.0.0")
829837
@skip_if_redis_enterprise()
830-
async def test_cluster_delslotsrange(self, r):
838+
async def test_cluster_delslotsrange(self, r: RedisCluster):
831839
node = r.get_random_node()
832840
mock_node_resp(node, "OK")
833841
await r.cluster_addslots(node, 1, 2, 3, 4, 5)
@@ -990,7 +998,7 @@ async def test_cluster_setslot_stable(self, r: RedisCluster) -> None:
990998
node = r.nodes_manager.get_node_from_slot(12182)
991999
mock_node_resp(node, "OK")
9921000
assert await r.cluster_setslot_stable(12182) is True
993-
assert node._free.pop().read_response.called
1001+
assert node._free.pop().read_response_without_lock.called
9941002

9951003
@skip_if_redis_enterprise()
9961004
async def test_cluster_replicas(self, r: RedisCluster) -> None:
@@ -1014,7 +1022,7 @@ async def test_cluster_replicas(self, r: RedisCluster) -> None:
10141022
)
10151023

10161024
@skip_if_server_version_lt("7.0.0")
1017-
async def test_cluster_links(self, r):
1025+
async def test_cluster_links(self, r: RedisCluster):
10181026
node = r.get_random_node()
10191027
res = await r.cluster_links(node)
10201028
links_to = sum(x.count("to") for x in res)
@@ -1032,7 +1040,7 @@ async def test_readonly(self) -> None:
10321040
for res in all_replicas_results.values():
10331041
assert res is True
10341042
for replica in r.get_replicas():
1035-
assert replica._free.pop().read_response.called
1043+
assert replica._free.pop().read_response_without_lock.called
10361044

10371045
await r.close()
10381046

@@ -1045,7 +1053,7 @@ async def test_readwrite(self) -> None:
10451053
for res in all_replicas_results.values():
10461054
assert res is True
10471055
for replica in r.get_replicas():
1048-
assert replica._free.pop().read_response.called
1056+
assert replica._free.pop().read_response_without_lock.called
10491057

10501058
await r.close()
10511059

0 commit comments

Comments
 (0)