
Nanny assumes self.scheduler is Scheduler, hangs when used in async with #5627

@eddiebergman


CC: @mfeurer

What happened:
Using a Nanny worker as an async with context manager, inside its own process, fails to connect to a LocalCluster's scheduler: the async with statement hangs.

What you expected to happen:
The Nanny worker connects to the LocalCluster's scheduler.

Reason
This is due to a change here, which occurred in dask 2021.07.0. Most of the context for this change can be found in that change itself.

  • L333: await self.scheduler.register_nanny() seems to expect a Scheduler object, specifically this.
  • L244: however, where this attribute is set, self.scheduler = self.rpc(self.scheduler_addr) returns an instance of PooledRPCCall.
  • I could not find anything in that class hierarchy that would turn it into a Scheduler.
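If it helps narrow this down: as far as I can tell, PooledRPCCall (in distributed.core) does not need to be a Scheduler for self.scheduler.register_nanny() to resolve. It defines __getattr__, so any attribute it does not have becomes a coroutine function that sends a message whose operation name is the attribute name. The sketch below is a simplified, hypothetical stand-in for that pattern: FakeRPC, the register_nanny handler, and its address argument are made up for illustration, and the "remote" side is just an in-process dict of handlers rather than a network connection.

```python
import asyncio
from typing import Any, Awaitable, Callable, Dict


class FakeRPC:
    """Hypothetical, simplified stand-in for an RPC proxy like PooledRPCCall."""

    def __init__(self, handlers: Dict[str, Callable[..., Awaitable[Any]]]) -> None:
        # The "remote" side: operation name -> async handler (in-process here).
        self._handlers = handlers

    def __getattr__(self, op: str) -> Callable[..., Awaitable[Any]]:
        # Python only calls __getattr__ when normal lookup fails, so
        # `proxy.register_nanny` lands here even though FakeRPC never defines it.
        async def send_recv(**kwargs: Any) -> Any:
            handler = self._handlers.get(op)
            if handler is None:
                raise AttributeError(f"no remote handler for operation {op!r}")
            return await handler(**kwargs)

        return send_recv


async def register_nanny(address: str) -> str:
    # Hypothetical handler; the real scheduler handler's signature may differ.
    return f"registered {address}"


if __name__ == "__main__":
    scheduler = FakeRPC({"register_nanny": register_nanny})
    # Looks like a method call on a Scheduler, but is dispatched by name.
    print(asyncio.run(scheduler.register_nanny(address="tcp://127.0.0.1:40000")))
```

So a call like self.scheduler.register_nanny() on a proxy of this kind only fails or hangs if the remote side never answers, not because the proxy is the wrong type.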

Context
When upgrading from distributed 2021.06.0 to distributed 2021.12.0, one of our examples for auto-sklearn fails.

Minimal Complete Verifiable Example:
Apologies, it could probably be made more minimal, but I'm not very familiar with dask or Python's async.

import asyncio
import multiprocessing
import time

import dask
from dask.distributed import Client, LocalCluster, Nanny

def start_python_worker(ip, port):
    dask.config.set({'distributed.worker.daemon': False})

    async def do_work():
        async with Nanny(
            scheduler_ip=ip,
            scheduler_port=port,
            nthreads=1,
        ) as worker:  # <--- Hangs here
            print("I won't print")
            await worker.finished()

    # asyncio.run(do_work())
    asyncio.get_event_loop().run_until_complete(do_work())

if __name__ == "__main__":

    with LocalCluster(
        n_workers=0,
        processes=True,
        threads_per_worker=1,
    ) as cluster:
        scheduler = cluster.scheduler

        process = multiprocessing.Process(
            target=start_python_worker,
            args=(scheduler.ip, scheduler.port)
        )
        process.start()

        time.sleep(2) # Buffer for process to spin up

        with Client(cluster) as client:
            future = client.submit(lambda x: x * 2, 10)

            # Should be async but here we are, looping
            while not future.done():
                time.sleep(1)

            result = future.result()
            print(result)

    process.join()
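While debugging, it can help to turn the hang into an error. Assuming, as with distributed's servers, that entering async with awaits the server's startup (including the connection to the scheduler), the hang surfaces on the async with line itself. The sketch below uses a hypothetical FakeNanny with a simulated connection step and wraps __aenter__ in asyncio.wait_for, so a scheduler that never answers raises asyncio.TimeoutError instead of blocking forever. Applying the same wrapping to the real Nanny is an assumption, not something verified against distributed 2021.12.0.

```python
import asyncio
from typing import Optional


class FakeNanny:
    """Hypothetical stand-in for a server used as `async with FakeNanny(...)`.

    Assumption: entering the context awaits startup, which includes
    connecting to the scheduler. If that never completes, neither does
    the `async with` line.
    """

    def __init__(self, connect_delay: Optional[float]) -> None:
        # None simulates a scheduler that never answers.
        self.connect_delay = connect_delay
        self.started = False

    async def _connect_to_scheduler(self) -> None:
        if self.connect_delay is None:
            await asyncio.Event().wait()  # never set, so this waits forever
        else:
            await asyncio.sleep(self.connect_delay)

    async def __aenter__(self) -> "FakeNanny":
        await self._connect_to_scheduler()
        self.started = True
        return self

    async def __aexit__(self, *exc_info: object) -> None:
        self.started = False


async def run_worker(nanny: FakeNanny, startup_timeout: float) -> bool:
    """Enter the context with a deadline so a hang surfaces as TimeoutError."""
    # Only the startup is bounded; the body (e.g. waiting for the worker to
    # finish) is not, which is why __aenter__ is wrapped rather than the whole
    # `async with` block.
    worker = await asyncio.wait_for(nanny.__aenter__(), timeout=startup_timeout)
    try:
        return worker.started
    finally:
        await nanny.__aexit__(None, None, None)


if __name__ == "__main__":
    print(asyncio.run(run_worker(FakeNanny(connect_delay=0.01), startup_timeout=1.0)))
    try:
        asyncio.run(run_worker(FakeNanny(connect_delay=None), startup_timeout=0.1))
    except asyncio.TimeoutError:
        print("startup hung; timed out instead of blocking forever")
```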

Environment:

  • Dask version: 2021.12.0
  • Python version: 3.8.5
  • Operating System: Manjaro Linux
  • Install method (conda, pip, source): pip
