From f7362927039cc6170e4b168b0efae038632b3f0f Mon Sep 17 00:00:00 2001 From: Kyle Stanley Date: Fri, 17 Jan 2020 00:08:33 -0500 Subject: [PATCH 01/14] Add *cancel_futures* to Executor.shutdown() --- Doc/library/concurrent.futures.rst | 10 +++++++++- Doc/whatsnew/3.9.rst | 9 +++++++++ Lib/concurrent/futures/process.py | 10 +++++++++- Lib/concurrent/futures/thread.py | 12 +++++++++++- Lib/test/test_concurrent_futures.py | 25 +++++++++++++++++++++++++ 5 files changed, 63 insertions(+), 3 deletions(-) diff --git a/Doc/library/concurrent.futures.rst b/Doc/library/concurrent.futures.rst index d71f2d80c9e2d0..d9839b86b92c96 100644 --- a/Doc/library/concurrent.futures.rst +++ b/Doc/library/concurrent.futures.rst @@ -67,7 +67,7 @@ Executor Objects .. versionchanged:: 3.5 Added the *chunksize* argument. - .. method:: shutdown(wait=True) + .. method:: shutdown(wait=True, cancel_futures=False) Signal the executor that it should free any resources that it is using when the currently pending futures are done executing. Calls to @@ -82,6 +82,11 @@ Executor Objects value of *wait*, the entire Python program will not exit until all pending futures are done executing. + If *cancel_futures* is ``True``, this method will cancel all pending + futures that the executor has not started running. Any futures that + are completed or running won't be cancelled, regardless of the value + of *cancel_futures*. + You can avoid having to call this method explicitly if you use the :keyword:`with` statement, which will shutdown the :class:`Executor` (waiting as if :meth:`Executor.shutdown` were called with *wait* set to @@ -94,6 +99,9 @@ Executor Objects e.submit(shutil.copy, 'src3.txt', 'dest3.txt') e.submit(shutil.copy, 'src4.txt', 'dest4.txt') + .. versionchanged:: 3.9 + Added *cancel_futures*. + ThreadPoolExecutor ------------------ diff --git a/Doc/whatsnew/3.9.rst b/Doc/whatsnew/3.9.rst index f40685c932793f..f24e920751a7df 100644 --- a/Doc/whatsnew/3.9.rst +++ b/Doc/whatsnew/3.9.rst @@ -145,6 +145,15 @@ that schedules a shutdown for the default executor that waits on the Added :class:`asyncio.PidfdChildWatcher`, a Linux-specific child watcher implementation that polls process file descriptors. (:issue:`38692`) +concurrent.futures +------------------ + +Added a new *cancel_futures* parameter to +:meth:`concurrent.futures.Executor.shutdown` that cancels all pending futures +which have not started running, instead of waiting for them to complete before +shutting down the executor. +(Contributed by Kyle Stanley in :issue:`39349`.) + curses ------ diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 9e2ab9db64f664..50238acc7b673a 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -435,6 +435,10 @@ def shutdown_worker(): # is not gc-ed yet. if executor is not None: executor._shutdown_thread = True + if executor._cancel_pending_work: + for work_item in pending_work_items.values(): + work_item.future.cancel() + # Since no new work items can be added, it is safe to shutdown # this thread if there are no pending work items. if not pending_work_items: @@ -546,6 +550,7 @@ def __init__(self, max_workers=None, mp_context=None, self._broken = False self._queue_count = 0 self._pending_work_items = {} + self._cancel_pending_work = False # Create communication channels for the executor # Make the call queue slightly larger than the number of processes to @@ -660,9 +665,12 @@ def map(self, fn, *iterables, timeout=None, chunksize=1): timeout=timeout) return _chain_from_iterable_of_lists(results) - def shutdown(self, wait=True): + def shutdown(self, wait=True, cancel_futures=False): with self._shutdown_lock: self._shutdown_thread = True + if cancel_futures: + self._cancel_pending_work = True + if self._queue_management_thread: # Wake up queue management thread self._queue_management_thread_wakeup.wakeup() diff --git a/Lib/concurrent/futures/thread.py b/Lib/concurrent/futures/thread.py index b89f8f24d4d673..69115e86dcc447 100644 --- a/Lib/concurrent/futures/thread.py +++ b/Lib/concurrent/futures/thread.py @@ -215,10 +215,20 @@ def _initializer_failed(self): if work_item is not None: work_item.future.set_exception(BrokenThreadPool(self._broken)) - def shutdown(self, wait=True): + def shutdown(self, wait=True, cancel_futures=False): with self._shutdown_lock: self._shutdown = True self._work_queue.put(None) + if cancel_futures: + while True: + try: + work_item = self._work_queue.get_nowait() + except queue.Empty: + self._work_queue.put(None) + break + if work_item is not None: + work_item.future.cancel() + if wait: for t in self._threads: t.join() diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index c97351636e8692..326340d5046076 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -342,6 +342,31 @@ def test_hang_issue12364(self): for f in fs: f.result() + def test_cancel_futures(self): + max_workers = 5 + executor = self.executor_type(max_workers) + fs = [executor.submit(time.sleep, .1) for _ in range(50)] + # Wait for workers to complete first round of work + time.sleep(.1) + executor.shutdown(cancel_futures=True) + + # We can't guarantee the exact number of cancellations, but we can + # guarantee that *some* were cancelled. + cancelled = [fut for fut in fs if fut.cancelled()] + self.assertTrue(len(cancelled) > 0) + + # Ensure the other futures were able to finish (no InvalidStateError). + # Use "not fut.cancelled()" instead of "fut.done()" to include futures + # that may have been left in a pending state. + others = [fut for fut in fs if not fut.cancelled()] + for fut in others: + self.assertTrue(fut.done()) + self.assertIsNone(fut.exception()) + + # The minimum number of finished futures should be max_workers, since + # we waited for the first round to complete. + self.assertTrue(len(others) >= max_workers) + class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest, BaseTestCase): def _prime_executor(self): From 62679509fbb1a9253275e86bb5f6a07e8e4d9754 Mon Sep 17 00:00:00 2001 From: "blurb-it[bot]" <43283697+blurb-it[bot]@users.noreply.github.com> Date: Sun, 19 Jan 2020 04:12:37 +0000 Subject: [PATCH 02/14] =?UTF-8?q?=F0=9F=93=9C=F0=9F=A4=96=20Added=20by=20b?= =?UTF-8?q?lurb=5Fit.?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../next/Library/2020-01-19-04-12-34.bpo-39349.7CV-LC.rst | 4 ++++ 1 file changed, 4 insertions(+) create mode 100644 Misc/NEWS.d/next/Library/2020-01-19-04-12-34.bpo-39349.7CV-LC.rst diff --git a/Misc/NEWS.d/next/Library/2020-01-19-04-12-34.bpo-39349.7CV-LC.rst b/Misc/NEWS.d/next/Library/2020-01-19-04-12-34.bpo-39349.7CV-LC.rst new file mode 100644 index 00000000000000..cc52700f67031b --- /dev/null +++ b/Misc/NEWS.d/next/Library/2020-01-19-04-12-34.bpo-39349.7CV-LC.rst @@ -0,0 +1,4 @@ +Added a new *cancel_futures* parameter to +:meth:`concurrent.futures.Executor.shutdown` that cancels all pending futures +which have not started running, instead of waiting for them to complete before +shutting down the executor. \ No newline at end of file From b04709943a7dea7a8013351421141385b9e20737 Mon Sep 17 00:00:00 2001 From: Kyle Stanley Date: Wed, 22 Jan 2020 22:44:58 -0500 Subject: [PATCH 03/14] Make *cancel_futures* keyword-only --- Lib/concurrent/futures/process.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 50238acc7b673a..93b9a9c93438f5 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -665,7 +665,7 @@ def map(self, fn, *iterables, timeout=None, chunksize=1): timeout=timeout) return _chain_from_iterable_of_lists(results) - def shutdown(self, wait=True, cancel_futures=False): + def shutdown(self, wait=True, *, cancel_futures=False): with self._shutdown_lock: self._shutdown_thread = True if cancel_futures: From f9b6b3376ceadec91dab9a459c32696cbf8920a7 Mon Sep 17 00:00:00 2001 From: Kyle Stanley Date: Wed, 22 Jan 2020 22:51:46 -0500 Subject: [PATCH 04/14] Add comments to explain *cancel_futures* in TPE --- Lib/concurrent/futures/thread.py | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/Lib/concurrent/futures/thread.py b/Lib/concurrent/futures/thread.py index 69115e86dcc447..c927d9133d0f38 100644 --- a/Lib/concurrent/futures/thread.py +++ b/Lib/concurrent/futures/thread.py @@ -220,10 +220,15 @@ def shutdown(self, wait=True, cancel_futures=False): self._shutdown = True self._work_queue.put(None) if cancel_futures: + # Drain all work items from the queue, and then cancel their + # associated futures. while True: try: work_item = self._work_queue.get_nowait() except queue.Empty: + # Once the queue has been drained, send a wake-up to + # prevent threads calling _work_queue.get(block=True) + # from permanently blocking self._work_queue.put(None) break if work_item is not None: From 25376d746a3754acedddcd1c3a779b6f26bc941d Mon Sep 17 00:00:00 2001 From: Kyle Stanley Date: Thu, 23 Jan 2020 20:27:48 -0500 Subject: [PATCH 05/14] Improve docs and grammar fix --- Doc/library/concurrent.futures.rst | 4 ++++ Lib/concurrent/futures/thread.py | 2 +- 2 files changed, 5 insertions(+), 1 deletion(-) diff --git a/Doc/library/concurrent.futures.rst b/Doc/library/concurrent.futures.rst index d9839b86b92c96..89ef237f1ac4a2 100644 --- a/Doc/library/concurrent.futures.rst +++ b/Doc/library/concurrent.futures.rst @@ -87,6 +87,10 @@ Executor Objects are completed or running won't be cancelled, regardless of the value of *cancel_futures*. + If both *cancel_futures* and *wait* are ``True``, all futures that the + executor has started running will be completed prior to shutting down. + The remaining futures are cancelled. + You can avoid having to call this method explicitly if you use the :keyword:`with` statement, which will shutdown the :class:`Executor` (waiting as if :meth:`Executor.shutdown` were called with *wait* set to diff --git a/Lib/concurrent/futures/thread.py b/Lib/concurrent/futures/thread.py index c927d9133d0f38..7c56bb7e158ac2 100644 --- a/Lib/concurrent/futures/thread.py +++ b/Lib/concurrent/futures/thread.py @@ -228,7 +228,7 @@ def shutdown(self, wait=True, cancel_futures=False): except queue.Empty: # Once the queue has been drained, send a wake-up to # prevent threads calling _work_queue.get(block=True) - # from permanently blocking + # from permanently blocking. self._work_queue.put(None) break if work_item is not None: From 9a168e43755056ccced8159eed066df1ab2fe34c Mon Sep 17 00:00:00 2001 From: Kyle Stanley Date: Fri, 24 Jan 2020 21:38:53 -0500 Subject: [PATCH 06/14] Address review feedback --- Doc/library/concurrent.futures.rst | 4 ++-- Lib/concurrent/futures/thread.py | 5 ++--- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/Doc/library/concurrent.futures.rst b/Doc/library/concurrent.futures.rst index 89ef237f1ac4a2..793686a637e2a9 100644 --- a/Doc/library/concurrent.futures.rst +++ b/Doc/library/concurrent.futures.rst @@ -88,8 +88,8 @@ Executor Objects of *cancel_futures*. If both *cancel_futures* and *wait* are ``True``, all futures that the - executor has started running will be completed prior to shutting down. - The remaining futures are cancelled. + executor has started running will be completed prior to this method + returning. The remaining futures are cancelled. You can avoid having to call this method explicitly if you use the :keyword:`with` statement, which will shutdown the :class:`Executor` diff --git a/Lib/concurrent/futures/thread.py b/Lib/concurrent/futures/thread.py index 7c56bb7e158ac2..2986aba7775d32 100644 --- a/Lib/concurrent/futures/thread.py +++ b/Lib/concurrent/futures/thread.py @@ -215,10 +215,9 @@ def _initializer_failed(self): if work_item is not None: work_item.future.set_exception(BrokenThreadPool(self._broken)) - def shutdown(self, wait=True, cancel_futures=False): + def shutdown(self, wait=True, *, cancel_futures=False): with self._shutdown_lock: self._shutdown = True - self._work_queue.put(None) if cancel_futures: # Drain all work items from the queue, and then cancel their # associated futures. @@ -229,11 +228,11 @@ def shutdown(self, wait=True, cancel_futures=False): # Once the queue has been drained, send a wake-up to # prevent threads calling _work_queue.get(block=True) # from permanently blocking. - self._work_queue.put(None) break if work_item is not None: work_item.future.cancel() + self._work_queue.put(None) if wait: for t in self._threads: t.join() From 7bd8134d4bcf8ad6562299b8da9701eed86c56c5 Mon Sep 17 00:00:00 2001 From: Kyle Stanley Date: Fri, 24 Jan 2020 21:44:41 -0500 Subject: [PATCH 07/14] Add kw-only asterisk to executor.shutdown() docs --- Doc/library/concurrent.futures.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Doc/library/concurrent.futures.rst b/Doc/library/concurrent.futures.rst index 793686a637e2a9..b21d5594c84fa0 100644 --- a/Doc/library/concurrent.futures.rst +++ b/Doc/library/concurrent.futures.rst @@ -67,7 +67,7 @@ Executor Objects .. versionchanged:: 3.5 Added the *chunksize* argument. - .. method:: shutdown(wait=True, cancel_futures=False) + .. method:: shutdown(wait=True, \*, cancel_futures=False) Signal the executor that it should free any resources that it is using when the currently pending futures are done executing. Calls to From 8b08b2fa8e518012c85ad80497c98f741e836c50 Mon Sep 17 00:00:00 2001 From: Kyle Stanley Date: Sat, 25 Jan 2020 18:48:55 -0500 Subject: [PATCH 08/14] Update code comments --- Lib/concurrent/futures/thread.py | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/Lib/concurrent/futures/thread.py b/Lib/concurrent/futures/thread.py index 2986aba7775d32..be79161bf8561d 100644 --- a/Lib/concurrent/futures/thread.py +++ b/Lib/concurrent/futures/thread.py @@ -225,13 +225,12 @@ def shutdown(self, wait=True, *, cancel_futures=False): try: work_item = self._work_queue.get_nowait() except queue.Empty: - # Once the queue has been drained, send a wake-up to - # prevent threads calling _work_queue.get(block=True) - # from permanently blocking. break if work_item is not None: work_item.future.cancel() + # Send a wake-up to prevent threads calling + # _work_queue.get(block=True) from permanently blocking. self._work_queue.put(None) if wait: for t in self._threads: From f706c5f70e65a1570028fa5f8b285747160cd535 Mon Sep 17 00:00:00 2001 From: Kyle Stanley Date: Wed, 29 Jan 2020 07:12:00 -0500 Subject: [PATCH 09/14] Improve future cancellation for PPE --- Lib/concurrent/futures/process.py | 25 +++++++++++++++++++------ 1 file changed, 19 insertions(+), 6 deletions(-) diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 93b9a9c93438f5..7449530ecafd9d 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -435,9 +435,23 @@ def shutdown_worker(): # is not gc-ed yet. if executor is not None: executor._shutdown_thread = True - if executor._cancel_pending_work: - for work_item in pending_work_items.values(): - work_item.future.cancel() + # Unless there are pending work items, we have nothing to cancel. + if pending_work_items and executor._cancel_pending_futures: + # Cancel all pending futures and update pending_work_items + # to only have futures that are currently running. + new_pending_work_items = {} + for work_id, work_item in pending_work_items.items(): + if not work_item.future.cancel(): + new_pending_work_items[work_id] = work_item + + pending_work_items = new_pending_work_items + # Drain work_ids_queue since we no longer need to + # add items to the call queue. + while True: + try: + work_ids_queue.get_nowait() + except queue.Empty: + break # Since no new work items can be added, it is safe to shutdown # this thread if there are no pending work items. @@ -550,7 +564,7 @@ def __init__(self, max_workers=None, mp_context=None, self._broken = False self._queue_count = 0 self._pending_work_items = {} - self._cancel_pending_work = False + self._cancel_pending_futures = False # Create communication channels for the executor # Make the call queue slightly larger than the number of processes to @@ -668,8 +682,7 @@ def map(self, fn, *iterables, timeout=None, chunksize=1): def shutdown(self, wait=True, *, cancel_futures=False): with self._shutdown_lock: self._shutdown_thread = True - if cancel_futures: - self._cancel_pending_work = True + self._cancel_pending_futures = cancel_futures if self._queue_management_thread: # Wake up queue management thread From bdbc8e399de2d1a82c80f3d16413fe236f03ce78 Mon Sep 17 00:00:00 2001 From: Kyle Stanley Date: Wed, 29 Jan 2020 07:12:39 -0500 Subject: [PATCH 10/14] Fix race condition and improve tests --- Lib/test/test_concurrent_futures.py | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index 326340d5046076..7002436167b3d1 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -343,29 +343,27 @@ def test_hang_issue12364(self): f.result() def test_cancel_futures(self): - max_workers = 5 - executor = self.executor_type(max_workers) + executor = self.executor_type() fs = [executor.submit(time.sleep, .1) for _ in range(50)] - # Wait for workers to complete first round of work - time.sleep(.1) executor.shutdown(cancel_futures=True) - # We can't guarantee the exact number of cancellations, but we can - # guarantee that *some* were cancelled. + # guarantee that *some* were cancelled. In this case, at least half + # should have been cancelled. cancelled = [fut for fut in fs if fut.cancelled()] - self.assertTrue(len(cancelled) > 0) + self.assertTrue(len(cancelled) >= 25, msg=f"{len(cancelled)=}") - # Ensure the other futures were able to finish (no InvalidStateError). + # Ensure the other futures were able to finish. # Use "not fut.cancelled()" instead of "fut.done()" to include futures # that may have been left in a pending state. others = [fut for fut in fs if not fut.cancelled()] for fut in others: - self.assertTrue(fut.done()) + self.assertTrue(fut.done(), msg=f"{fut._state=}") self.assertIsNone(fut.exception()) - # The minimum number of finished futures should be max_workers, since - # we waited for the first round to complete. - self.assertTrue(len(others) >= max_workers) + # Similar to the number of cancelled futures, we can't guarantee the + # exact number that completed. But, we can guarantee that at least + # one finished. + self.assertTrue(len(others) > 0, msg=f"{len(others)=}") class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest, BaseTestCase): From 41baf767ed0ca76e0e1990ffe2bb0af8863602d0 Mon Sep 17 00:00:00 2001 From: Kyle Stanley Date: Wed, 29 Jan 2020 07:21:14 -0500 Subject: [PATCH 11/14] Add TPE test for interpreter exit Co-authored-by: Brian Quinlain --- Lib/test/test_concurrent_futures.py | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index 7002436167b3d1..e9a3d0c10e26e7 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -425,6 +425,22 @@ def test_thread_names_default(self): self.assertRegex(t.name, r'ThreadPoolExecutor-\d+_[0-4]$') t.join() + def test_cancel_futures_wait_false(self): + # Can only be reliably tested for TPE, since PPE often hangs with + # `wait=False` (even without *cancel_futures*). + rc, out, err = assert_python_ok('-c', """if True: + from concurrent.futures import ThreadPoolExecutor + from test.test_concurrent_futures import sleep_and_print + if __name__ == "__main__": + t = ThreadPoolExecutor() + t.submit(sleep_and_print, .1, "apple") + t.shutdown(wait=False, cancel_futures=True) + """.format(executor_type=self.executor_type.__name__)) + # Errors in atexit hooks don't change the process exit code, check + # stderr manually. + self.assertFalse(err) + self.assertEqual(out.strip(), b"apple") + class ProcessPoolShutdownTest(ExecutorShutdownTest): def _prime_executor(self): From bee0e6817f4f9dbc131f199191098c2991103c8e Mon Sep 17 00:00:00 2001 From: Kyle Stanley Date: Wed, 29 Jan 2020 07:27:00 -0500 Subject: [PATCH 12/14] Fix whitespace --- Lib/test/test_concurrent_futures.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index e9a3d0c10e26e7..6f6c596a4158c9 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -426,7 +426,7 @@ def test_thread_names_default(self): t.join() def test_cancel_futures_wait_false(self): - # Can only be reliably tested for TPE, since PPE often hangs with + # Can only be reliably tested for TPE, since PPE often hangs with # `wait=False` (even without *cancel_futures*). rc, out, err = assert_python_ok('-c', """if True: from concurrent.futures import ThreadPoolExecutor From bf75a3e6e021615104e542c69a1b753efa686807 Mon Sep 17 00:00:00 2001 From: Kyle Stanley Date: Fri, 31 Jan 2020 23:42:42 -0500 Subject: [PATCH 13/14] Set _cancel_pending_futures before _shutdown_thread --- Lib/concurrent/futures/process.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/concurrent/futures/process.py b/Lib/concurrent/futures/process.py index 7449530ecafd9d..fd9f572b6c7116 100644 --- a/Lib/concurrent/futures/process.py +++ b/Lib/concurrent/futures/process.py @@ -681,8 +681,8 @@ def map(self, fn, *iterables, timeout=None, chunksize=1): def shutdown(self, wait=True, *, cancel_futures=False): with self._shutdown_lock: - self._shutdown_thread = True self._cancel_pending_futures = cancel_futures + self._shutdown_thread = True if self._queue_management_thread: # Wake up queue management thread From d756264135053661dde9a9fb7de8579e238d9b9b Mon Sep 17 00:00:00 2001 From: Kyle Stanley Date: Sat, 1 Feb 2020 00:19:16 -0500 Subject: [PATCH 14/14] Set max_workers to 3 in test_cancel_futures --- Lib/test/test_concurrent_futures.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/Lib/test/test_concurrent_futures.py b/Lib/test/test_concurrent_futures.py index c83ed34ddf6dbd..af77f813419104 100644 --- a/Lib/test/test_concurrent_futures.py +++ b/Lib/test/test_concurrent_futures.py @@ -343,14 +343,14 @@ def test_hang_issue12364(self): f.result() def test_cancel_futures(self): - executor = self.executor_type() + executor = self.executor_type(max_workers=3) fs = [executor.submit(time.sleep, .1) for _ in range(50)] executor.shutdown(cancel_futures=True) # We can't guarantee the exact number of cancellations, but we can - # guarantee that *some* were cancelled. In this case, at least half - # should have been cancelled. + # guarantee that *some* were cancelled. With setting max_workers to 3, + # most of the submitted futures should have been cancelled. cancelled = [fut for fut in fs if fut.cancelled()] - self.assertTrue(len(cancelled) >= 25, msg=f"{len(cancelled)=}") + self.assertTrue(len(cancelled) >= 35, msg=f"{len(cancelled)=}") # Ensure the other futures were able to finish. # Use "not fut.cancelled()" instead of "fut.done()" to include futures