Skip to content

bpo-39349: Add *cancel_futures* to Executor.shutdown() #18057

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion Doc/library/concurrent.futures.rst
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ Executor Objects
.. versionchanged:: 3.5
Added the *chunksize* argument.

.. method:: shutdown(wait=True)
.. method:: shutdown(wait=True, \*, cancel_futures=False)

Signal the executor that it should free any resources that it is using
when the currently pending futures are done executing. Calls to
Expand All @@ -82,6 +82,15 @@ Executor Objects
value of *wait*, the entire Python program will not exit until all
pending futures are done executing.

If *cancel_futures* is ``True``, this method will cancel all pending
futures that the executor has not started running. Any futures that
are completed or running won't be cancelled, regardless of the value
of *cancel_futures*.

If both *cancel_futures* and *wait* are ``True``, all futures that the
executor has started running will be completed before this method
returns. The remaining futures are cancelled.

You can avoid having to call this method explicitly if you use the
:keyword:`with` statement, which will shutdown the :class:`Executor`
(waiting as if :meth:`Executor.shutdown` were called with *wait* set to
Expand All @@ -94,6 +103,9 @@ Executor Objects
e.submit(shutil.copy, 'src3.txt', 'dest3.txt')
e.submit(shutil.copy, 'src4.txt', 'dest4.txt')

.. versionchanged:: 3.9
Added *cancel_futures*.


ThreadPoolExecutor
------------------
Expand Down
9 changes: 9 additions & 0 deletions Doc/whatsnew/3.9.rst
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,15 @@ that schedules a shutdown for the default executor that waits on the
Added :class:`asyncio.PidfdChildWatcher`, a Linux-specific child watcher
implementation that polls process file descriptors. (:issue:`38692`)

concurrent.futures
------------------

Added a new *cancel_futures* parameter to
:meth:`concurrent.futures.Executor.shutdown` that cancels all pending futures
which have not started running, instead of waiting for them to complete before
shutting down the executor.
(Contributed by Kyle Stanley in :issue:`39349`.)

curses
------

Expand Down
23 changes: 22 additions & 1 deletion Lib/concurrent/futures/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -435,6 +435,24 @@ def shutdown_worker():
# is not gc-ed yet.
if executor is not None:
executor._shutdown_thread = True
# Unless there are pending work items, we have nothing to cancel.
if pending_work_items and executor._cancel_pending_futures:
# Cancel all pending futures and update pending_work_items
# to only have futures that are currently running.
new_pending_work_items = {}
for work_id, work_item in pending_work_items.items():
if not work_item.future.cancel():
new_pending_work_items[work_id] = work_item

pending_work_items = new_pending_work_items
# Drain work_ids_queue since we no longer need to
# add items to the call queue.
while True:
try:
work_ids_queue.get_nowait()
except queue.Empty:
break

# Since no new work items can be added, it is safe to shutdown
# this thread if there are no pending work items.
if not pending_work_items:
Expand Down Expand Up @@ -546,6 +564,7 @@ def __init__(self, max_workers=None, mp_context=None,
self._broken = False
self._queue_count = 0
self._pending_work_items = {}
self._cancel_pending_futures = False

# Create communication channels for the executor
# Make the call queue slightly larger than the number of processes to
Expand Down Expand Up @@ -660,9 +679,11 @@ def map(self, fn, *iterables, timeout=None, chunksize=1):
timeout=timeout)
return _chain_from_iterable_of_lists(results)

def shutdown(self, wait=True):
def shutdown(self, wait=True, *, cancel_futures=False):
with self._shutdown_lock:
self._cancel_pending_futures = cancel_futures
self._shutdown_thread = True

if self._queue_management_thread:
# Wake up queue management thread
self._queue_management_thread_wakeup.wakeup()
Expand Down
15 changes: 14 additions & 1 deletion Lib/concurrent/futures/thread.py
Original file line number Diff line number Diff line change
Expand Up @@ -215,9 +215,22 @@ def _initializer_failed(self):
if work_item is not None:
work_item.future.set_exception(BrokenThreadPool(self._broken))

def shutdown(self, wait=True):
def shutdown(self, wait=True, *, cancel_futures=False):
with self._shutdown_lock:
self._shutdown = True
if cancel_futures:
# Drain all work items from the queue, and then cancel their
# associated futures.
while True:
try:
work_item = self._work_queue.get_nowait()
except queue.Empty:
break
if work_item is not None:
work_item.future.cancel()

# Send a wake-up to prevent threads calling
# _work_queue.get(block=True) from permanently blocking.
self._work_queue.put(None)
if wait:
for t in self._threads:
Expand Down
39 changes: 39 additions & 0 deletions Lib/test/test_concurrent_futures.py
Original file line number Diff line number Diff line change
Expand Up @@ -342,6 +342,29 @@ def test_hang_issue12364(self):
for f in fs:
f.result()

def test_cancel_futures(self):
    """shutdown(cancel_futures=True) cancels queued futures while
    letting already-running ones finish normally."""
    executor = self.executor_type(max_workers=3)
    fs = [executor.submit(time.sleep, .1) for _ in range(50)]
    executor.shutdown(cancel_futures=True)

    # Partition the futures by whether shutdown managed to cancel them.
    # The exact split is timing-dependent, so the assertions below only
    # check loose bounds.
    cancelled = []
    others = []
    for fut in fs:
        (cancelled if fut.cancelled() else others).append(fut)

    # We can't guarantee the exact number of cancellations, but with
    # max_workers set to 3 and 50 slow tasks submitted, most of the
    # queue should still have been pending, hence cancellable.
    self.assertTrue(len(cancelled) >= 35, msg=f"{len(cancelled)=}")

    # Everything that escaped cancellation must have run to completion
    # without raising.  (Partitioning on "not cancelled" rather than
    # "done" means a future left stuck in a pending state is caught by
    # the assertion below instead of being silently skipped.)
    for fut in others:
        self.assertTrue(fut.done(), msg=f"{fut._state=}")
        self.assertIsNone(fut.exception())

    # Likewise, the number that completed is timing-dependent, but at
    # least one future must have actually executed.
    self.assertTrue(len(others) > 0, msg=f"{len(others)=}")

def test_hang_issue39205(self):
"""shutdown(wait=False) doesn't hang at exit with running futures.

Expand Down Expand Up @@ -422,6 +445,22 @@ def test_thread_names_default(self):
self.assertRegex(t.name, r'ThreadPoolExecutor-\d+_[0-4]$')
t.join()

def test_cancel_futures_wait_false(self):
    """shutdown(wait=False, cancel_futures=True) must not prevent an
    already-running future from finishing at interpreter exit.

    Can only be reliably tested for ThreadPoolExecutor, since
    ProcessPoolExecutor often hangs with ``wait=False`` (even without
    *cancel_futures*).
    """
    # NOTE: the original code called .format(executor_type=...) on this
    # template even though it contains no replacement field — a dead
    # no-op leftover (and any literal braces added later would make it
    # raise); the call has been removed.
    rc, out, err = assert_python_ok('-c', """if True:
        from concurrent.futures import ThreadPoolExecutor
        from test.test_concurrent_futures import sleep_and_print
        if __name__ == "__main__":
            t = ThreadPoolExecutor()
            t.submit(sleep_and_print, .1, "apple")
            t.shutdown(wait=False, cancel_futures=True)
        """)
    # Errors in atexit hooks don't change the process exit code, check
    # stderr manually.
    self.assertFalse(err)
    self.assertEqual(out.strip(), b"apple")


class ProcessPoolShutdownTest(ExecutorShutdownTest):
def _prime_executor(self):
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
Added a new *cancel_futures* parameter to
:meth:`concurrent.futures.Executor.shutdown` that cancels all pending futures
which have not started running, instead of waiting for them to complete before
shutting down the executor.