1
1
import asyncio
2
2
import contextlib
3
- import urllib .parse
4
3
5
4
import pytest
6
5
9
8
from redis .asyncio .connection import async_timeout
10
9
11
10
12
- @pytest .fixture
13
- def redis_addr (request ):
14
- redis_url = request .config .getoption ("--redis-url" )
15
- scheme , netloc = urllib .parse .urlparse (redis_url )[:2 ]
16
- assert scheme == "redis"
17
- if ":" in netloc :
18
- return netloc .split (":" )
19
- else :
20
- return netloc , "6379"
21
-
22
-
23
11
class DelayProxy :
24
12
def __init__ (self , addr , redis_addr , delay : float = 0.0 ):
25
13
self .addr = addr
26
14
self .redis_addr = redis_addr
27
15
self .delay = delay
28
16
self .send_event = asyncio .Event ()
17
+ self .server = None
18
+ self .task = None
29
19
30
20
async def __aenter__ (self ):
31
21
await self .start ()
@@ -42,7 +32,7 @@ async def start(self):
42
32
self .server = await asyncio .start_server (
43
33
self .handle , * self .addr , reuse_address = True
44
34
)
45
- self .ROUTINE = asyncio .create_task (self .server .serve_forever ())
35
+ self .task = asyncio .create_task (self .server .serve_forever ())
46
36
47
37
@contextlib .contextmanager
48
38
def set_delay (self , delay : float = 0.0 ):
@@ -71,9 +61,9 @@ async def handle(self, reader, writer):
71
61
72
62
async def stop (self ):
73
63
# clean up enough so that we can reuse the looper
74
- self .ROUTINE .cancel ()
64
+ self .task .cancel ()
75
65
try :
76
- await self .ROUTINE
66
+ await self .task
77
67
except asyncio .CancelledError :
78
68
pass
79
69
loop = self .server .get_loop ()
@@ -100,11 +90,11 @@ async def pipe(
100
90
101
91
@pytest .mark .onlynoncluster
102
92
@pytest .mark .parametrize ("delay" , argvalues = [0.05 , 0.5 , 1 , 2 ])
103
- async def test_standalone (delay , redis_addr ):
93
+ async def test_standalone (delay , master_host ):
104
94
105
95
# create a tcp socket proxy that relays data to Redis and back,
106
96
# inserting 0.1 seconds of delay
107
- async with DelayProxy (addr = ("127.0.0.1" , 5380 ), redis_addr = redis_addr ) as dp :
97
+ async with DelayProxy (addr = ("127.0.0.1" , 5380 ), redis_addr = master_host ) as dp :
108
98
109
99
for b in [True , False ]:
110
100
# note that we connect to proxy, rather than to Redis directly
@@ -141,8 +131,8 @@ async def op(r):
141
131
@pytest .mark .xfail (reason = "cancel does not cause disconnect" )
142
132
@pytest .mark .onlynoncluster
143
133
@pytest .mark .parametrize ("delay" , argvalues = [0.05 , 0.5 , 1 , 2 ])
144
- async def test_standalone_pipeline (delay , redis_addr ):
145
- async with DelayProxy (addr = ("127.0.0.1" , 5380 ), redis_addr = redis_addr ) as dp :
134
+ async def test_standalone_pipeline (delay , master_host ):
135
+ async with DelayProxy (addr = ("127.0.0.1" , 5380 ), redis_addr = master_host ) as dp :
146
136
for b in [True , False ]:
147
137
async with Redis (
148
138
host = "127.0.0.1" , port = 5380 , single_connection_client = b
@@ -191,12 +181,13 @@ async def op(pipe):
191
181
192
182
193
183
@pytest .mark .onlycluster
194
- async def test_cluster (request , redis_addr ):
184
+ async def test_cluster (master_host ):
195
185
196
186
delay = 0.1
197
187
cluster_port = 6372
198
188
remap_base = 7372
199
189
n_nodes = 6
190
+ hostname , _ = master_host
200
191
201
192
def remap (address ):
202
193
host , port = address
@@ -206,7 +197,7 @@ def remap(address):
206
197
for i in range (n_nodes ):
207
198
port = cluster_port + i
208
199
remapped = remap_base + i
209
- forward_addr = redis_addr [ 0 ] , port
200
+ forward_addr = hostname , port
210
201
proxy = DelayProxy (addr = ("127.0.0.1" , remapped ), redis_addr = forward_addr )
211
202
proxies .append (proxy )
212
203
0 commit comments