|
16 | 16 | get_client,
|
17 | 17 | wait,
|
18 | 18 | )
|
| 19 | +from distributed.actor import _LateLoopEvent |
19 | 20 | from distributed.metrics import time
|
20 | 21 | from distributed.utils_test import cluster, gen_cluster
|
21 | 22 |
|
@@ -256,6 +257,27 @@ def test_sync(client):
|
256 | 257 | assert "distributed.actor" not in repr(future)
|
257 | 258 |
|
258 | 259 |
|
| 260 | +def test_timeout(client): |
| 261 | + class Waiter: |
| 262 | + def __init__(self): |
| 263 | + self.event = _LateLoopEvent() |
| 264 | + |
| 265 | + async def set(self): |
| 266 | + self.event.set() |
| 267 | + |
| 268 | + async def wait(self): |
| 269 | + return await self.event.wait() |
| 270 | + |
| 271 | + event = client.submit(Waiter, actor=True).result() |
| 272 | + future = event.wait() |
| 273 | + |
| 274 | + with pytest.raises(asyncio.TimeoutError): |
| 275 | + future.result(timeout="0.001s") |
| 276 | + |
| 277 | + event.set().result() |
| 278 | + assert future.result() is True |
| 279 | + |
| 280 | + |
259 | 281 | @gen_cluster(client=True, config={"distributed.comm.timeouts.connect": "1s"})
|
260 | 282 | async def test_failed_worker(c, s, a, b):
|
261 | 283 | future = c.submit(Counter, actor=True, workers=[a.address])
|
@@ -528,11 +550,9 @@ def sleep(self, time):
|
528 | 550 |
|
529 | 551 | @gen_cluster(client=True)
|
530 | 552 | async def test_waiter(c, s, a, b):
|
531 |
| - from tornado.locks import Event |
532 |
| - |
533 | 553 | class Waiter:
|
534 | 554 | def __init__(self):
|
535 |
| - self.event = Event() |
| 555 | + self.event = _LateLoopEvent() |
536 | 556 |
|
537 | 557 | async def set(self):
|
538 | 558 | self.event.set()
|
|
0 commit comments