From 6822a25a06a0264d45bb9f8b94e737bbe863fd3c Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristj=C3=A1n=20Valur=20J=C3=B3nsson?= Date: Fri, 14 Apr 2023 13:30:27 +0000 Subject: [PATCH 01/10] Use provided redis address. Bind to IPv4 --- tests/test_asyncio/test_cwe_404.py | 40 ++++++++++++++++++++---------- 1 file changed, 27 insertions(+), 13 deletions(-) diff --git a/tests/test_asyncio/test_cwe_404.py b/tests/test_asyncio/test_cwe_404.py index dc62df65f4..192edbc4ef 100644 --- a/tests/test_asyncio/test_cwe_404.py +++ b/tests/test_asyncio/test_cwe_404.py @@ -1,10 +1,23 @@ import asyncio import sys +import urllib.parse import pytest from redis.asyncio import Redis from redis.asyncio.cluster import RedisCluster +from redis.asyncio.connection import async_timeout + + +@pytest.fixture +def redis_addr(request): + redis_url = request.config.getoption("--redis-url") + scheme, netloc = urllib.parse.urlparse(redis_url)[:2] + assert scheme == "redis" + if ":" in netloc: + return netloc.split(":") + else: + return netloc, "6379" async def pipe( @@ -26,6 +39,10 @@ def __init__(self, addr, redis_addr, delay: float): self.delay = delay async def start(self): + # test that we can connect to redis + async with async_timeout(2): + _, redis_writer = await asyncio.open_connection(*self.redis_addr) + redis_writer.close() self.server = await asyncio.start_server(self.handle, *self.addr) self.ROUTINE = asyncio.create_task(self.server.serve_forever()) @@ -47,18 +64,16 @@ async def stop(self): @pytest.mark.onlynoncluster @pytest.mark.parametrize("delay", argvalues=[0.05, 0.5, 1, 2]) -async def test_standalone(delay): +async def test_standalone(delay, redis_addr): # create a tcp socket proxy that relays data to Redis and back, # inserting 0.1 seconds of delay - dp = DelayProxy( - addr=("localhost", 5380), redis_addr=("localhost", 6379), delay=delay * 2 - ) + dp = DelayProxy(addr=("127.0.0.1", 5380), redis_addr=redis_addr, delay=delay * 2) await dp.start() for b in [True, False]: # note that we connect to proxy, rather than to Redis directly - async with Redis(host="localhost", port=5380, single_connection_client=b) as r: + async with Redis(host="127.0.0.1", port=5380, single_connection_client=b) as r: await r.set("foo", "foo") await r.set("bar", "bar") @@ -83,13 +98,11 @@ async def test_standalone(delay): @pytest.mark.onlynoncluster @pytest.mark.parametrize("delay", argvalues=[0.05, 0.5, 1, 2]) -async def test_standalone_pipeline(delay): - dp = DelayProxy( - addr=("localhost", 5380), redis_addr=("localhost", 6379), delay=delay * 2 - ) +async def test_standalone_pipeline(delay, redis_addr): + dp = DelayProxy(addr=("127.0.0.1", 5380), redis_addr=redis_addr, delay=delay * 2) await dp.start() for b in [True, False]: - async with Redis(host="localhost", port=5380, single_connection_client=b) as r: + async with Redis(host="127.0.0.1", port=5380, single_connection_client=b) as r: await r.set("foo", "foo") await r.set("bar", "bar") @@ -122,12 +135,13 @@ async def test_standalone_pipeline(delay): @pytest.mark.onlycluster -async def test_cluster(request): +async def test_cluster(request, redis_addr): - dp = DelayProxy(addr=("localhost", 5381), redis_addr=("localhost", 6372), delay=0.1) + redis_addr = redis_addr[0], 6372 # use the cluster port + dp = DelayProxy(addr=("127.0.0.1", 5381), redis_addr=redis_addr, delay=0.1) await dp.start() - r = RedisCluster.from_url("redis://localhost:5381") + r = RedisCluster.from_url("redis://127.0.0.1:5381") await r.initialize() await r.set("foo", "foo") await r.set("bar", "bar") From f093d8559847e1dfca9cf980babad322d3532b61 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristj=C3=A1n=20Valur=20J=C3=B3nsson?= Date: Fri, 14 Apr 2023 13:32:16 +0000 Subject: [PATCH 02/10] Add missing "await" and perform the correct test for pipe eimpty --- tests/test_asyncio/test_cwe_404.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/tests/test_asyncio/test_cwe_404.py b/tests/test_asyncio/test_cwe_404.py index 192edbc4ef..471543fce7 100644 --- a/tests/test_asyncio/test_cwe_404.py +++ b/tests/test_asyncio/test_cwe_404.py @@ -120,9 +120,10 @@ async def test_standalone_pipeline(delay, redis_addr): pipe.get("bar") pipe.ping() pipe.get("foo") - pipe.reset() + await pipe.reset() - assert await pipe.execute() is None + # check that the pipeline is empty after reset + assert await pipe.execute() == [] # validating that the pipeline can be used as it could previously pipe.get("bar") From 32f730588cc3a0bea10e0ab8c7d687b6b0ed9087 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristj=C3=A1n=20Valur=20J=C3=B3nsson?= Date: Fri, 14 Apr 2023 13:41:54 +0000 Subject: [PATCH 03/10] Wait for a send event, rather than rely on sleep time. Excpect cancel errors. --- tests/test_asyncio/test_cwe_404.py | 70 ++++++++++++++++++------------ 1 file changed, 42 insertions(+), 28 deletions(-) diff --git a/tests/test_asyncio/test_cwe_404.py b/tests/test_asyncio/test_cwe_404.py index 471543fce7..be7bf4eca3 100644 --- a/tests/test_asyncio/test_cwe_404.py +++ b/tests/test_asyncio/test_cwe_404.py @@ -1,5 +1,4 @@ import asyncio -import sys import urllib.parse import pytest @@ -20,23 +19,12 @@ def redis_addr(request): return netloc, "6379" -async def pipe( - reader: asyncio.StreamReader, writer: asyncio.StreamWriter, delay: float, name="" -): - while True: - data = await reader.read(1000) - if not data: - break - await asyncio.sleep(delay) - writer.write(data) - await writer.drain() - - class DelayProxy: def __init__(self, addr, redis_addr, delay: float): self.addr = addr self.redis_addr = redis_addr self.delay = delay + self.send_event = asyncio.Event() async def start(self): # test that we can connect to redis @@ -49,10 +37,10 @@ async def start(self): async def handle(self, reader, writer): # establish connection to redis redis_reader, redis_writer = await asyncio.open_connection(*self.redis_addr) - pipe1 = asyncio.create_task(pipe(reader, redis_writer, self.delay, "to redis:")) - pipe2 = asyncio.create_task( - pipe(redis_reader, writer, self.delay, "from redis:") + pipe1 = asyncio.create_task( + self.pipe(reader, redis_writer, "to redis:", self.send_event) ) + pipe2 = asyncio.create_task(self.pipe(redis_reader, writer, "from redis:")) await asyncio.gather(pipe1, pipe2) async def stop(self): @@ -61,6 +49,24 @@ async def stop(self): loop = self.server.get_loop() await loop.shutdown_asyncgens() + async def pipe( + self, + reader: asyncio.StreamReader, + writer: asyncio.StreamWriter, + name="", + event: asyncio.Event = None, + ): + while True: + data = await reader.read(1000) + if not data: + break + # print(f"{name} read {len(data)} delay {self.delay}") + if event: + event.set() + await asyncio.sleep(self.delay) + writer.write(data) + await writer.drain() + @pytest.mark.onlynoncluster @pytest.mark.parametrize("delay", argvalues=[0.05, 0.5, 1, 2]) @@ -78,17 +84,18 @@ async def test_standalone(delay, redis_addr): await r.set("foo", "foo") await r.set("bar", "bar") + dp.send_event.clear() t = asyncio.create_task(r.get("foo")) - await asyncio.sleep(delay) + # Wait until the task has sent, and then some, to make sure it has + # settled on the read. + await dp.send_event.wait() + await asyncio.sleep(0.01) # a little extra time for prudence t.cancel() - try: + with pytest.raises(asyncio.CancelledError): await t - sys.stderr.write("try again, we did not cancel the task in time\n") - except asyncio.CancelledError: - sys.stderr.write( - "canceled task, connection is left open with unread response\n" - ) + # make sure that our previous request, cancelled while waiting for + # a repsponse, didn't leave the connection open andin a bad state assert await r.get("bar") == b"bar" assert await r.ping() assert await r.get("foo") == b"foo" @@ -113,10 +120,17 @@ async def test_standalone_pipeline(delay, redis_addr): pipe2.ping() pipe2.get("foo") + dp.send_event.clear() t = asyncio.create_task(pipe.get("foo").execute()) - await asyncio.sleep(delay) + # wait until task has settled on the read + await dp.send_event.wait() + await asyncio.sleep(0.01) t.cancel() + with pytest.raises(asyncio.CancelledError): + await t + # we have now cancelled the pieline in the middle of a request, make sure + # that the connection is still usable pipe.get("bar") pipe.ping() pipe.get("foo") @@ -147,13 +161,13 @@ async def test_cluster(request, redis_addr): await r.set("foo", "foo") await r.set("bar", "bar") + dp.send_event.clear() t = asyncio.create_task(r.get("foo")) - await asyncio.sleep(0.050) + await dp.send_event.wait() + await asyncio.sleep(0.01) t.cancel() - try: + with pytest.raises(asyncio.CancelledError): await t - except asyncio.CancelledError: - pytest.fail("connection is left open with unread response") assert await r.get("bar") == b"bar" assert await r.ping() From 91eeb66ddb10b8f5b0b73a7900ecaf06c43fb8d6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristj=C3=A1n=20Valur=20J=C3=B3nsson?= Date: Fri, 14 Apr 2023 13:15:29 +0000 Subject: [PATCH 04/10] set delay to 0 except for operation we want to cancel This speeds up the unit tests considerably by eliminating unnecessary delay. --- tests/test_asyncio/test_cwe_404.py | 45 +++++++++++++++++++++++++----- 1 file changed, 38 insertions(+), 7 deletions(-) diff --git a/tests/test_asyncio/test_cwe_404.py b/tests/test_asyncio/test_cwe_404.py index be7bf4eca3..ac69b319cb 100644 --- a/tests/test_asyncio/test_cwe_404.py +++ b/tests/test_asyncio/test_cwe_404.py @@ -1,4 +1,5 @@ import asyncio +import contextlib import urllib.parse import pytest @@ -20,7 +21,7 @@ def redis_addr(request): class DelayProxy: - def __init__(self, addr, redis_addr, delay: float): + def __init__(self, addr, redis_addr, delay: float = 0.0): self.addr = addr self.redis_addr = redis_addr self.delay = delay @@ -34,6 +35,19 @@ async def start(self): self.server = await asyncio.start_server(self.handle, *self.addr) self.ROUTINE = asyncio.create_task(self.server.serve_forever()) + @contextlib.contextmanager + def set_delay(self, delay: float = 0.0): + """ + Allow to override the delay for parts of tests which aren't time dependent, + to speed up execution. + """ + old = self.delay + self.delay = delay + try: + yield + finally: + self.delay = old + async def handle(self, reader, writer): # establish connection to redis redis_reader, redis_writer = await asyncio.open_connection(*self.redis_addr) @@ -74,7 +88,7 @@ async def test_standalone(delay, redis_addr): # create a tcp socket proxy that relays data to Redis and back, # inserting 0.1 seconds of delay - dp = DelayProxy(addr=("127.0.0.1", 5380), redis_addr=redis_addr, delay=delay * 2) + dp = DelayProxy(addr=("127.0.0.1", 5380), redis_addr=redis_addr) await dp.start() for b in [True, False]: @@ -84,8 +98,14 @@ async def test_standalone(delay, redis_addr): await r.set("foo", "foo") await r.set("bar", "bar") + async def op(r): + with dp.set_delay(delay * 2): + return await r.get( + "foo" + ) # <-- this is the operation we want to cancel + dp.send_event.clear() - t = asyncio.create_task(r.get("foo")) + t = asyncio.create_task(op(r)) # Wait until the task has sent, and then some, to make sure it has # settled on the read. await dp.send_event.wait() @@ -106,7 +126,7 @@ async def test_standalone(delay, redis_addr): @pytest.mark.onlynoncluster @pytest.mark.parametrize("delay", argvalues=[0.05, 0.5, 1, 2]) async def test_standalone_pipeline(delay, redis_addr): - dp = DelayProxy(addr=("127.0.0.1", 5380), redis_addr=redis_addr, delay=delay * 2) + dp = DelayProxy(addr=("127.0.0.1", 5380), redis_addr=redis_addr) await dp.start() for b in [True, False]: async with Redis(host="127.0.0.1", port=5380, single_connection_client=b) as r: @@ -120,8 +140,14 @@ async def test_standalone_pipeline(delay, redis_addr): pipe2.ping() pipe2.get("foo") + async def op(pipe): + with dp.set_delay(delay * 2): + return await pipe.get( + "foo" + ).execute() # <-- this is the operation we want to cancel + dp.send_event.clear() - t = asyncio.create_task(pipe.get("foo").execute()) + t = asyncio.create_task(op(pipe)) # wait until task has settled on the read await dp.send_event.wait() await asyncio.sleep(0.01) @@ -153,7 +179,8 @@ async def test_standalone_pipeline(delay, redis_addr): async def test_cluster(request, redis_addr): redis_addr = redis_addr[0], 6372 # use the cluster port - dp = DelayProxy(addr=("127.0.0.1", 5381), redis_addr=redis_addr, delay=0.1) + delay = 0.1 + dp = DelayProxy(addr=("127.0.0.1", 5381), redis_addr=redis_addr) await dp.start() r = RedisCluster.from_url("redis://127.0.0.1:5381") @@ -161,8 +188,12 @@ async def test_cluster(request, redis_addr): await r.set("foo", "foo") await r.set("bar", "bar") + async def op(r): + with dp.set_delay(delay): + return await r.get("foo") + dp.send_event.clear() - t = asyncio.create_task(r.get("foo")) + t = asyncio.create_task(op(r)) await dp.send_event.wait() await asyncio.sleep(0.01) t.cancel() From c7e9cd462876c3c8e2a602dd6cce79ddc97ecf32 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristj=C3=A1n=20Valur=20J=C3=B3nsson?= Date: Wed, 5 Apr 2023 13:51:08 +0000 Subject: [PATCH 05/10] Release resources in test --- tests/test_asyncio/test_cwe_404.py | 61 +++++++++++++++++------------- 1 file changed, 34 insertions(+), 27 deletions(-) diff --git a/tests/test_asyncio/test_cwe_404.py b/tests/test_asyncio/test_cwe_404.py index ac69b319cb..382de003ad 100644 --- a/tests/test_asyncio/test_cwe_404.py +++ b/tests/test_asyncio/test_cwe_404.py @@ -41,25 +41,32 @@ def set_delay(self, delay: float = 0.0): Allow to override the delay for parts of tests which aren't time dependent, to speed up execution. """ - old = self.delay + old_delay = self.delay self.delay = delay try: yield finally: - self.delay = old + self.delay = old_delay async def handle(self, reader, writer): # establish connection to redis redis_reader, redis_writer = await asyncio.open_connection(*self.redis_addr) - pipe1 = asyncio.create_task( - self.pipe(reader, redis_writer, "to redis:", self.send_event) - ) - pipe2 = asyncio.create_task(self.pipe(redis_reader, writer, "from redis:")) - await asyncio.gather(pipe1, pipe2) + try: + pipe1 = asyncio.create_task( + self.pipe(reader, redis_writer, "to redis:", self.send_event) + ) + pipe2 = asyncio.create_task(self.pipe(redis_reader, writer, "from redis:")) + await asyncio.gather(pipe1, pipe2) + finally: + redis_writer.close() async def stop(self): # clean up enough so that we can reuse the looper self.ROUTINE.cancel() + try: + await self.ROUTINE + except asyncio.CancelledError: + pass loop = self.server.get_loop() await loop.shutdown_asyncgens() @@ -183,25 +190,25 @@ async def test_cluster(request, redis_addr): dp = DelayProxy(addr=("127.0.0.1", 5381), redis_addr=redis_addr) await dp.start() - r = RedisCluster.from_url("redis://127.0.0.1:5381") - await r.initialize() - await r.set("foo", "foo") - await r.set("bar", "bar") - - async def op(r): - with dp.set_delay(delay): - return await r.get("foo") - - dp.send_event.clear() - t = asyncio.create_task(op(r)) - await dp.send_event.wait() - await asyncio.sleep(0.01) - t.cancel() - with pytest.raises(asyncio.CancelledError): - await t - - assert await r.get("bar") == b"bar" - assert await r.ping() - assert await r.get("foo") == b"foo" + with contextlib.closing(RedisCluster.from_url("redis://127.0.0.1:5381")) as r: + await r.initialize() + await r.set("foo", "foo") + await r.set("bar", "bar") + + async def op(r): + with dp.set_delay(delay): + return await r.get("foo") + + dp.send_event.clear() + t = asyncio.create_task(op(r)) + await dp.send_event.wait() + await asyncio.sleep(0.01) + t.cancel() + with pytest.raises(asyncio.CancelledError): + await t + + assert await r.get("bar") == b"bar" + assert await r.ping() + assert await r.get("foo") == b"foo" await dp.stop() From a1d3ae723ada72e1b0a74db283804cb8f9234727 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristj=C3=A1n=20Valur=20J=C3=B3nsson?= Date: Fri, 14 Apr 2023 13:20:30 +0000 Subject: [PATCH 06/10] Fix cluster test to use address_remap and multiple proxies. --- tests/test_asyncio/test_cwe_404.py | 60 ++++++++++++++++++++++++------ 1 file changed, 49 insertions(+), 11 deletions(-) diff --git a/tests/test_asyncio/test_cwe_404.py b/tests/test_asyncio/test_cwe_404.py index 382de003ad..0e33e6556d 100644 --- a/tests/test_asyncio/test_cwe_404.py +++ b/tests/test_asyncio/test_cwe_404.py @@ -185,30 +185,68 @@ async def op(pipe): @pytest.mark.onlycluster async def test_cluster(request, redis_addr): - redis_addr = redis_addr[0], 6372 # use the cluster port delay = 0.1 - dp = DelayProxy(addr=("127.0.0.1", 5381), redis_addr=redis_addr) - await dp.start() + cluster_port = 6372 + remap_base = 7372 + n_nodes = 6 + + def remap(address): + host, port = address + return host, remap_base + port - cluster_port + + proxies = [] + for i in range(n_nodes): + port = cluster_port + i + remapped = remap_base + i + forward_addr = redis_addr[0], port + proxy = DelayProxy(addr=("127.0.0.1", remapped), redis_addr=forward_addr) + proxies.append(proxy) + + # start proxies + await asyncio.gather(*[p.start() for p in proxies]) + + def all_clear(): + for p in proxies: + p.send_event.clear() + + async def wait_for_send(): + asyncio.wait( + [p.send_event.wait() for p in proxies], return_when=asyncio.FIRST_COMPLETED + ) - with contextlib.closing(RedisCluster.from_url("redis://127.0.0.1:5381")) as r: + @contextlib.contextmanager + def set_delay(delay: float): + with contextlib.ExitStack() as stack: + for p in proxies: + stack.enter_context(p.set_delay(delay)) + yield + + with contextlib.closing( + RedisCluster.from_url(f"redis://127.0.0.1:{remap_base}", address_remap=remap) + ) as r: await r.initialize() await r.set("foo", "foo") await r.set("bar", "bar") async def op(r): - with dp.set_delay(delay): + with set_delay(delay): return await r.get("foo") - dp.send_event.clear() + all_clear() t = asyncio.create_task(op(r)) - await dp.send_event.wait() + # Wait for whichever DelayProxy gets the request first + await wait_for_send() await asyncio.sleep(0.01) t.cancel() with pytest.raises(asyncio.CancelledError): await t - assert await r.get("bar") == b"bar" - assert await r.ping() - assert await r.get("foo") == b"foo" + # try a number of requests to excercise all the connections + async def doit(): + assert await r.get("bar") == b"bar" + assert await r.ping() + assert await r.get("foo") == b"foo" - await dp.stop() + await asyncio.gather(*[doit() for _ in range(10)]) + + await asyncio.gather(*(p.stop() for p in proxies)) From 00950d54359636e97cbcc3af8acca3cc8af3479d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristj=C3=A1n=20Valur=20J=C3=B3nsson?= Date: Wed, 3 May 2023 12:53:01 +0000 Subject: [PATCH 07/10] Use context manager to manage DelayProxy --- tests/test_asyncio/test_cwe_404.py | 234 +++++++++++++++-------------- 1 file changed, 121 insertions(+), 113 deletions(-) diff --git a/tests/test_asyncio/test_cwe_404.py b/tests/test_asyncio/test_cwe_404.py index 0e33e6556d..013c6d551a 100644 --- a/tests/test_asyncio/test_cwe_404.py +++ b/tests/test_asyncio/test_cwe_404.py @@ -27,12 +27,21 @@ def __init__(self, addr, redis_addr, delay: float = 0.0): self.delay = delay self.send_event = asyncio.Event() + async def __aenter__(self): + await self.start() + return self + + async def __aexit__(self, *args): + await self.stop() + async def start(self): # test that we can connect to redis async with async_timeout(2): _, redis_writer = await asyncio.open_connection(*self.redis_addr) redis_writer.close() - self.server = await asyncio.start_server(self.handle, *self.addr) + self.server = await asyncio.start_server( + self.handle, *self.addr, reuse_address=True + ) self.ROUTINE = asyncio.create_task(self.server.serve_forever()) @contextlib.contextmanager @@ -95,91 +104,89 @@ async def test_standalone(delay, redis_addr): # create a tcp socket proxy that relays data to Redis and back, # inserting 0.1 seconds of delay - dp = DelayProxy(addr=("127.0.0.1", 5380), redis_addr=redis_addr) - await dp.start() - - for b in [True, False]: - # note that we connect to proxy, rather than to Redis directly - async with Redis(host="127.0.0.1", port=5380, single_connection_client=b) as r: - - await r.set("foo", "foo") - await r.set("bar", "bar") - - async def op(r): - with dp.set_delay(delay * 2): - return await r.get( - "foo" - ) # <-- this is the operation we want to cancel - - dp.send_event.clear() - t = asyncio.create_task(op(r)) - # Wait until the task has sent, and then some, to make sure it has - # settled on the read. - await dp.send_event.wait() - await asyncio.sleep(0.01) # a little extra time for prudence - t.cancel() - with pytest.raises(asyncio.CancelledError): - await t - - # make sure that our previous request, cancelled while waiting for - # a repsponse, didn't leave the connection open andin a bad state - assert await r.get("bar") == b"bar" - assert await r.ping() - assert await r.get("foo") == b"foo" - - await dp.stop() + async with DelayProxy(addr=("127.0.0.1", 5380), redis_addr=redis_addr) as dp: + + for b in [True, False]: + # note that we connect to proxy, rather than to Redis directly + async with Redis( + host="127.0.0.1", port=5380, single_connection_client=b + ) as r: + + await r.set("foo", "foo") + await r.set("bar", "bar") + + async def op(r): + with dp.set_delay(delay * 2): + return await r.get( + "foo" + ) # <-- this is the operation we want to cancel + + dp.send_event.clear() + t = asyncio.create_task(op(r)) + # Wait until the task has sent, and then some, to make sure it has + # settled on the read. + await dp.send_event.wait() + await asyncio.sleep(0.01) # a little extra time for prudence + t.cancel() + with pytest.raises(asyncio.CancelledError): + await t + + # make sure that our previous request, cancelled while waiting for + # a repsponse, didn't leave the connection open andin a bad state + assert await r.get("bar") == b"bar" + assert await r.ping() + assert await r.get("foo") == b"foo" @pytest.mark.onlynoncluster @pytest.mark.parametrize("delay", argvalues=[0.05, 0.5, 1, 2]) async def test_standalone_pipeline(delay, redis_addr): - dp = DelayProxy(addr=("127.0.0.1", 5380), redis_addr=redis_addr) - await dp.start() - for b in [True, False]: - async with Redis(host="127.0.0.1", port=5380, single_connection_client=b) as r: - await r.set("foo", "foo") - await r.set("bar", "bar") - - pipe = r.pipeline() - - pipe2 = r.pipeline() - pipe2.get("bar") - pipe2.ping() - pipe2.get("foo") - - async def op(pipe): - with dp.set_delay(delay * 2): - return await pipe.get( - "foo" - ).execute() # <-- this is the operation we want to cancel - - dp.send_event.clear() - t = asyncio.create_task(op(pipe)) - # wait until task has settled on the read - await dp.send_event.wait() - await asyncio.sleep(0.01) - t.cancel() - with pytest.raises(asyncio.CancelledError): - await t - - # we have now cancelled the pieline in the middle of a request, make sure - # that the connection is still usable - pipe.get("bar") - pipe.ping() - pipe.get("foo") - await pipe.reset() - - # check that the pipeline is empty after reset - assert await pipe.execute() == [] - - # validating that the pipeline can be used as it could previously - pipe.get("bar") - pipe.ping() - pipe.get("foo") - assert await pipe.execute() == [b"bar", True, b"foo"] - assert await pipe2.execute() == [b"bar", True, b"foo"] - - await dp.stop() + async with DelayProxy(addr=("127.0.0.1", 5380), redis_addr=redis_addr) as dp: + for b in [True, False]: + async with Redis( + host="127.0.0.1", port=5380, single_connection_client=b + ) as r: + await r.set("foo", "foo") + await r.set("bar", "bar") + + pipe = r.pipeline() + + pipe2 = r.pipeline() + pipe2.get("bar") + pipe2.ping() + pipe2.get("foo") + + async def op(pipe): + with dp.set_delay(delay * 2): + return await pipe.get( + "foo" + ).execute() # <-- this is the operation we want to cancel + + dp.send_event.clear() + t = asyncio.create_task(op(pipe)) + # wait until task has settled on the read + await dp.send_event.wait() + await asyncio.sleep(0.01) + t.cancel() + with pytest.raises(asyncio.CancelledError): + await t + + # we have now cancelled the pieline in the middle of a request, make sure + # that the connection is still usable + pipe.get("bar") + pipe.ping() + pipe.get("foo") + await pipe.reset() + + # check that the pipeline is empty after reset + assert await pipe.execute() == [] + + # validating that the pipeline can be used as it could previously + pipe.get("bar") + pipe.ping() + pipe.get("foo") + assert await pipe.execute() == [b"bar", True, b"foo"] + assert await pipe2.execute() == [b"bar", True, b"foo"] @pytest.mark.onlycluster @@ -202,9 +209,6 @@ def remap(address): proxy = DelayProxy(addr=("127.0.0.1", remapped), redis_addr=forward_addr) proxies.append(proxy) - # start proxies - await asyncio.gather(*[p.start() for p in proxies]) - def all_clear(): for p in proxies: p.send_event.clear() @@ -221,32 +225,36 @@ def set_delay(delay: float): stack.enter_context(p.set_delay(delay)) yield - with contextlib.closing( - RedisCluster.from_url(f"redis://127.0.0.1:{remap_base}", address_remap=remap) - ) as r: - await r.initialize() - await r.set("foo", "foo") - await r.set("bar", "bar") - - async def op(r): - with set_delay(delay): - return await r.get("foo") - - all_clear() - t = asyncio.create_task(op(r)) - # Wait for whichever DelayProxy gets the request first - await wait_for_send() - await asyncio.sleep(0.01) - t.cancel() - with pytest.raises(asyncio.CancelledError): - await t - - # try a number of requests to excercise all the connections - async def doit(): - assert await r.get("bar") == b"bar" - assert await r.ping() - assert await r.get("foo") == b"foo" - - await asyncio.gather(*[doit() for _ in range(10)]) - - await asyncio.gather(*(p.stop() for p in proxies)) + async with contextlib.AsyncExitStack() as stack: + for p in proxies: + await stack.enter_async_context(p) + + with contextlib.closing( + RedisCluster.from_url( + f"redis://127.0.0.1:{remap_base}", address_remap=remap + ) + ) as r: + await r.initialize() + await r.set("foo", "foo") + await r.set("bar", "bar") + + async def op(r): + with set_delay(delay): + return await r.get("foo") + + all_clear() + t = asyncio.create_task(op(r)) + # Wait for whichever DelayProxy gets the request first + await wait_for_send() + await asyncio.sleep(0.01) + t.cancel() + with pytest.raises(asyncio.CancelledError): + await t + + # try a number of requests to excercise all the connections + async def doit(): + assert await r.get("bar") == b"bar" + assert await r.ping() + assert await r.get("foo") == b"foo" + + await asyncio.gather(*[doit() for _ in range(10)]) From 27cc5a701cbe5098f7780b5e20b13b8f34fc3775 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristj=C3=A1n=20Valur=20J=C3=B3nsson?= Date: Thu, 27 Apr 2023 09:46:12 +0000 Subject: [PATCH 08/10] Mark failing pipeline tests --- tests/test_asyncio/test_cwe_404.py | 1 + 1 file changed, 1 insertion(+) diff --git a/tests/test_asyncio/test_cwe_404.py b/tests/test_asyncio/test_cwe_404.py index 013c6d551a..d89b0d5b5d 100644 --- a/tests/test_asyncio/test_cwe_404.py +++ b/tests/test_asyncio/test_cwe_404.py @@ -138,6 +138,7 @@ async def op(r): assert await r.get("foo") == b"foo" +@pytest.mark.xfail(reason="cancel does not cause disconnect") @pytest.mark.onlynoncluster @pytest.mark.parametrize("delay", argvalues=[0.05, 0.5, 1, 2]) async def test_standalone_pipeline(delay, redis_addr): From 126e5fddd39124fc70221e0d2d69897e0dacb5d0 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristj=C3=A1n=20Valur=20J=C3=B3nsson?= Date: Wed, 3 May 2023 13:54:05 +0000 Subject: [PATCH 09/10] lint --- tests/test_asyncio/test_cwe_404.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_asyncio/test_cwe_404.py b/tests/test_asyncio/test_cwe_404.py index d89b0d5b5d..90e691467a 100644 --- a/tests/test_asyncio/test_cwe_404.py +++ b/tests/test_asyncio/test_cwe_404.py @@ -172,8 +172,8 @@ async def op(pipe): with pytest.raises(asyncio.CancelledError): await t - # we have now cancelled the pieline in the middle of a request, make sure - # that the connection is still usable + # we have now cancelled the pieline in the middle of a request, + # make sure that the connection is still usable pipe.get("bar") pipe.ping() pipe.get("foo") From 849aa7739ee1c1c6f15d86b157790601a199476b Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristj=C3=A1n=20Valur=20J=C3=B3nsson?= Date: Thu, 4 May 2023 12:34:51 +0000 Subject: [PATCH 10/10] Use a common "master_host" test fixture --- tests/conftest.py | 2 +- tests/test_asyncio/conftest.py | 8 ------ tests/test_asyncio/test_cluster.py | 20 +++---------- tests/test_asyncio/test_connection_pool.py | 10 +++---- tests/test_asyncio/test_cwe_404.py | 33 ++++++++-------------- tests/test_asyncio/test_sentinel.py | 2 +- tests/test_cluster.py | 21 +++----------- 7 files changed, 27 insertions(+), 69 deletions(-) diff --git a/tests/conftest.py b/tests/conftest.py index 27dcc741a7..4cd4c3c160 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -441,7 +441,7 @@ def mock_cluster_resp_slaves(request, **kwargs): def master_host(request): url = request.config.getoption("--redis-url") parts = urlparse(url) - yield parts.hostname, parts.port + return parts.hostname, (parts.port or 6379) @pytest.fixture() diff --git a/tests/test_asyncio/conftest.py b/tests/test_asyncio/conftest.py index 6982cc840a..121a13b41b 100644 --- a/tests/test_asyncio/conftest.py +++ b/tests/test_asyncio/conftest.py @@ -1,7 +1,6 @@ import random from contextlib import asynccontextmanager as _asynccontextmanager from typing import Union -from urllib.parse import urlparse import pytest import pytest_asyncio @@ -209,13 +208,6 @@ async def mock_cluster_resp_slaves(create_redis, **kwargs): return _gen_cluster_mock_resp(r, response) -@pytest_asyncio.fixture(scope="session") -def master_host(request): - url = request.config.getoption("--redis-url") - parts = urlparse(url) - return parts.hostname - - async def wait_for_command( client: redis.Redis, monitor: Monitor, command: str, key: Union[str, None] = None ): diff --git a/tests/test_asyncio/test_cluster.py b/tests/test_asyncio/test_cluster.py index 6d0aba73fb..2d6099f6a9 100644 --- a/tests/test_asyncio/test_cluster.py +++ b/tests/test_asyncio/test_cluster.py @@ -102,18 +102,6 @@ async def pipe( await writer.drain() -@pytest.fixture -def redis_addr(request): - redis_url = request.config.getoption("--redis-url") - scheme, netloc = urlparse(redis_url)[:2] - assert scheme == "redis" - if ":" in netloc: - host, port = netloc.split(":") - return host, int(port) - else: - return netloc, 6379 - - @pytest_asyncio.fixture() async def slowlog(r: RedisCluster) -> None: """ @@ -874,7 +862,7 @@ async def test_default_node_is_replaced_after_exception(self, r): # Rollback to the old default node r.replace_default_node(curr_default_node) - async def test_address_remap(self, create_redis, redis_addr): + async def test_address_remap(self, create_redis, master_host): """Test that we can create a rediscluster object with a host-port remapper and map connections through proxy objects """ @@ -882,7 +870,8 @@ async def test_address_remap(self, create_redis, redis_addr): # we remap the first n nodes offset = 1000 n = 6 - ports = [redis_addr[1] + i for i in range(n)] + hostname, master_port = master_host + ports = [master_port + i for i in range(n)] def address_remap(address): # remap first three nodes to our local proxy @@ -895,8 +884,7 @@ def address_remap(address): # create the proxies proxies = [ - NodeProxy(("127.0.0.1", port + offset), (redis_addr[0], port)) - for port in ports + NodeProxy(("127.0.0.1", port + offset), (hostname, port)) for port in ports ] await asyncio.gather(*[p.start() for p in proxies]) try: diff --git a/tests/test_asyncio/test_connection_pool.py b/tests/test_asyncio/test_connection_pool.py index d1e52bd2a3..92499e2c4a 100644 --- a/tests/test_asyncio/test_connection_pool.py +++ b/tests/test_asyncio/test_connection_pool.py @@ -136,14 +136,14 @@ async def test_connection_creation(self): assert connection.kwargs == connection_kwargs async def test_multiple_connections(self, master_host): - connection_kwargs = {"host": master_host} + connection_kwargs = {"host": master_host[0]} async with self.get_pool(connection_kwargs=connection_kwargs) as pool: c1 = await pool.get_connection("_") c2 = await pool.get_connection("_") assert c1 != c2 async def test_max_connections(self, master_host): - connection_kwargs = {"host": master_host} + connection_kwargs = {"host": master_host[0]} async with self.get_pool( max_connections=2, connection_kwargs=connection_kwargs ) as pool: @@ -153,7 +153,7 @@ async def test_max_connections(self, master_host): await pool.get_connection("_") async def test_reuse_previously_released_connection(self, master_host): - connection_kwargs = {"host": master_host} + connection_kwargs = {"host": master_host[0]} async with self.get_pool(connection_kwargs=connection_kwargs) as pool: c1 = await pool.get_connection("_") await pool.release(c1) @@ -237,7 +237,7 @@ async def test_multiple_connections(self, master_host): async def test_connection_pool_blocks_until_timeout(self, master_host): """When out of connections, block for timeout seconds, then raise""" - connection_kwargs = {"host": master_host} + connection_kwargs = {"host": master_host[0]} async with self.get_pool( max_connections=1, timeout=0.1, connection_kwargs=connection_kwargs ) as pool: @@ -270,7 +270,7 @@ async def target(): assert asyncio.get_running_loop().time() - start >= 0.1 async def test_reuse_previously_released_connection(self, master_host): - connection_kwargs = {"host": master_host} + connection_kwargs = {"host": master_host[0]} async with self.get_pool(connection_kwargs=connection_kwargs) as pool: c1 = await pool.get_connection("_") await pool.release(c1) diff --git a/tests/test_asyncio/test_cwe_404.py b/tests/test_asyncio/test_cwe_404.py index 90e691467a..d3a0666262 100644 --- a/tests/test_asyncio/test_cwe_404.py +++ b/tests/test_asyncio/test_cwe_404.py @@ -1,6 +1,5 @@ import asyncio import contextlib -import urllib.parse import pytest @@ -9,23 +8,14 @@ from redis.asyncio.connection import async_timeout -@pytest.fixture -def redis_addr(request): - redis_url = request.config.getoption("--redis-url") - scheme, netloc = urllib.parse.urlparse(redis_url)[:2] - assert scheme == "redis" - if ":" in netloc: - return netloc.split(":") - else: - return netloc, "6379" - - class DelayProxy: def __init__(self, addr, redis_addr, delay: float = 0.0): self.addr = addr self.redis_addr = redis_addr self.delay = delay self.send_event = asyncio.Event() + self.server = None + self.task = None async def __aenter__(self): await self.start() @@ -42,7 +32,7 @@ async def start(self): self.server = await asyncio.start_server( self.handle, *self.addr, reuse_address=True ) - self.ROUTINE = asyncio.create_task(self.server.serve_forever()) + self.task = asyncio.create_task(self.server.serve_forever()) @contextlib.contextmanager def set_delay(self, delay: float = 0.0): @@ -71,9 +61,9 @@ async def handle(self, reader, writer): async def stop(self): # clean up enough so that we can reuse the looper - self.ROUTINE.cancel() + self.task.cancel() try: - await self.ROUTINE + await self.task except asyncio.CancelledError: pass loop = self.server.get_loop() @@ -100,11 +90,11 @@ async def pipe( @pytest.mark.onlynoncluster @pytest.mark.parametrize("delay", argvalues=[0.05, 0.5, 1, 2]) -async def test_standalone(delay, redis_addr): +async def test_standalone(delay, master_host): # create a tcp socket proxy that relays data to Redis and back, # inserting 0.1 seconds of delay - async with DelayProxy(addr=("127.0.0.1", 5380), redis_addr=redis_addr) as dp: + async with DelayProxy(addr=("127.0.0.1", 5380), redis_addr=master_host) as dp: for b in [True, False]: # note that we connect to proxy, rather than to Redis directly @@ -141,8 +131,8 @@ async def op(r): @pytest.mark.xfail(reason="cancel does not cause disconnect") @pytest.mark.onlynoncluster @pytest.mark.parametrize("delay", argvalues=[0.05, 0.5, 1, 2]) -async def test_standalone_pipeline(delay, redis_addr): - async with DelayProxy(addr=("127.0.0.1", 5380), redis_addr=redis_addr) as dp: +async def test_standalone_pipeline(delay, master_host): + async with DelayProxy(addr=("127.0.0.1", 5380), redis_addr=master_host) as dp: for b in [True, False]: async with Redis( host="127.0.0.1", port=5380, single_connection_client=b @@ -191,12 +181,13 @@ async def op(pipe): @pytest.mark.onlycluster -async def test_cluster(request, redis_addr): +async def test_cluster(master_host): delay = 0.1 cluster_port = 6372 remap_base = 7372 n_nodes = 6 + hostname, _ = master_host def remap(address): host, port = address @@ -206,7 +197,7 @@ def remap(address): for i in range(n_nodes): port = cluster_port + i remapped = remap_base + i - forward_addr = redis_addr[0], port + forward_addr = hostname, port proxy = DelayProxy(addr=("127.0.0.1", remapped), redis_addr=forward_addr) proxies.append(proxy) diff --git a/tests/test_asyncio/test_sentinel.py b/tests/test_asyncio/test_sentinel.py index 5a0533ba05..7866056374 100644 --- a/tests/test_asyncio/test_sentinel.py +++ b/tests/test_asyncio/test_sentinel.py @@ -15,7 +15,7 @@ @pytest_asyncio.fixture(scope="module") def master_ip(master_host): - yield socket.gethostbyname(master_host) + yield socket.gethostbyname(master_host[0]) class SentinelTestClient: diff --git a/tests/test_cluster.py b/tests/test_cluster.py index 1f037c9edf..8371cc577f 100644 --- a/tests/test_cluster.py +++ b/tests/test_cluster.py @@ -8,7 +8,6 @@ from queue import LifoQueue, Queue from time import sleep from unittest.mock import DEFAULT, Mock, call, patch -from urllib.parse import urlparse import pytest @@ -125,18 +124,6 @@ def close(self): self.server.shutdown() -@pytest.fixture -def redis_addr(request): - redis_url = request.config.getoption("--redis-url") - scheme, netloc = urlparse(redis_url)[:2] - assert scheme == "redis" - if ":" in netloc: - host, port = netloc.split(":") - return host, int(port) - else: - return netloc, 6379 - - @pytest.fixture() def slowlog(request, r): """ @@ -907,7 +894,7 @@ def raise_connection_error(): assert "myself" not in nodes.get(curr_default_node.name).get("flags") assert r.get_default_node() != curr_default_node - def test_address_remap(self, request, redis_addr): + def test_address_remap(self, request, master_host): """Test that we can create a rediscluster object with a host-port remapper and map connections through proxy objects """ @@ -915,7 +902,8 @@ def test_address_remap(self, request, redis_addr): # we remap the first n nodes offset = 1000 n = 6 - ports = [redis_addr[1] + i for i in range(n)] + hostname, master_port = master_host + ports = [master_port + i for i in range(n)] def address_remap(address): # remap first three nodes to our local proxy @@ -928,8 +916,7 @@ def address_remap(address): # create the proxies proxies = [ - NodeProxy(("127.0.0.1", port + offset), (redis_addr[0], port)) - for port in ports + NodeProxy(("127.0.0.1", port + offset), (hostname, port)) for port in ports ] for p in proxies: p.start()