Skip to content

Commit 12835e6

Browse files
committed
Ensure adaptive properties work as expected for SpecCluster
1 parent 76dd800 commit 12835e6

File tree

2 files changed

+55
-2
lines changed

2 files changed

+55
-2
lines changed

distributed/deploy/spec.py

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -380,8 +380,8 @@ async def _correct_state_internal(self) -> None:
380380
workers.append(worker)
381381
if workers:
382382
worker_futs = [asyncio.ensure_future(w) for w in workers]
383-
await asyncio.wait(worker_futs)
384383
self.workers.update(dict(zip(to_open, workers)))
384+
await asyncio.wait(worker_futs)
385385
for w in workers:
386386
w._cluster = weakref.ref(self)
387387
# Collect exceptions from failed workers. This must happen after all

distributed/deploy/tests/test_local.py

Lines changed: 54 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
from distributed.core import Status
2020
from distributed.metrics import time
2121
from distributed.system import MEMORY_LIMIT
22-
from distributed.utils import TimeoutError, open_port, sync
22+
from distributed.utils import Deadline, TimeoutError, open_port, sync
2323
from distributed.utils_test import (
2424
assert_can_connect_from_everywhere_4,
2525
assert_can_connect_from_everywhere_4_6,
@@ -1255,6 +1255,59 @@ def setup(self, worker=None):
12551255
import my_nonexistent_library # noqa
12561256

12571257

1258+
class SlowPlugin:
    """Test helper plugin whose ``setup`` blocks for a fixed delay.

    Used to keep a worker in the "requested but not yet observed" state
    long enough for the test to inspect the cluster's intermediate state.
    """

    def __init__(self, delay=0.1):
        # Seconds that ``setup`` will block for.
        self.delay = delay

    def setup(self, worker=None):
        # Simulate slow worker startup with a synchronous sleep.
        sleep(self.delay)
1264+
1265+
1266+
@pytest.mark.slow()
def test_localcluster_plan_requested_observed():
    """``plan``/``requested``/``observed`` advance in order as a worker starts.

    ``plan`` grows as soon as ``scale`` is called, ``requested`` once the
    worker has actually been launched, and ``observed`` only after the
    worker registered itself with the scheduler.
    """
    with LocalCluster(
        n_workers=0,
        threads_per_worker=1,
        processes=True,
        # FIXME: Ideally this would work with an IPC Event or a file to
        # synchronize instead of sleeping
        plugins={SlowPlugin(delay=2)},
        dashboard_address=":0",
    ) as cluster:
        # Nothing has been asked for yet.
        assert len(cluster.plan) == 0
        assert len(cluster.requested) == 0
        assert len(cluster.observed) == 0

        cluster.scale(1)
        # Scaling updates the plan immediately; nothing has launched yet.
        assert len(cluster.plan) == 1
        assert len(cluster.requested) == 0
        assert len(cluster.observed) == 0

        # This should pretty much trigger once we had the chance to run an
        # event loop tick
        deadline = Deadline.after(1)
        while not cluster.requested and deadline.remaining:
            sleep(0.01)

        # The worker is requested. For the LocalCluster this means that the
        # process is up but for generic SpecCluster implementation this merely
        # means that an additional worker has been asked for but it is not yet
        # up and running
        assert not cluster.scheduler_info["workers"]
        assert len(cluster.plan) == 1
        assert len(cluster.requested) == 1
        assert len(cluster.observed) == 0

        with Client(cluster) as client:
            client.wait_for_workers(1)

            # The worker is fully functional and registered to the scheduler
            assert cluster.scheduler_info["workers"]
            assert len(cluster.requested) == 1
            assert len(cluster.plan) == 1
            assert len(cluster.observed) == 1
1309+
1310+
12581311
@pytest.mark.slow
12591312
def test_localcluster_start_exception(loop):
12601313
with raises_with_cause(

0 commit comments

Comments
 (0)