Skip to content

Revert "bpo-35621: Support running subprocesses in asyncio when loop is executed in non-main thread (#13630)" #13793

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
240 changes: 28 additions & 212 deletions Lib/asyncio/unix_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

import errno
import io
import itertools
import os
import selectors
import signal
Expand Down Expand Up @@ -30,9 +29,7 @@
__all__ = (
'SelectorEventLoop',
'AbstractChildWatcher', 'SafeChildWatcher',
'FastChildWatcher',
'MultiLoopChildWatcher', 'ThreadedChildWatcher',
'DefaultEventLoopPolicy',
'FastChildWatcher', 'DefaultEventLoopPolicy',
)


Expand Down Expand Up @@ -187,13 +184,6 @@ async def _make_subprocess_transport(self, protocol, args, shell,
stdin, stdout, stderr, bufsize,
extra=None, **kwargs):
with events.get_child_watcher() as watcher:
if not watcher.is_active():
# Check early.
# Raising exception before process creation
# prevents subprocess execution if the watcher
# is not ready to handle it.
raise RuntimeError("asyncio.get_child_watcher() is not activated, "
"subprocess support is not installed.")
waiter = self.create_future()
transp = _UnixSubprocessTransport(self, protocol, args, shell,
stdin, stdout, stderr, bufsize,
Expand Down Expand Up @@ -848,15 +838,6 @@ def close(self):
"""
raise NotImplementedError()

def is_active(self):
"""Watcher status.

Return True if the watcher is installed and ready to handle process exit
notifications.

"""
raise NotImplementedError()

def __enter__(self):
"""Enter the watcher's context and allow starting new processes

Expand All @@ -868,20 +849,6 @@ def __exit__(self, a, b, c):
raise NotImplementedError()


def _compute_returncode(status):
if os.WIFSIGNALED(status):
# The child process died because of a signal.
return -os.WTERMSIG(status)
elif os.WIFEXITED(status):
# The child process exited (e.g sys.exit()).
return os.WEXITSTATUS(status)
else:
# The child exited, but we don't understand its status.
# This shouldn't happen, but if it does, let's just
# return that status; perhaps that helps debug it.
return status


class BaseChildWatcher(AbstractChildWatcher):

def __init__(self):
Expand All @@ -891,9 +858,6 @@ def __init__(self):
def close(self):
self.attach_loop(None)

def is_active(self):
return self._loop is not None and self._loop.is_running()

def _do_waitpid(self, expected_pid):
raise NotImplementedError()

Expand Down Expand Up @@ -934,6 +898,19 @@ def _sig_chld(self):
'exception': exc,
})

def _compute_returncode(self, status):
if os.WIFSIGNALED(status):
# The child process died because of a signal.
return -os.WTERMSIG(status)
elif os.WIFEXITED(status):
# The child process exited (e.g sys.exit()).
return os.WEXITSTATUS(status)
else:
# The child exited, but we don't understand its status.
# This shouldn't happen, but if it does, let's just
# return that status; perhaps that helps debug it.
return status


class SafeChildWatcher(BaseChildWatcher):
"""'Safe' child watcher implementation.
Expand All @@ -957,6 +934,11 @@ def __exit__(self, a, b, c):
pass

def add_child_handler(self, pid, callback, *args):
if self._loop is None:
raise RuntimeError(
"Cannot add child handler, "
"the child watcher does not have a loop attached")

self._callbacks[pid] = (callback, args)

# Prevent a race condition in case the child is already terminated.
Expand Down Expand Up @@ -992,7 +974,7 @@ def _do_waitpid(self, expected_pid):
# The child process is still alive.
return

returncode = _compute_returncode(status)
returncode = self._compute_returncode(status)
if self._loop.get_debug():
logger.debug('process %s exited with returncode %s',
expected_pid, returncode)
Expand Down Expand Up @@ -1053,6 +1035,11 @@ def __exit__(self, a, b, c):
def add_child_handler(self, pid, callback, *args):
assert self._forks, "Must use the context manager"

if self._loop is None:
raise RuntimeError(
"Cannot add child handler, "
"the child watcher does not have a loop attached")

with self._lock:
try:
returncode = self._zombies.pop(pid)
Expand Down Expand Up @@ -1085,7 +1072,7 @@ def _do_waitpid_all(self):
# A child process is still alive.
return

returncode = _compute_returncode(status)
returncode = self._compute_returncode(status)

with self._lock:
try:
Expand Down Expand Up @@ -1114,177 +1101,6 @@ def _do_waitpid_all(self):
callback(pid, returncode, *args)


class MultiLoopChildWatcher(AbstractChildWatcher):
# The class keeps compatibility with AbstractChildWatcher ABC
# To achieve this it has empty attach_loop() method
# and doesn't accept explicit loop argument
# for add_child_handler()/remove_child_handler()
# but retrieves the current loop by get_running_loop()

def __init__(self):
self._callbacks = {}
self._saved_sighandler = None

def is_active(self):
return self._saved_sighandler is not None

def close(self):
self._callbacks.clear()
if self._saved_sighandler is not None:
handler = signal.getsignal(signal.SIGCHLD)
if handler != self._sig_chld:
logger.warning("SIGCHLD handler was changed by outside code")
else:
signal.signal(signal.SIGCHLD, self._saved_sighandler)
self._saved_sighandler = None

def __enter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):
pass

def add_child_handler(self, pid, callback, *args):
loop = events.get_running_loop()
self._callbacks[pid] = (loop, callback, args)

# Prevent a race condition in case the child is already terminated.
self._do_waitpid(pid)

def remove_child_handler(self, pid):
try:
del self._callbacks[pid]
return True
except KeyError:
return False

def attach_loop(self, loop):
# Don't save the loop but initialize itself if called first time
# The reason to do it here is that attach_loop() is called from
# unix policy only for the main thread.
# Main thread is required for subscription on SIGCHLD signal
if self._saved_sighandler is None:
self._saved_sighandler = signal.signal(signal.SIGCHLD, self._sig_chld)
if self._saved_sighandler is None:
logger.warning("Previous SIGCHLD handler was set by non-Python code, "
"restore to default handler on watcher close.")
self._saved_sighandler = signal.SIG_DFL

# Set SA_RESTART to limit EINTR occurrences.
signal.siginterrupt(signal.SIGCHLD, False)

def _do_waitpid_all(self):
for pid in list(self._callbacks):
self._do_waitpid(pid)

def _do_waitpid(self, expected_pid):
assert expected_pid > 0

try:
pid, status = os.waitpid(expected_pid, os.WNOHANG)
except ChildProcessError:
# The child process is already reaped
# (may happen if waitpid() is called elsewhere).
pid = expected_pid
returncode = 255
logger.warning(
"Unknown child process pid %d, will report returncode 255",
pid)
debug_log = False
else:
if pid == 0:
# The child process is still alive.
return

returncode = _compute_returncode(status)
debug_log = True
try:
loop, callback, args = self._callbacks.pop(pid)
except KeyError: # pragma: no cover
# May happen if .remove_child_handler() is called
# after os.waitpid() returns.
logger.warning("Child watcher got an unexpected pid: %r",
pid, exc_info=True)
else:
if loop.is_closed():
logger.warning("Loop %r that handles pid %r is closed", loop, pid)
else:
if debug_log and loop.get_debug():
logger.debug('process %s exited with returncode %s',
expected_pid, returncode)
loop.call_soon_threadsafe(callback, pid, returncode, *args)

def _sig_chld(self, signum, frame):
try:
self._do_waitpid_all()
except (SystemExit, KeyboardInterrupt):
raise
except BaseException:
logger.warning('Unknown exception in SIGCHLD handler', exc_info=True)


class ThreadedChildWatcher(AbstractChildWatcher):
# The watcher uses a thread per process
# for waiting for the process finish.
# It doesn't require subscription on POSIX signal

def __init__(self):
self._pid_counter = itertools.count(0)

def is_active(self):
return True

def close(self):
pass

def __enter__(self):
return self

def __exit__(self, exc_type, exc_val, exc_tb):
pass

def add_child_handler(self, pid, callback, *args):
loop = events.get_running_loop()
thread = threading.Thread(target=self._do_waitpid,
name=f"waitpid-{next(self._pid_counter)}",
args=(loop, pid, callback, args),
daemon=True)
thread.start()

def remove_child_handler(self, pid):
# asyncio never calls remove_child_handler() !!!
# The method is no-op but is implemented because
# abstract base classe requires it
return True

def attach_loop(self, loop):
pass

def _do_waitpid(self, loop, expected_pid, callback, args):
assert expected_pid > 0

try:
pid, status = os.waitpid(expected_pid, 0)
except ChildProcessError:
# The child process is already reaped
# (may happen if waitpid() is called elsewhere).
pid = expected_pid
returncode = 255
logger.warning(
"Unknown child process pid %d, will report returncode 255",
pid)
else:
returncode = _compute_returncode(status)
if loop.get_debug():
logger.debug('process %s exited with returncode %s',
expected_pid, returncode)

if loop.is_closed():
logger.warning("Loop %r that handles pid %r is closed", loop, pid)
else:
loop.call_soon_threadsafe(callback, pid, returncode, *args)


class _UnixDefaultEventLoopPolicy(events.BaseDefaultEventLoopPolicy):
"""UNIX event loop policy with a watcher for child processes."""
_loop_factory = _UnixSelectorEventLoop
Expand All @@ -1296,7 +1112,7 @@ def __init__(self):
def _init_watcher(self):
with events._lock:
if self._watcher is None: # pragma: no branch
self._watcher = ThreadedChildWatcher()
self._watcher = SafeChildWatcher()
if isinstance(threading.current_thread(),
threading._MainThread):
self._watcher.attach_loop(self._local._loop)
Expand All @@ -1318,7 +1134,7 @@ def set_event_loop(self, loop):
def get_child_watcher(self):
"""Get the watcher for child processes.

If not yet set, a ThreadedChildWatcher object is automatically created.
If not yet set, a SafeChildWatcher object is automatically created.
"""
if self._watcher is None:
self._init_watcher()
Expand Down
40 changes: 1 addition & 39 deletions Lib/test/test_asyncio/test_subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -633,7 +633,6 @@ async def execute():

self.assertIsNone(self.loop.run_until_complete(execute()))


if sys.platform != 'win32':
# Unix
class SubprocessWatcherMixin(SubprocessMixin):
Expand All @@ -649,24 +648,7 @@ def setUp(self):
watcher = self.Watcher()
watcher.attach_loop(self.loop)
policy.set_child_watcher(watcher)

def tearDown(self):
super().setUp()
policy = asyncio.get_event_loop_policy()
watcher = policy.get_child_watcher()
policy.set_child_watcher(None)
watcher.attach_loop(None)
watcher.close()

class SubprocessThreadedWatcherTests(SubprocessWatcherMixin,
test_utils.TestCase):

Watcher = unix_events.ThreadedChildWatcher

class SubprocessMultiLoopWatcherTests(SubprocessWatcherMixin,
test_utils.TestCase):

Watcher = unix_events.MultiLoopChildWatcher
self.addCleanup(policy.set_child_watcher, None)

class SubprocessSafeWatcherTests(SubprocessWatcherMixin,
test_utils.TestCase):
Expand All @@ -688,25 +670,5 @@ def setUp(self):
self.set_event_loop(self.loop)


class GenericWatcherTests:

def test_create_subprocess_fails_with_inactive_watcher(self):

async def execute():
watcher = mock.create_authspec(asyncio.AbstractChildWatcher)
watcher.is_active.return_value = False
asyncio.set_child_watcher(watcher)

with self.assertRaises(RuntimeError):
await subprocess.create_subprocess_exec(
support.FakePath(sys.executable), '-c', 'pass')

watcher.add_child_handler.assert_not_called()

self.assertIsNone(self.loop.run_until_complete(execute()))




if __name__ == '__main__':
unittest.main()
Loading