Skip to content

gh-108973: Fix asyncio test_subprocess_consistent_callbacks() #109431

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Sep 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 5 additions & 5 deletions Doc/library/asyncio-llapi-index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -484,19 +484,19 @@ Protocol classes can implement the following **callback methods**:
:widths: 50 50
:class: full-width-table

* - ``callback`` :meth:`pipe_data_received()
<SubprocessProtocol.pipe_data_received>`
* - ``callback`` :meth:`~SubprocessProtocol.pipe_data_received`
- Called when the child process writes data into its
*stdout* or *stderr* pipe.

* - ``callback`` :meth:`pipe_connection_lost()
<SubprocessProtocol.pipe_connection_lost>`
* - ``callback`` :meth:`~SubprocessProtocol.pipe_connection_lost`
- Called when one of the pipes communicating with
the child process is closed.

* - ``callback`` :meth:`process_exited()
<SubprocessProtocol.process_exited>`
- Called when the child process has exited.
- Called when the child process has exited. It can be called before
:meth:`~SubprocessProtocol.pipe_data_received` and
:meth:`~SubprocessProtocol.pipe_connection_lost` methods.


Event Loop Policies
Expand Down
19 changes: 18 additions & 1 deletion Doc/library/asyncio-protocol.rst
Original file line number Diff line number Diff line change
Expand Up @@ -708,6 +708,9 @@ factories passed to the :meth:`loop.subprocess_exec` and

Called when the child process has exited.

It can be called before :meth:`~SubprocessProtocol.pipe_data_received` and
:meth:`~SubprocessProtocol.pipe_connection_lost` methods.


Examples
========
Expand Down Expand Up @@ -1003,12 +1006,26 @@ The subprocess is created by the :meth:`loop.subprocess_exec` method::
def __init__(self, exit_future):
self.exit_future = exit_future
self.output = bytearray()
self.pipe_closed = False
self.exited = False

def pipe_connection_lost(self, fd, exc):
self.pipe_closed = True
self.check_for_exit()

def pipe_data_received(self, fd, data):
self.output.extend(data)

def process_exited(self):
self.exit_future.set_result(True)
self.exited = True
# process_exited() method can be called before
# pipe_connection_lost() method: wait until both methods are
# called.
self.check_for_exit()

def check_for_exit(self):
if self.pipe_closed and self.exited:
self.exit_future.set_result(True)

async def get_date():
# Get a reference to the event loop as we plan to use
Expand Down
3 changes: 1 addition & 2 deletions Lib/asyncio/unix_events.py
Original file line number Diff line number Diff line change
Expand Up @@ -226,8 +226,7 @@ async def _make_subprocess_transport(self, protocol, args, shell,
return transp

def _child_watcher_callback(self, pid, returncode, transp):
# Skip one iteration for callbacks to be executed
self.call_soon_threadsafe(self.call_soon, transp._process_exited, returncode)
self.call_soon_threadsafe(transp._process_exited, returncode)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's safe to call the method immediately. It's up to the child watcher to take care to call this in the loop thread. ThreadedChildWatcher already does.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The child watcher runs in a different thread. call_soon_threadsafe() is needed to make sure that the callback is run in the same thread that the event loop. It's an important principle in asyncio, no?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What I meant is that the child watcher itself already uses call_soon_threadsafe() to schedule the callback. But I've just noticed a little complication. The documentation for AbstractChildWatcher.add_child_handler requires the callback itself to be thread-safe, which could mean that it can be called from any thread. One of the calls to call_soon_threadsafe() is redundant.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did a bit of digging and the comment requiring the callback to be thread-safe was there already before there was even support for running the loop on different threads. The watchers have been updated to use call_soon_threadsafe() for callbacks after that. Since watchers are public API (thankfully deprecated), the requirement can't be relaxed.

Possibly one of the redundant calls can still be removed, but considering 3.14 will remove watchers altogether it might not be worth it. Let's say that I'm skeptical of this approach of several layers of scheduling callbacks (there is one more when the transport calls the protocol's process_exited method) because it can hide race conditions such as the one in this bug.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you be ok if this call is removed in a separated PR? I'm not comfortable to touch asyncio, but the bug is impacting many CIs and it's very annoying. I would prefer that someone who is more comfortable with asyncio does this change :-)

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, clearly this would need more discussion/investigation/etc. Let's forget about it for this PR.


async def create_unix_connection(
self, protocol_factory, path=None, *,
Expand Down
54 changes: 43 additions & 11 deletions Lib/test/test_asyncio/test_subprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -753,21 +753,44 @@ async def main() -> None:

self.loop.run_until_complete(main())

def test_subprocess_consistent_callbacks(self):
def test_subprocess_protocol_events(self):
# gh-108973: Test that all subprocess protocol methods are called.
# The protocol methods are not called in a determistic order.
# The order depends on the event loop and the operating system.
events = []
fds = [1, 2]
expected = [
('pipe_data_received', 1, b'stdout'),
('pipe_data_received', 2, b'stderr'),
('pipe_connection_lost', 1),
('pipe_connection_lost', 2),
'process_exited',
]
per_fd_expected = [
'pipe_data_received',
'pipe_connection_lost',
]

class MyProtocol(asyncio.SubprocessProtocol):
def __init__(self, exit_future: asyncio.Future) -> None:
self.exit_future = exit_future

def pipe_data_received(self, fd, data) -> None:
events.append(('pipe_data_received', fd, data))
self.exit_maybe()

def pipe_connection_lost(self, fd, exc) -> None:
events.append('pipe_connection_lost')
events.append(('pipe_connection_lost', fd))
self.exit_maybe()

def process_exited(self) -> None:
events.append('process_exited')
self.exit_future.set_result(True)
self.exit_maybe()

def exit_maybe(self):
# Only exit when we got all expected events
if len(events) >= len(expected):
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Before, we stopped as soon as we seen process_exited. But sometimes, process_exited is get earlier than other events, and so events only contains some expected events and the test fails also in this case :-(

self.exit_future.set_result(True)

async def main() -> None:
loop = asyncio.get_running_loop()
Expand All @@ -777,15 +800,24 @@ async def main() -> None:
sys.executable, '-c', code, stdin=None)
await exit_future
transport.close()
self.assertEqual(events, [
('pipe_data_received', 1, b'stdout'),
('pipe_data_received', 2, b'stderr'),
'pipe_connection_lost',
'pipe_connection_lost',
'process_exited',
])

self.loop.run_until_complete(main())
return events

events = self.loop.run_until_complete(main())

# First, make sure that we received all events
self.assertSetEqual(set(events), set(expected))

# Second, check order of pipe events per file descriptor
per_fd_events = {fd: [] for fd in fds}
for event in events:
if event == 'process_exited':
continue
name, fd = event[:2]
per_fd_events[fd].append(name)

for fd in fds:
self.assertEqual(per_fd_events[fd], per_fd_expected, (fd, events))

def test_subprocess_communicate_stdout(self):
# See https://github.com/python/cpython/issues/100133
Expand Down