
Avoid deadlock when two tasks are concurrently waiting for an unresolved ActorFuture #5709


Merged

fjetter merged 12 commits from the actor-future-deadlock branch into dask:main on Feb 18, 2022

Conversation

@graingert (Member) commented Jan 26, 2022
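
As a rough, hypothetical illustration of the behaviour the title describes (this is not the PR's actual code), an asyncio.Event-backed future lets any number of tasks wait on the same unresolved result, and all of them wake once it is set:

import asyncio

class DemoFuture:
    """Toy stand-in for an ActorFuture: many waiters, one result."""

    def __init__(self):
        self._event = asyncio.Event()
        self._out = None

    def set_result(self, value):
        self._out = value
        self._event.set()  # wakes every waiter, not just one

    async def wait(self):
        await self._event.wait()
        return self._out

async def main():
    fut = DemoFuture()
    # two tasks concurrently wait on the same unresolved future
    waiters = [asyncio.create_task(fut.wait()) for _ in range(2)]
    await asyncio.sleep(0)  # let both waiters start waiting
    fut.set_result(42)
    print(await asyncio.gather(*waiters))  # [42, 42]

asyncio.run(main())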

@GPUtester (Collaborator)

Can one of the admins verify this patch?

@graingert changed the title from "avoid deadlock in ActorFuture" to "deadlock when two tasks are concurrently waiting for an unresolved ActorFuture" on Jan 26, 2022
@graingert changed the title from "deadlock when two tasks are concurrently waiting for an unresolved ActorFuture" to "avoid deadlock when two tasks are concurrently waiting for an unresolved ActorFuture" on Jan 26, 2022
@graingert force-pushed the actor-future-deadlock branch 2 times, most recently from 43bb3d2 to 95f11e9 on January 27, 2022 13:21
@graingert marked this pull request as ready for review on January 27, 2022 16:30
@graingert force-pushed the actor-future-deadlock branch from f2a84c5 to cb8e7de on January 27, 2022 16:39
@ian-r-rose self-requested a review on January 27, 2022 17:48
@ian-r-rose (Collaborator) left a comment

Thanks @graingert, this is a nice piece of work, and the implementation seems sound to me. A few design questions, but nothing major.

@graingert force-pushed the actor-future-deadlock branch 2 times, most recently from 1a09c7e to f2fbc8a on January 28, 2022 13:37
@gjoseph92 (Collaborator) left a comment

Not too important, but I could see some type annotations here being helpful, mostly just for future readers (a rough sketch follows the list below). Particularly for things like:

  • ActorFuture.result return type (making ActorFuture generic would be necessary)
  • ActorFuture.__await__ return type
  • _ActorFuture._out
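
A rough sketch of what those annotations could look like (hypothetical and simplified; the PR's actual classes carry more state and methods), making the future generic over its result type:

from __future__ import annotations

from typing import Generator, Generic, TypeVar

_T = TypeVar("_T")

class ActorFuture(Generic[_T]):
    # an internal result slot such as _out could be annotated as
    # "_Error | _OK[_T] | None" once those wrapper types are defined

    def result(self, timeout: float | None = None) -> _T:
        ...

    def __await__(self) -> Generator[object, None, _T]:
        ...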

@graingert force-pushed the actor-future-deadlock branch 3 times, most recently from 6549929 to 6e54fd1 on February 3, 2022 14:54
github-actions bot (Contributor) commented Feb 3, 2022

Unit Test Results

12 files ±0    12 suites ±0    6h 55m 30s ⏱️ −26m 31s
2 607 tests +4    2 528 ✔️ +4    79 💤 ±0    0 ±0
15 566 runs +24    14 547 ✔️ +99    1 019 💤 −75    0 ±0

Results for commit 2a7b3e4. Comparison against base commit 60d82c2.

♻️ This comment has been updated with latest results.

@mcepl commented Feb 8, 2022

Hmm, I tried your PR with Python 3.10.1 and the results were not completely conclusive:

[ 1619s] distributed/tests/test_worker_client.py::test_secede_without_stealing_issue_1262
[ 1619s]   /usr/lib/python3.10/site-packages/_pytest/threadexception.py:75: PytestUnhandledThreadExceptionWarning: Exception in thread Profile
[ 1619s]
[ 1619s]   Traceback (most recent call last):
[ 1619s]     File "/home/abuild/rpmbuild/BUILD/distributed-2022.01.1/distributed/profile.py", line 115, in process
[ 1619s]       d = state["children"][ident]
[ 1619s]   KeyError: 'callHandlers;/usr/lib64/python3.10/logging/__init__.py;1680'
[ 1619s]
[ 1619s]   During handling of the above exception, another exception occurred:
[ 1619s]
[ 1619s]   Traceback (most recent call last):
[ 1619s]     File "/usr/lib64/python3.10/threading.py", line 1009, in _bootstrap_inner
[ 1619s]       self.run()
[ 1619s]     File "/usr/lib64/python3.10/threading.py", line 946, in run
[ 1619s]       self._target(*self._args, **self._kwargs)
[ 1619s]     File "/home/abuild/rpmbuild/BUILD/distributed-2022.01.1/distributed/profile.py", line 274, in _watch
[ 1619s]       process(frame, None, recent, omit=omit)
[ 1619s]     File "/home/abuild/rpmbuild/BUILD/distributed-2022.01.1/distributed/profile.py", line 119, in process
[ 1619s]       "description": info_frame(frame),
[ 1619s]     File "/home/abuild/rpmbuild/BUILD/distributed-2022.01.1/distributed/profile.py", line 72, in info_frame
[ 1619s]       line = linecache.getline(co.co_filename, frame.f_lineno, frame.f_globals).lstrip()
[ 1619s]     File "/usr/lib64/python3.10/linecache.py", line 31, in getline
[ 1619s]       if 1 <= lineno <= len(lines):
[ 1619s]   TypeError: '<=' not supported between instances of 'int' and 'NoneType'
[ 1619s]
[ 1619s]     warnings.warn(pytest.PytestUnhandledThreadExceptionWarning(msg))
[ 1619s]
[ 1619s] distributed/tests/test_core.py::test_server_status_is_always_enum
[ 1619s] distributed/tests/test_utils.py::test_sync_closed_loop
[ 1619s]   /usr/lib64/python3.10/site-packages/tornado/platform/asyncio.py:279: DeprecationWarning: There is no current event loop
[ 1619s]     super().initialize(asyncio.get_event_loop(), **kwargs)
[ 1619s]
[ 1619s] distributed/tests/test_core.py::test_server_listen
[ 1619s]   /home/abuild/rpmbuild/BUILD/distributed-2022.01.1/distributed/utils.py:135: RuntimeWarning: Couldn't detect a suitable IP address for reaching '2001:4860:4860::8888', defaulting to hostname: [Errno 101] Network is unreachable
[ 1619s]     warnings.warn(
[ 1619s]
[ 1619s] distributed/tests/test_ipython.py::test_start_ipython_workers
[ 1619s]   /usr/lib/python3.10/site-packages/pytest_asyncio/plugin.py:317: DeprecationWarning: '@pytest.fixture' is applied to <fixture zmq_ctx, file=/home/abuild/rpmbuild/BUILD/distributed-2022.01.1/distributed/utils_test.py, line=178> in 'legacy' mode, please replace it with '@pytest_asyncio.fixture' as a preparation for switching to 'strict' mode (or use 'auto' mode to seamlessly handle all these fixtures as asyncio-driven).
[ 1619s]     warnings.warn(
[ 1619s]
[ 1619s] distributed/tests/test_ipython.py: 24 warnings
[ 1619s]   /usr/lib/python3.10/site-packages/jupyter_client/utils.py:14: DeprecationWarning: There is no current event loop
[ 1619s]     loop = asyncio.get_event_loop()
[ 1619s]
[ 1619s] distributed/tests/test_ipython.py: 24 warnings
[ 1619s]   /usr/lib/python3.10/site-packages/jupyter_client/utils.py:21: DeprecationWarning: There is no current event loop
[ 1619s]     future = asyncio.ensure_future(coro(*args, **kwargs))
[ 1619s]
[ 1619s] distributed/tests/test_ipython.py::test_start_ipython_workers
[ 1619s] distributed/tests/test_ipython.py::test_start_ipython_scheduler
[ 1619s] distributed/tests/test_ipython.py::test_start_ipython_scheduler_magic
[ 1619s] distributed/tests/test_ipython.py::test_start_ipython_workers_magic
[ 1619s] distributed/tests/test_ipython.py::test_start_ipython_workers_magic_asterix
[ 1619s] distributed/tests/test_ipython.py::test_start_ipython_remote
[ 1619s] distributed/tests/test_ipython.py::test_start_ipython_remote
[ 1619s]   /usr/lib64/python3.10/site-packages/zmq/_future.py:410: DeprecationWarning: There is no current event loop
[ 1619s]     f = future or self._Future()
[ 1619s]
[ 1619s] distributed/tests/test_preload.py::test_web_preload
[ 1619s]   /usr/lib/python3.10/site-packages/pytest_asyncio/plugin.py:317: DeprecationWarning: '@pytest.fixture' is applied to <fixture scheduler_preload, file=/home/abuild/rpmbuild/BUILD/distributed-2022.01.1/distributed/tests/test_preload.py, line=179> in 'legacy' mode, please replace it with '@pytest_asyncio.fixture' as a preparation for switching to 'strict' mode (or use 'auto' mode to seamlessly handle all these fixtures as asyncio-driven).
[ 1619s]     warnings.warn(
[ 1619s]
[ 1619s] distributed/tests/test_preload.py::test_web_preload_worker
[ 1619s]   /usr/lib/python3.10/site-packages/pytest_asyncio/plugin.py:317: DeprecationWarning: '@pytest.fixture' is applied to <fixture worker_preload, file=/home/abuild/rpmbuild/BUILD/distributed-2022.01.1/distributed/tests/test_preload.py, line=256> in 'legacy' mode, please replace it with '@pytest_asyncio.fixture' as a preparation for switching to 'strict' mode (or use 'auto' mode to seamlessly handle all these fixtures as asyncio-driven).
[ 1619s]     warnings.warn(
[ 1619s]
[ 1619s] distributed/tests/test_scheduler.py::test_non_idempotent_plugins
[ 1619s]   /home/abuild/rpmbuild/BUILD/distributed-2022.01.1/distributed/scheduler.py:5656: UserWarning: Scheduler already contains a plugin with name nonidempotentplugin; overwriting.
[ 1619s]     warnings.warn(
[ 1619s]
[ 1619s] distributed/tests/test_steal.py::test_work_stealing
[ 1619s] distributed/tests/test_stress.py::test_stress_1
[ 1619s]   /usr/lib/python3.10/site-packages/pytest_asyncio/plugin.py:317: DeprecationWarning: '@pytest.fixture' is applied to <fixture Module._inject_setup_module_fixture.<locals>.xunit_setup_module_fixture, file=/usr/lib/python3.10/site-packages/_pytest/python.py, line=525> in 'legacy' mode, please replace it with '@pytest_asyncio.fixture' as a preparation for switching to 'strict' mode (or use 'auto' mode to seamlessly handle all these fixtures as asyncio-driven).
[ 1619s]     warnings.warn(
[ 1619s]
[ 1619s] distributed/tests/test_utils_test.py::test_tls_cluster
[ 1619s]   /usr/lib/python3.10/site-packages/pytest_asyncio/plugin.py:317: DeprecationWarning: '@pytest.fixture' is applied to <fixture tls_cluster, file=/home/abuild/rpmbuild/BUILD/distributed-2022.01.1/distributed/utils_test.py, line=602> in 'legacy' mode, please replace it with '@pytest_asyncio.fixture' as a preparation for switching to 'strict' mode (or use 'auto' mode to seamlessly handle all these fixtures as asyncio-driven).
[ 1619s]     warnings.warn(
[ 1619s]
[ 1619s] distributed/tests/test_utils_test.py::test_tls_cluster
[ 1619s]   /usr/lib/python3.10/site-packages/pytest_asyncio/plugin.py:317: DeprecationWarning: '@pytest.fixture' is applied to <fixture tls_client, file=/home/abuild/rpmbuild/BUILD/distributed-2022.01.1/distributed/utils_test.py, line=608> in 'legacy' mode, please replace it with '@pytest_asyncio.fixture' as a preparation for switching to 'strict' mode (or use 'auto' mode to seamlessly handle all these fixtures as asyncio-driven).
[ 1619s]     warnings.warn(
[ 1619s]
[ 1619s] distributed/tests/test_utils_test.py::test_dump_cluster_state_nannies
[ 1619s]   /usr/lib/python3.10/site-packages/_pytest/threadexception.py:75: PytestUnhandledThreadExceptionWarning: Exception in thread Profile
[ 1619s]
[ 1619s]   Traceback (most recent call last):
[ 1619s]     File "/home/abuild/rpmbuild/BUILD/distributed-2022.01.1/distributed/profile.py", line 115, in process
[ 1619s]       d = state["children"][ident]
[ 1619s]   KeyError: 'prepare_tag;/usr/lib64/python3.10/site-packages/yaml/emitter.py;580'
[ 1619s]
[ 1619s]   During handling of the above exception, another exception occurred:
[ 1619s]
[ 1619s]   Traceback (most recent call last):
[ 1619s]     File "/usr/lib64/python3.10/threading.py", line 1009, in _bootstrap_inner
[ 1619s]       self.run()
[ 1619s]     File "/usr/lib64/python3.10/threading.py", line 946, in run
[ 1619s]       self._target(*self._args, **self._kwargs)
[ 1619s]     File "/home/abuild/rpmbuild/BUILD/distributed-2022.01.1/distributed/profile.py", line 274, in _watch
[ 1619s]       process(frame, None, recent, omit=omit)
[ 1619s]     File "/home/abuild/rpmbuild/BUILD/distributed-2022.01.1/distributed/profile.py", line 119, in process
[ 1619s]       "description": info_frame(frame),
[ 1619s]     File "/home/abuild/rpmbuild/BUILD/distributed-2022.01.1/distributed/profile.py", line 72, in info_frame
[ 1619s]       line = linecache.getline(co.co_filename, frame.f_lineno, frame.f_globals).lstrip()
[ 1619s]     File "/usr/lib64/python3.10/linecache.py", line 31, in getline
[ 1619s]       if 1 <= lineno <= len(lines):
[ 1619s]   TypeError: '<=' not supported between instances of 'int' and 'NoneType'
[ 1619s]
[ 1619s]     warnings.warn(pytest.PytestUnhandledThreadExceptionWarning(msg))
[ 1619s]
[ 1619s] -- Docs: https://docs.pytest.org/en/stable/warnings.html
[ 1619s] =========================== rerun test summary info ============================
[ 1619s] RERUN distributed/tests/test_client.py::test_client_gather_semaphore_loop
[ 1619s] RERUN distributed/tests/test_client.py::test_client_gather_semaphore_loop
[ 1619s] RERUN distributed/tests/test_client.py::test_client_gather_semaphore_loop
[ 1619s] RERUN distributed/tests/test_client.py::test_as_completed_condition_loop
[ 1619s] RERUN distributed/tests/test_client.py::test_as_completed_condition_loop
[ 1619s] RERUN distributed/tests/test_client.py::test_as_completed_condition_loop
[ 1619s] RERUN distributed/tests/test_client.py::test_client_connectionpool_semaphore_loop
[ 1619s] RERUN distributed/tests/test_client.py::test_client_connectionpool_semaphore_loop
[ 1619s] RERUN distributed/tests/test_client.py::test_client_connectionpool_semaphore_loop
[ 1619s] RERUN distributed/tests/test_client.py::test_exception_text
[ 1619s] RERUN distributed/tests/test_client.py::test_exception_text
[ 1619s] RERUN distributed/tests/test_client.py::test_exception_text
[ 1619s] RERUN distributed/tests/test_worker.py::test_worker_bad_args
[ 1619s] RERUN distributed/tests/test_worker.py::test_worker_bad_args
[ 1619s] RERUN distributed/tests/test_worker.py::test_worker_bad_args
[ 1619s] =========================== short test summary info ============================
[ 1619s] SKIPPED [1] distributed/tests/test_active_memory_manager.py:325: need --runslow option to run
[ 1619s] SKIPPED [1] distributed/tests/test_active_memory_manager.py:422: need --runslow option to run
[ 1619s] SKIPPED [2] distributed/tests/test_active_memory_manager.py:468: need --runslow option to run
[ 1619s] SKIPPED [1] distributed/tests/test_active_memory_manager.py:507: need --runslow option to run
[ 1619s] SKIPPED [1] distributed/tests/test_active_memory_manager.py:588: need --runslow option to run
[ 1619s] SKIPPED [1] distributed/tests/test_actor.py:477: need --runslow option to run
[ 1619s] SKIPPED [1] distributed/tests/test_batched.py:153: need --runslow option to run
[ 1619s] SKIPPED [1] distributed/tests/test_batched.py:223: need --runslow option to run
[ 1619s] SKIPPED [1] distributed/tests/test_client.py:839: need --runslow option to run
[ 1619s] SKIPPED [1] distributed/tests/test_client.py:848: unconditional skip
[ 1619s] SKIPPED [1] distributed/tests/test_client.py:874: unconditional skip
[ 1619s] SKIPPED [1] distributed/tests/test_client.py:893: unconditional skip
[ 1619s] SKIPPED [1] distributed/tests/test_client.py:1754: unconditional skip
[ 1619s] SKIPPED [1] distributed/tests/test_client.py:2607: unconditional skip
[ 1619s] SKIPPED [1] distributed/tests/test_client.py:2636: Use fast random selection now
[ 1619s] SKIPPED [1] distributed/tests/test_client.py:3257: unconditional skip
[ 1619s] SKIPPED [1] distributed/tests/test_client.py:3498: need --runslow option to run
[ 1619s] SKIPPED [1] distributed/tests/test_client.py:3560: need --runslow option to run
[ 1619s] SKIPPED [1] distributed/tests/test_client.py:3702: need --runslow option to run
[ 1619s] SKIPPED [1] distributed/tests/test_client.py:4498: need --runslow option to run
[ 1619s] SKIPPED [1] distributed/tests/test_client.py:4601: Now prefer first-in-first-out
[ 1619s] SKIPPED [1] distributed/tests/test_client.py:5026: need --runslow option to run
[ 1619s] SKIPPED [1] distributed/tests/test_client.py:5069: need --runslow option to run
[ 1619s] SKIPPED [1] distributed/tests/test_client.py:5088: need --runslow option to run
[ 1619s] SKIPPED [1] distributed/tests/test_client.py:5315: need --runslow option to run
[ 1619s] SKIPPED [1] distributed/tests/test_client.py:5536: need --runslow option to run
[ 1619s] SKIPPED [1] distributed/tests/test_client.py:6316: known intermittent failure
[ 1619s] SKIPPED [1] distributed/tests/test_client.py:6469: need --runslow option to run
[ 1619s] SKIPPED [1] distributed/utils_test.py:799: unconditional skip
[ 1619s] SKIPPED [4] distributed/utils_test.py:799: need --runslow option to run
[ 1619s] SKIPPED [1] distributed/tests/test_client_executor.py:130: need --runslow option to run
[ 1619s] SKIPPED [1] distributed/tests/test_config.py:311: could not import 'jsonschema': No module named 'jsonschema'
[ 1619s] SKIPPED [1] distributed/tests/test_config.py:359: could not import 'uvloop': No module named 'uvloop'
[ 1619s] SKIPPED [1] distributed/tests/test_core.py:166: no network access
[ 1619s] SKIPPED [1] distributed/tests/test_core.py:432: need --runslow option to run
[ 1619s] SKIPPED [1] distributed/tests/test_core.py:766: could not import 'crick': No module named 'crick'
[ 1619s] SKIPPED [1] distributed/tests/test_core.py:775: could not import 'crick': No module named 'crick'
[ 1619s] SKIPPED [1] distributed/tests/test_counter.py:11: no crick library
[ 1619s] SKIPPED [1] distributed/tests/test_diskutils.py:217: need --runslow option to run
[ 1619s] SKIPPED [1] distributed/tests/test_failed_workers.py:70: need --runslow option to run
[ 1619s] SKIPPED [1] distributed/tests/test_failed_workers.py:81: need --runslow option to run
[ 1619s] SKIPPED [1] distributed/tests/test_failed_workers.py:329: need --runslow option to run
[ 1619s] SKIPPED [1] distributed/tests/test_failed_workers.py:497: need --runslow option to run
[ 1619s] SKIPPED [1] distributed/tests/test_failed_workers.py:509: need --runslow option to run
[ 1619s] SKIPPED [1] distributed/tests/test_nanny.py:34: need --runslow option to run
[ 1619s] SKIPPED [1] distributed/tests/test_nanny.py:130: need --runslow option to run
[ 1619s] SKIPPED [1] distributed/tests/test_nanny.py:144: need --runslow option to run
[ 1619s] SKIPPED [1] distributed/tests/test_nanny.py:176: need --runslow option to run
[ 1619s] SKIPPED [1] distributed/tests/test_nanny.py:486: need --runslow option to run
[ 1619s] SKIPPED [1] distributed/tests/test_nanny.py:564: could not import 'ucp': No module named 'ucp'
[ 1619s] SKIPPED [1] distributed/tests/test_profile.py:68: could not import 'stacktrace': No module named 'stacktrace'
[ 1619s] SKIPPED [1] distributed/tests/test_queues.py:88: getting same client from main thread
[ 1619s] SKIPPED [1] distributed/tests/test_queues.py:112: need --runslow option to run
[ 1619s] SKIPPED [1] distributed/tests/test_resources.py:274: Skipped
[ 1619s] SKIPPED [1] distributed/tests/test_resources.py:330: Should protect resource keys from optimization
[ 1619s] SKIPPED [1] distributed/tests/test_resources.py:351: atop fusion seemed to break this
[ 1619s] SKIPPED [1] distributed/tests/test_scheduler.py:952: need --runslow option to run
[ 1619s] SKIPPED [1] distributed/tests/test_scheduler.py:1005: need --runslow option to run
[ 1619s] SKIPPED [1] distributed/tests/test_scheduler.py:1018: need --runslow option to run
[ 1619s] SKIPPED [1] distributed/tests/test_scheduler.py:1119: need --runslow option to run
[ 1619s] SKIPPED [1] distributed/tests/test_scheduler.py:1245: need --runslow option to run
[ 1619s] SKIPPED [4] distributed/tests/test_scheduler.py:2170: need --runslow option to run
[ 1619s] SKIPPED [1] distributed/tests/test_scheduler.py:2492: need --runslow option to run
[ 1619s] SKIPPED [1] distributed/tests/test_semaphore.py:129: need --runslow option to run
[ 1619s] SKIPPED [1] distributed/tests/test_semaphore.py:191: need --runslow option to run
[ 1619s] SKIPPED [1] distributed/tests/test_steal.py:249: Skipped
[ 1619s] SKIPPED [14] distributed/tests/test_steal.py:703: need --runslow option to run
[ 1619s] SKIPPED [2] distributed/tests/test_stress.py:46: need --runslow option to run
[ 1619s] SKIPPED [1] distributed/tests/test_stress.py:88: need --runslow option to run
[ 1619s] SKIPPED [1] distributed/tests/test_stress.py:200: unconditional skip
[ 1619s] SKIPPED [1] distributed/tests/test_stress.py:226: need --runslow option to run
[ 1619s] SKIPPED [1] distributed/tests/test_stress.py:248: need --runslow option to run
[ 1619s] SKIPPED [1] distributed/tests/test_utils.py:265: could not import 'pyarrow': No module named 'pyarrow'
[ 1619s] SKIPPED [1] distributed/tests/test_utils_perf.py:84: need --runslow option to run
[ 1619s] SKIPPED [1] distributed/tests/test_utils_test.py:110: This hangs on travis
[ 1619s] SKIPPED [1] distributed/tests/test_utils_test.py:373: need --runslow option to run
[ 1619s] SKIPPED [1] distributed/tests/test_utils_test.py:511: need --runslow option to run
[ 1619s] SKIPPED [1] distributed/tests/test_variable.py:192: need --runslow option to run
[ 1619s] SKIPPED [1] distributed/tests/test_worker.py:186: don't yet support uploading pyc files
[ 1619s] SKIPPED [1] distributed/tests/test_worker.py:276: could not import 'crick': No module named 'crick'
[ 1619s] SKIPPED [1] distributed/tests/test_worker.py:578: Other tests leak memory, so process-level checks trigger immediately
[ 1619s] SKIPPED [1] distributed/tests/test_worker.py:965: Our logic here is faulty
[ 1619s] SKIPPED [1] distributed/tests/test_worker.py:1152: need --runslow option to run
[ 1619s] SKIPPED [1] distributed/tests/test_worker.py:1205: need --runslow option to run
[ 1619s] SKIPPED [1] distributed/tests/test_worker.py:1357: need --runslow option to run
[ 1619s] SKIPPED [2] distributed/tests/test_worker.py:1588: could not import 'ucp': No module named 'ucp'
[ 1619s] SKIPPED [1] distributed/tests/test_worker.py:1661: need --runslow option to run
[ 1619s] SKIPPED [1] distributed/tests/test_worker.py:1783: need --runslow option to run
[ 1619s] SKIPPED [1] distributed/tests/test_worker.py:2908: need --runslow option to run
[ 1619s] SKIPPED [1] distributed/tests/test_worker.py:3162: need --runslow option to run
[ 1619s] FAILED distributed/tests/test_client.py::test_client_gather_semaphore_loop - ...
[ 1619s] FAILED distributed/tests/test_client.py::test_as_completed_condition_loop - a...
[ 1619s] FAILED distributed/tests/test_client.py::test_client_connectionpool_semaphore_loop
[ 1619s] FAILED distributed/tests/test_client.py::test_exception_text - assert 'Except...
[ 1619s] FAILED distributed/tests/test_worker.py::test_worker_bad_args - assert False
[ 1619s] = 5 failed, 1613 passed, 112 skipped, 29 deselected, 11 xfailed, 5 xpassed, 13813 warnings, 15 rerun in 1598.07s (0:26:38) =
[ 1620s] --- Logging error ---
[ 1620s] Traceback (most recent call last):
[ 1620s]   File "/usr/lib64/python3.10/logging/__init__.py", line 1103, in emit
[ 1620s]     stream.write(msg + self.terminator)
[ 1620s] ValueError: I/O operation on closed file.
[ 1620s] Call stack:
[ 1620s]   File "/home/abuild/rpmbuild/BUILD/distributed-2022.01.1/distributed/utils_perf.py", line 199, in _gc_callback
[ 1620s]     logger.warning(
[ 1620s] Message: 'full garbage collections took %d%% CPU time recently (threshold: %d%%)'
[ 1620s] Arguments: (60.73408101049419, 10.0)
[ 1623s] error: Bad exit status from /var/tmp/rpm-tmp.bgKBJB (%check)

Complete build log with all packages used and steps taken to run the test suite.

await self._event.wait()
out = self._out
assert out is not None
return out.unwrap()
Collaborator left an inline review comment

Can you elaborate a bit more on the purpose of these wrapper classes, as opposed to the more direct inspection of the result that was there previously? It seems like they are related to trying to get a chain of custody for the generic _T, but I'm not sure it really buys much, since the setting of the result here isn't checked, so we ultimately have an _OK(Unknown).

@graingert (Member, Author) replied

Yeah, ideally this would have been done with TypedDict, but generic TypedDicts aren't supported (python/mypy#3863):

from typing import Generic, Literal, TypedDict, TypeVar

_T = TypeVar("_T")

class _OK(TypedDict, Generic[_T]):  # mypy rejects a generic TypedDict
    status: Literal["OK"]
    result: _T

class _Error(TypedDict):
    status: Literal["error"]
    exception: Exception
...
    def _set_result(self, out: _Error | _OK[_T]): ...

It seems like they are related to trying to get a chain of custody for the generic _T, but I'm not sure it really buys much since the setting of the result here isn't checked, so we ultimately have an _OK(Unknown).

I needed to draw the line somewhere on what's typed and what's not in this PR, and chose to type all the methods and classes of BaseActorFuture. _Error | _OK[_T] | None is also needed for the internal state of ActorFuture, so I think it's worth it for now.
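
For readers following the thread, here is a minimal sketch of the wrapper-class approach being discussed (assuming dataclass-style wrappers; the PR's actual definitions may differ), where unwrap() either returns the value or re-raises the stored exception:

from __future__ import annotations

from dataclasses import dataclass
from typing import Generic, TypeVar

_T = TypeVar("_T")

@dataclass
class _OK(Generic[_T]):
    value: _T

    def unwrap(self) -> _T:
        return self.value

@dataclass
class _Error:
    exception: Exception

    def unwrap(self):
        raise self.exception

With that shape, an internal slot annotated _Error | _OK[_T] | None distinguishes "not yet resolved" (None) from a value or an exception, and the waiter in the diff above can simply return out.unwrap().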

@ian-r-rose (Collaborator)

I love the new generic machinery @graingert, and I like that it brings it a bit closer to how typing.Awaitable[T] or asyncio.Future[T] behave.

@graingert requested a review from ian-r-rose on February 14, 2022 12:20
@graingert force-pushed the actor-future-deadlock branch from 749bce9 to 0364c50 on February 14, 2022 12:22
@graingert force-pushed the actor-future-deadlock branch 2 times, most recently from 7fde98b to a4012e5 on February 15, 2022 13:40
@graingert requested a review from fjetter on February 15, 2022 16:58
@graingert force-pushed the actor-future-deadlock branch from a4012e5 to 2a7b3e4 on February 17, 2022 10:08
@fjetter (Member) commented Feb 17, 2022

Waiting for builds to pass and then will merge.

@jrbourbeau changed the title from "avoid deadlock when two tasks are concurrently waiting for an unresolved ActorFuture" to "Avoid deadlock when two tasks are concurrently waiting for an unresolved ActorFuture" on Feb 17, 2022
@jrbourbeau (Member)

All green 🥲

@ian-r-rose (Collaborator) left a comment

Looks great

@mcepl commented Feb 17, 2022

Was this supposed to be a Python 3.10 fix? It doesn't work for me: test_client.py::test_client_gather_semaphore_loop, test_client.py::test_as_completed_condition_loop, test_client.py::test_client_connectionpool_semaphore_loop, test_client.py::test_exception_text, and test_worker.py::test_worker_bad_args still fail.

Log of the build operation.

@fjetter (Member) commented Feb 18, 2022

Was this supposed to be a Python 3.10 fix?

This is only a step towards py3.10 but not a complete fix.

Note that we have a ticket and a WIP PR open tracking full support.

@fjetter merged commit b0dd9db into dask:main on Feb 18, 2022
@fjetter (Member) commented Feb 18, 2022

Thank you @graingert! This looks great. Sorry it took us so long to review it.

@bnavigator mentioned this pull request Mar 25, 2022
@graingert deleted the actor-future-deadlock branch March 28, 2022 10:04

Successfully merging this pull request may close these issues.

  • deadlock when two tasks are concurrently waiting for an unresolved ActorFuture
  • The loop parameter to Queue is deprecated/removed