Commit 0d39c19

Clean up legacy cruft from worker reconnection (#7712)
1 parent 96b6958 commit 0d39c19

File tree

4 files changed: +14 -53 lines

distributed/scheduler.py
distributed/tests/test_scheduler.py
distributed/worker.py
distributed/worker_memory.py


distributed/scheduler.py

Lines changed: 1 addition & 22 deletions
@@ -4107,12 +4107,9 @@ async def add_worker(
         address: str,
         status: str,
         server_id: str,
-        keys=(),
         nthreads=None,
         name=None,
         resolve_address=True,
-        nbytes=None,
-        types=None,
         now=None,
         resources=None,
         host_info=None,
@@ -4134,15 +4131,6 @@ async def add_worker(
         if address in self.workers:
             raise ValueError("Worker already exists %s" % address)
 
-        if nbytes:
-            err = (
-                f"Worker {address!r} connected with {len(nbytes)} key(s) in memory! Worker reconnection is not supported. "
-                f"Keys: {list(nbytes)}"
-            )
-            logger.error(err)
-            await comm.write({"status": "error", "message": err, "time": time()})
-            return
-
         if name in self.aliases:
             logger.warning("Worker tried to connect with a duplicate name: %s", name)
             msg = {
@@ -4202,9 +4190,6 @@ async def add_worker(
         # exist before this.
         self.check_idle_saturated(ws)
 
-        # for key in keys: # TODO
-        #     self.mark_key_in_memory(key, [address])
-
         self.stream_comms[address] = BatchedSend(interval="5ms", loop=self.loop)
 
         for plugin in list(self.plugins.values()):
@@ -6982,13 +6967,7 @@ def update_data(
         nbytes: dict,
         client=None,
     ):
-        """
-        Learn that new data has entered the network from an external source
-
-        See Also
-        --------
-        Scheduler.mark_key_in_memory
-        """
+        """Learn that new data has entered the network from an external source"""
         who_has = {k: [self.coerce_address(vv) for vv in v] for k, v in who_has.items()}
         logger.debug("Update data %s", who_has)
 
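
For orientation, the sketch below restates how the Scheduler.add_worker parameters visible in this first hunk look after the commit. It is a rough sketch only, not the full signature (the elided parameters are untouched by this change); the point of the hunk is that keys, nbytes and types are gone, so a worker can no longer announce pre-existing data when it registers. The equivalent guard now lives on the worker side, in the distributed/worker.py and distributed/worker_memory.py changes further down.

    # Rough sketch only; parameters not shown in the diff hunk above are elided.
    async def add_worker(
        self,
        address: str,
        status: str,
        server_id: str,
        nthreads=None,
        name=None,
        resolve_address=True,
        now=None,
        resources=None,
        host_info=None,
        # keys=(), nbytes=None, types=None  <- removed by this commit
    ):
        ...
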
distributed/tests/test_scheduler.py

Lines changed: 5 additions & 21 deletions
@@ -1676,30 +1676,14 @@ def add_blocked(x, y, event):
     await wait(z)
 
 
-@gen_cluster(nthreads=[])
-async def test_new_worker_with_data_rejected(s):
-    w = Worker(s.address, nthreads=1)
-    w.update_data(data={"x": 0})
-    assert w.state.tasks["x"].state == "memory"
-    assert w.data == {"x": 0}
-
-    with captured_logger(
-        "distributed.worker", level=logging.WARNING
-    ) as wlog, captured_logger("distributed.scheduler", level=logging.WARNING) as slog:
-        with pytest.raises(RuntimeError, match="Worker failed to start"):
-            await w
-        assert "connected with 1 key(s) in memory" in slog.getvalue()
-        assert "Register worker" not in slog.getvalue()
-        assert "connected with 1 key(s) in memory" in wlog.getvalue()
-
-    assert w.status == Status.failed
-    assert not s.workers
-    assert not s.stream_comms
-    assert not s.host_info
+@gen_test()
+async def test_nonempty_data_is_rejected():
+    with pytest.raises(ValueError, match="Worker.data must be empty"):
+        await Worker("localhost:12345", nthreads=1, data={"x": 1})
 
 
 @gen_cluster(client=True)
-async def test_worker_arrives_with_processing_data(c, s, a, b):
+async def test_worker_arrives_with_data_is_rejected(c, s, a, b):
     # A worker arriving with data we need should still be rejected,
     # and not affect other computations
     x = delayed(slowinc)(1, delay=0.4)

distributed/worker.py

Lines changed: 5 additions & 10 deletions
@@ -1157,6 +1157,11 @@ async def _register_with_scheduler(self) -> None:
         if self.contact_address is None:
             self.contact_address = self.address
         logger.info("-" * 49)
+
+        # Worker reconnection is not supported
+        assert not self.data
+        assert not self.state.tasks
+
         while True:
             try:
                 _start = time()
@@ -1169,18 +1174,8 @@ async def _register_with_scheduler(self) -> None:
                         reply=False,
                         address=self.contact_address,
                         status=self.status.name,
-                        keys=list(self.data),
                         nthreads=self.state.nthreads,
                         name=self.name,
-                        nbytes={
-                            ts.key: ts.get_nbytes()
-                            for ts in self.state.tasks.values()
-                            # Only if the task is in memory this is a sensible
-                            # result since otherwise it simply submits the
-                            # default value
-                            if ts.state == "memory"
-                        },
-                        types={k: typename(v) for k, v in self.data.items()},
                         now=time(),
                         resources=self.state.total_resources,
                         memory_limit=self.memory_manager.memory_limit,
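
To make the second hunk concrete, here is a minimal sketch of the slimmed-down registration message, using hypothetical stand-in values in place of the worker's attributes. Only the fields visible in this hunk are shown, and the "register-worker" op name is an assumption, not something stated in the diff.

    from time import time

    # Hypothetical values; the field names mirror the keyword arguments above.
    # keys=, nbytes= and types= are no longer part of the message.
    registration_msg = dict(
        op="register-worker",             # assumed op name
        reply=False,
        address="tcp://127.0.0.1:40001",  # stand-in for self.contact_address
        status="running",                 # stand-in for self.status.name
        nthreads=4,                       # stand-in for self.state.nthreads
        name="worker-0",                  # stand-in for self.name
        now=time(),
        resources={},                     # stand-in for self.state.total_resources
        memory_limit=4_000_000_000,       # stand-in for memory_manager.memory_limit
    )
    print(sorted(registration_msg))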

distributed/worker_memory.py

Lines changed: 3 additions & 0 deletions
@@ -164,6 +164,9 @@ def __init__(
         else:
             self.data = {}
 
+        if self.data:
+            raise ValueError("Worker.data must be empty at initialization time")
+
         self.memory_monitor_interval = parse_timedelta(
             dask.config.get("distributed.worker.memory.monitor-interval"),
             default=False,
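
Taken together with the new test in distributed/tests/test_scheduler.py, the user-visible behaviour is that a Worker given non-empty data now fails up front rather than being rejected by the scheduler. A minimal sketch, assuming a local Python session with distributed installed; the dummy scheduler address is borrowed from the new test.

    import asyncio

    from distributed import Worker


    async def main():
        try:
            # The check added above fires before the worker ever reaches the
            # (nonexistent) scheduler behind this dummy address.
            await Worker("localhost:12345", nthreads=1, data={"x": 1})
        except ValueError as exc:
            print(exc)  # Worker.data must be empty at initialization time


    asyncio.run(main())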
