Skip to content

Avoid deadlock when two tasks are concurrently waiting for an unresolved ActorFuture #5709

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 12 commits into from
Feb 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion distributed/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
from dask.config import config # type: ignore

from ._version import get_versions
from .actor import Actor, ActorFuture
from .actor import Actor, BaseActorFuture
from .client import (
Client,
CompatibleExecutor,
Expand Down
158 changes: 119 additions & 39 deletions distributed/actor.py
Original file line number Diff line number Diff line change
@@ -1,20 +1,63 @@
from __future__ import annotations

import abc
import asyncio
import functools
import sys
import threading
from dataclasses import dataclass
from datetime import timedelta
from typing import Generic, Literal, NoReturn, TypeVar

from tornado.ioloop import IOLoop

from .client import Future
from .protocol import to_serialize
from .utils import iscoroutinefunction, sync, thread_state
from .utils_comm import WrappedKey
from .worker import get_client, get_worker

_T = TypeVar("_T")

if sys.version_info >= (3, 9):
from collections.abc import Awaitable, Generator
else:
from typing import Awaitable, Generator

if sys.version_info >= (3, 10):
from asyncio import Event as _LateLoopEvent
else:
# In python 3.10 asyncio.Lock and other primitives no longer support
# passing a loop kwarg to bind to a loop running in another thread
# e.g. calling from Client(asynchronous=False). Instead the loop is bound
# as late as possible: when calling any methods that wait on or wake
# Future instances. See: https://bugs.python.org/issue42392
class _LateLoopEvent:
def __init__(self) -> None:
self._event: asyncio.Event | None = None

def set(self) -> None:
if self._event is None:
self._event = asyncio.Event()

self._event.set()

def is_set(self) -> bool:
return self._event is not None and self._event.is_set()

async def wait(self) -> bool:
if self._event is None:
self._event = asyncio.Event()

return await self._event.wait()


class Actor(WrappedKey):
"""Controls an object on a remote worker

An actor allows remote control of a stateful object living on a remote
worker. Method calls on this object trigger operations on the remote
object and return ActorFutures on which we can block to get results.
object and return BaseActorFutures on which we can block to get results.

Examples
--------
Expand All @@ -36,7 +79,7 @@ class Actor(WrappedKey):
>>> counter
<Actor: Counter, key=Counter-1234abcd>

Calling methods on this object immediately returns deferred ``ActorFuture``
Calling methods on this object immediately returns deferred ``BaseActorFuture``
objects. You can call ``.result()`` on these objects to block and get the
result of the function call.

Expand Down Expand Up @@ -140,9 +183,7 @@ def __getattr__(self, key):
return attr

elif callable(attr):
return lambda *args, **kwargs: ActorFuture(
None, self._io_loop, result=attr(*args, **kwargs)
)
return lambda *args, **kwargs: EagerActorFuture(attr(*args, **kwargs))
else:
return attr

Expand All @@ -166,16 +207,17 @@ async def run_actor_function_on_worker():
return await run_actor_function_on_worker()
else: # pragma: no cover
raise OSError("Unable to contact Actor's worker")
return result
if result["status"] == "OK":
return _OK(result["result"])
return _Error(result["exception"])

q = asyncio.Queue(loop=self._io_loop.asyncio_loop)
actor_future = ActorFuture(io_loop=self._io_loop)

async def wait_then_add_to_queue():
x = await run_actor_function_on_worker()
await q.put(x)
async def wait_then_set_result():
actor_future._set_result(await run_actor_function_on_worker())

self._io_loop.add_callback(wait_then_add_to_queue)
return ActorFuture(q, self._io_loop)
self._io_loop.add_callback(wait_then_set_result)
return actor_future

return func

Expand Down Expand Up @@ -215,10 +257,10 @@ async def func(**msg):
return func


class ActorFuture:
class BaseActorFuture(abc.ABC, Awaitable[_T]):
"""Future to an actor's method call

Whenever you call a method on an Actor you get an ActorFuture immediately
Whenever you call a method on an Actor you get a BaseActorFuture immediately
while the computation happens in the background. You can call ``.result``
to block and collect the full result

Expand All @@ -227,34 +269,72 @@ class ActorFuture:
Actor
"""

def __init__(self, q, io_loop, result=None):
self.q = q
self.io_loop = io_loop
if result:
self._cached_result = result
self.status = "pending"
@abc.abstractmethod
def result(self, timeout: str | timedelta | float | None = None) -> _T:
...

@abc.abstractmethod
def done(self) -> bool:
...

def __repr__(self) -> Literal["<ActorFuture>"]:
return "<ActorFuture>"


@dataclass(frozen=True, eq=False)
class EagerActorFuture(BaseActorFuture[_T]):
"""Future to an actor's method call when an actor calls another actor on the same worker"""

def __await__(self):
_result: _T

def __await__(self) -> Generator[object, None, _T]:
return self._result
yield

def result(self, timeout: object = None) -> _T:
return self._result

def done(self) -> Literal[True]:
return True


@dataclass(frozen=True, eq=False)
class _OK(Generic[_T]):
_v: _T

def unwrap(self) -> _T:
return self._v


@dataclass(frozen=True, eq=False)
class _Error:
_e: Exception

def unwrap(self) -> NoReturn:
raise self._e


class ActorFuture(BaseActorFuture[_T]):
def __init__(self, io_loop: IOLoop):
self._io_loop = io_loop
self._event = _LateLoopEvent()
self._out: _Error | _OK[_T] | None = None

def __await__(self) -> Generator[object, None, _T]:
return self._result().__await__()

def done(self):
return self.status != "pending"
def done(self) -> bool:
return self._event.is_set()

async def _result(self, raiseit=True):
if not hasattr(self, "_cached_result"):
out = await self.q.get()
if out["status"] == "OK":
self.status = "finished"
self._cached_result = out["result"]
else:
self.status = "error"
self._cached_result = out["exception"]
if self.status == "error":
raise self._cached_result
return self._cached_result
async def _result(self) -> _T:
await self._event.wait()
out = self._out
assert out is not None
return out.unwrap()
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you elucidate a bit more the purpose of these wrapper classes, as opposed to the more direct inspection of the result that there was previously? It seems like they are related to trying to get a chain of custody for the generic _T, but I'm not sure it really buys much since the setting of the result here isn't checked, so we ultimately have an _OK(Unknown).

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah ideally this would have been done with TypedDict but they're not supported python/mypy#3863

class _OK(TypedDict, Generic[_T]):
    status: Literal["OK"]
    result: _T

class _Error(TypedDict):
    status: Literal["error"]
    exception: Exception
...
    def _set_result(self, out: _Error | _OK[_T]): ...

It seems like they are related to trying to get a chain of custody for the generic _T, but I'm not sure it really buys much since the setting of the result here isn't checked, so we ultimately have an _OK(Unknown).

I needed to draw the line somewhere of what's typed and what's not in this PR, and chose to type all the methods and classes of BaseActorFuture. _Error | _OK[_T] | None is also needed for the internal state ActorFuture so I think it's worth it for now


def result(self, timeout=None):
return sync(self.io_loop, self._result, callback_timeout=timeout)
def _set_result(self, out: _Error | _OK[_T]) -> None:
self._out = out
self._event.set()

def __repr__(self):
return "<ActorFuture>"
def result(self, timeout: str | timedelta | float | None = None) -> _T:
return sync(self._io_loop, self._result, callback_timeout=timeout)
4 changes: 2 additions & 2 deletions distributed/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -4966,11 +4966,11 @@ def update(self, futures):
"""Add multiple futures to the collection.

The added futures will emit from the iterator once they finish"""
from .actor import ActorFuture
from .actor import BaseActorFuture

with self.lock:
for f in futures:
if not isinstance(f, (Future, ActorFuture)):
if not isinstance(f, (Future, BaseActorFuture)):
raise TypeError("Input must be a future, got %s" % f)
self.futures[f] += 1
self.loop.add_callback(self._track_future, f)
Expand Down
Loading