From ccd0bbc67ab284fb2cf8db72cd1be4b8115b19c2 Mon Sep 17 00:00:00 2001 From: Oleksandr Baltian Date: Sun, 20 Jul 2025 08:19:01 +0200 Subject: [PATCH 01/21] draft: impl lazy input consumption in mp.Pool.imap(_unordered) --- Lib/multiprocessing/pool.py | 109 ++++++++++++++++++++++++++++-------- 1 file changed, 85 insertions(+), 24 deletions(-) diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index f979890170b1a1..a0b50a53745368 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -390,21 +390,57 @@ def _guarded_task_generation(self, result_job, func, iterable): i = -1 for i, x in enumerate(iterable): yield (result_job, i, func, (x,), {}) + + except Exception as e: + yield (result_job, i+1, _helper_reraises_exception, (e,), {}) + + def _guarded_task_generation_lazy(self, result_job, func, iterable, + lazy_task_gen_helper): + '''Provides a generator of tasks for imap and imap_unordered with + appropriate handling for iterables which throw exceptions during + iteration.''' + if not lazy_task_gen_helper.feature_enabled: + yield from self._guarded_task_generation(result_job, func, iterable) + return + + try: + i = -1 + enumerated_iter = iter(enumerate(iterable)) + thread = threading.current_thread() + max_generated_tasks = self._processes + lazy_task_gen_helper.buffersize + + while thread._state == RUN: + with lazy_task_gen_helper.iterator_cond: + if lazy_task_gen_helper.not_finished_tasks >= max_generated_tasks: + continue # wait for some task to be (picked up and) finished + + try: + i, x = enumerated_iter.__next__() + except StopIteration: + break + + yield (result_job, i, func, (x,), {}) + lazy_task_gen_helper.tasks_generated += 1 + except Exception as e: yield (result_job, i+1, _helper_reraises_exception, (e,), {}) - def imap(self, func, iterable, chunksize=1): + def imap(self, func, iterable, chunksize=1, buffersize=None): ''' Equivalent of `map()` -- can be MUCH slower than `Pool.map()`. ''' self._check_running() if chunksize == 1: - result = IMapIterator(self) + result = IMapIterator(self, buffersize) self._taskqueue.put( ( - self._guarded_task_generation(result._job, func, iterable), - result._set_length - )) + self._guarded_task_generation_lazy(result._job, + func, + iterable, + result._lazy_task_gen_helper), + result._set_length, + ) + ) return result else: if chunksize < 1: @@ -412,42 +448,50 @@ def imap(self, func, iterable, chunksize=1): "Chunksize must be 1+, not {0:n}".format( chunksize)) task_batches = Pool._get_tasks(func, iterable, chunksize) - result = IMapIterator(self) + result = IMapIterator(self, buffersize) self._taskqueue.put( ( - self._guarded_task_generation(result._job, - mapstar, - task_batches), - result._set_length - )) + self._guarded_task_generation_lazy(result._job, + mapstar, + task_batches, + result._lazy_task_gen_helper), + result._set_length, + ) + ) return (item for chunk in result for item in chunk) - def imap_unordered(self, func, iterable, chunksize=1): + def imap_unordered(self, func, iterable, chunksize=1, buffersize=None): ''' Like `imap()` method but ordering of results is arbitrary. 
''' self._check_running() if chunksize == 1: - result = IMapUnorderedIterator(self) + result = IMapUnorderedIterator(self, buffersize) self._taskqueue.put( ( - self._guarded_task_generation(result._job, func, iterable), - result._set_length - )) + self._guarded_task_generation_lazy(result._job, + func, + iterable, + result._lazy_task_gen_helper), + result._set_length, + ) + ) return result else: if chunksize < 1: raise ValueError( "Chunksize must be 1+, not {0!r}".format(chunksize)) task_batches = Pool._get_tasks(func, iterable, chunksize) - result = IMapUnorderedIterator(self) + result = IMapUnorderedIterator(self, buffersize) self._taskqueue.put( ( - self._guarded_task_generation(result._job, - mapstar, - task_batches), - result._set_length - )) + self._guarded_task_generation_lazy(result._job, + mapstar, + task_batches, + result._lazy_task_gen_helper), + result._set_length, + ) + ) return (item for chunk in result for item in chunk) def apply_async(self, func, args=(), kwds={}, callback=None, @@ -835,8 +879,7 @@ def _set(self, i, success_result): # class IMapIterator(object): - - def __init__(self, pool): + def __init__(self, pool, buffersize): self._pool = pool self._cond = threading.Condition(threading.Lock()) self._job = next(job_counter) @@ -846,6 +889,7 @@ def __init__(self, pool): self._length = None self._unsorted = {} self._cache[self._job] = self + self._lazy_task_gen_helper = _LazyTaskGenHelper(buffersize, self._cond) def __iter__(self): return self @@ -866,6 +910,7 @@ def next(self, timeout=None): self._pool = None raise StopIteration from None raise TimeoutError from None + self._lazy_task_gen_helper.tasks_finished += 1 success, value = item if success: @@ -914,6 +959,22 @@ def _set(self, i, obj): del self._cache[self._job] self._pool = None +# +# Class to store stats for lazy task generation and share them +# between the main thread and `_guarded_task_generation()` thread. +# +class _LazyTaskGenHelper(object): + def __init__(self, buffersize, iterator_cond): + self.feature_enabled = buffersize is not None + self.buffersize = buffersize + self.tasks_generated = 0 + self.tasks_finished = 0 + self.iterator_cond = iterator_cond + + @property + def not_finished_tasks(self): + return self.tasks_generated - self.tasks_finished + # # # From 002ef46d842dbbfea7098577cd60f6f35038cba7 Mon Sep 17 00:00:00 2001 From: Oleksandr Baltian Date: Sun, 20 Jul 2025 14:41:16 +0200 Subject: [PATCH 02/21] Use semaphore to synchronize threads Using `threading.Semaphore` makes it easier to cap the number of concurrently ran tasks. It also makes it possible to remove busy wait in child thread by waiting for semaphore. Also I've updated code to use the backpressure pattern - the new tasks are scheduled as soon as the user consumes the old ones. 
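For illustration, here is the backpressure idea in isolation, as a standalone sketch with made-up names (backpressure_sema, producer, results), not the actual pool internals: the producing thread acquires the semaphore before emitting a task, and the consuming side releases it after picking up a result, so the producer can never run more than buffersize tasks ahead of the consumer.

    import queue
    import threading

    buffersize = 4
    backpressure_sema = threading.Semaphore(buffersize)
    results = queue.SimpleQueue()

    def producer(iterable):
        for item in iterable:
            # Blocks once `buffersize` results are pending consumption.
            backpressure_sema.acquire()
            results.put(item * item)  # stand-in for submitting a task

    threading.Thread(target=producer, args=(range(100),), daemon=True).start()

    for _ in range(100):
        value = results.get()
        backpressure_sema.release()  # consuming a result frees one slot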
--- Lib/multiprocessing/pool.py | 112 ++++++++++++++++-------------------- 1 file changed, 49 insertions(+), 63 deletions(-) diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index a0b50a53745368..abdd512980c849 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -14,6 +14,7 @@ # import collections +import functools import itertools import os import queue @@ -395,32 +396,20 @@ def _guarded_task_generation(self, result_job, func, iterable): yield (result_job, i+1, _helper_reraises_exception, (e,), {}) def _guarded_task_generation_lazy(self, result_job, func, iterable, - lazy_task_gen_helper): - '''Provides a generator of tasks for imap and imap_unordered with + backpressure_sema): + """Provides a generator of tasks for imap and imap_unordered with appropriate handling for iterables which throw exceptions during - iteration.''' - if not lazy_task_gen_helper.feature_enabled: - yield from self._guarded_task_generation(result_job, func, iterable) - return - + iteration.""" try: i = -1 enumerated_iter = iter(enumerate(iterable)) - thread = threading.current_thread() - max_generated_tasks = self._processes + lazy_task_gen_helper.buffersize - - while thread._state == RUN: - with lazy_task_gen_helper.iterator_cond: - if lazy_task_gen_helper.not_finished_tasks >= max_generated_tasks: - continue # wait for some task to be (picked up and) finished - + while True: + backpressure_sema.acquire() try: - i, x = enumerated_iter.__next__() + i, x = next(enumerated_iter) except StopIteration: break - yield (result_job, i, func, (x,), {}) - lazy_task_gen_helper.tasks_generated += 1 except Exception as e: yield (result_job, i+1, _helper_reraises_exception, (e,), {}) @@ -430,31 +419,32 @@ def imap(self, func, iterable, chunksize=1, buffersize=None): Equivalent of `map()` -- can be MUCH slower than `Pool.map()`. ''' self._check_running() + if chunksize < 1: + raise ValueError("Chunksize must be 1+, not {0:n}".format(chunksize)) + + result = IMapIterator(self, buffersize) + + if result._backpressure_sema is None: + task_generation = self._guarded_task_generation + else: + task_generation = functools.partial( + self._guarded_task_generation_lazy, + backpressure_sema=result._backpressure_sema, + ) + if chunksize == 1: - result = IMapIterator(self, buffersize) self._taskqueue.put( ( - self._guarded_task_generation_lazy(result._job, - func, - iterable, - result._lazy_task_gen_helper), + task_generation(result._job, func, iterable), result._set_length, ) ) return result else: - if chunksize < 1: - raise ValueError( - "Chunksize must be 1+, not {0:n}".format( - chunksize)) task_batches = Pool._get_tasks(func, iterable, chunksize) - result = IMapIterator(self, buffersize) self._taskqueue.put( ( - self._guarded_task_generation_lazy(result._job, - mapstar, - task_batches, - result._lazy_task_gen_helper), + task_generation(result._job, mapstar, task_batches), result._set_length, ) ) @@ -465,30 +455,34 @@ def imap_unordered(self, func, iterable, chunksize=1, buffersize=None): Like `imap()` method but ordering of results is arbitrary. 
''' self._check_running() + if chunksize < 1: + raise ValueError( + "Chunksize must be 1+, not {0!r}".format(chunksize) + ) + + result = IMapUnorderedIterator(self, buffersize) + + if result._backpressure_sema is None: + task_generation = self._guarded_task_generation + else: + task_generation = functools.partial( + self._guarded_task_generation_lazy, + backpressure_sema=result._backpressure_sema, + ) + if chunksize == 1: - result = IMapUnorderedIterator(self, buffersize) self._taskqueue.put( ( - self._guarded_task_generation_lazy(result._job, - func, - iterable, - result._lazy_task_gen_helper), + task_generation(result._job, func, iterable), result._set_length, ) ) return result else: - if chunksize < 1: - raise ValueError( - "Chunksize must be 1+, not {0!r}".format(chunksize)) task_batches = Pool._get_tasks(func, iterable, chunksize) - result = IMapUnorderedIterator(self, buffersize) self._taskqueue.put( ( - self._guarded_task_generation_lazy(result._job, - mapstar, - task_batches, - result._lazy_task_gen_helper), + task_generation(result._job, mapstar, task_batches), result._set_length, ) ) @@ -889,7 +883,13 @@ def __init__(self, pool, buffersize): self._length = None self._unsorted = {} self._cache[self._job] = self - self._lazy_task_gen_helper = _LazyTaskGenHelper(buffersize, self._cond) + + if buffersize is None: + self._backpressure_sema = None + else: + self._backpressure_sema = threading.Semaphore( + value=self._pool._processes + buffersize + ) def __iter__(self): return self @@ -910,7 +910,9 @@ def next(self, timeout=None): self._pool = None raise StopIteration from None raise TimeoutError from None - self._lazy_task_gen_helper.tasks_finished += 1 + + if self._backpressure_sema: + self._backpressure_sema.release() success, value = item if success: @@ -959,22 +961,6 @@ def _set(self, i, obj): del self._cache[self._job] self._pool = None -# -# Class to store stats for lazy task generation and share them -# between the main thread and `_guarded_task_generation()` thread. -# -class _LazyTaskGenHelper(object): - def __init__(self, buffersize, iterator_cond): - self.feature_enabled = buffersize is not None - self.buffersize = buffersize - self.tasks_generated = 0 - self.tasks_finished = 0 - self.iterator_cond = iterator_cond - - @property - def not_finished_tasks(self): - return self.tasks_generated - self.tasks_finished - # # # From 6e0bc58796fd7e5a82a3a3394489852d73402920 Mon Sep 17 00:00:00 2001 From: Oleksandr Baltian Date: Mon, 21 Jul 2025 23:39:42 +0200 Subject: [PATCH 03/21] Update buffersize behavior to match concurrent.futures.Executor behavior This new behavior allow smaller real concurrency number than number of running processes. Previously, it was not allowed since we implicitly incremented buffersize by `self._processes`. --- Lib/multiprocessing/pool.py | 18 +++++++++++++----- 1 file changed, 13 insertions(+), 5 deletions(-) diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index abdd512980c849..9aaaaf55dab594 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -27,7 +27,7 @@ # If threading is available then ThreadPool should be provided. Therefore # we avoid top-level imports which are liable to fail on some systems. from . import util -from . import get_context, TimeoutError +from . 
import TimeoutError, get_context from .connection import wait # @@ -421,6 +421,11 @@ def imap(self, func, iterable, chunksize=1, buffersize=None): self._check_running() if chunksize < 1: raise ValueError("Chunksize must be 1+, not {0:n}".format(chunksize)) + if buffersize is not None: + if not isinstance(buffersize, int): + raise TypeError("buffersize must be an integer or None") + if buffersize < 1: + raise ValueError("buffersize must be None or > 0") result = IMapIterator(self, buffersize) @@ -459,6 +464,11 @@ def imap_unordered(self, func, iterable, chunksize=1, buffersize=None): raise ValueError( "Chunksize must be 1+, not {0!r}".format(chunksize) ) + if buffersize is not None: + if not isinstance(buffersize, int): + raise TypeError("buffersize must be an integer or None") + if buffersize < 1: + raise ValueError("buffersize must be None or > 0") result = IMapUnorderedIterator(self, buffersize) @@ -887,9 +897,7 @@ def __init__(self, pool, buffersize): if buffersize is None: self._backpressure_sema = None else: - self._backpressure_sema = threading.Semaphore( - value=self._pool._processes + buffersize - ) + self._backpressure_sema = threading.Semaphore(buffersize) def __iter__(self): return self @@ -911,7 +919,7 @@ def next(self, timeout=None): raise StopIteration from None raise TimeoutError from None - if self._backpressure_sema: + if self._backpressure_sema is not None: self._backpressure_sema.release() success, value = item From 62b2b6a9b50f3bdfb4a8930956ffa7c9afe2c29c Mon Sep 17 00:00:00 2001 From: Oleksandr Baltian Date: Tue, 22 Jul 2025 01:10:05 +0200 Subject: [PATCH 04/21] Release all `buffersize_lock` obj from the parent thread when terminate --- Lib/multiprocessing/pool.py | 104 ++++++++++++++++++------------------ 1 file changed, 51 insertions(+), 53 deletions(-) diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index 9aaaaf55dab594..b8caac82e00b0c 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -191,6 +191,11 @@ def __init__(self, processes=None, initializer=None, initargs=(), self._ctx = context or get_context() self._setup_queues() self._taskqueue = queue.SimpleQueue() + # The _taskqueue_buffersize_semaphores exist to allow calling .release() + # on every active semaphore when the pool is terminating to let task_handler + # wake up to stop. It's a dict so that each iterator object can efficiently + # deregister its semaphore when iterator finishes. + self._taskqueue_buffersize_semaphores = {} # The _change_notifier queue exist to wake up self._handle_workers() # when the cache (self._cache) is empty or when there is a change in # the _state variable of the thread that runs _handle_workers. 
@@ -257,7 +262,8 @@ def __init__(self, processes=None, initializer=None, initargs=(), self, self._terminate_pool, args=(self._taskqueue, self._inqueue, self._outqueue, self._pool, self._change_notifier, self._worker_handler, self._task_handler, - self._result_handler, self._cache), + self._result_handler, self._cache, + self._taskqueue_buffersize_semaphores), exitpriority=15 ) self._state = RUN @@ -383,33 +389,27 @@ def starmap_async(self, func, iterable, chunksize=None, callback=None, return self._map_async(func, iterable, starmapstar, chunksize, callback, error_callback) - def _guarded_task_generation(self, result_job, func, iterable): + def _guarded_task_generation(self, result_job, func, iterable, + buffersize_sema=None): '''Provides a generator of tasks for imap and imap_unordered with appropriate handling for iterables which throw exceptions during iteration.''' try: i = -1 - for i, x in enumerate(iterable): - yield (result_job, i, func, (x,), {}) - except Exception as e: - yield (result_job, i+1, _helper_reraises_exception, (e,), {}) + if buffersize_sema is None: + for i, x in enumerate(iterable): + yield (result_job, i, func, (x,), {}) - def _guarded_task_generation_lazy(self, result_job, func, iterable, - backpressure_sema): - """Provides a generator of tasks for imap and imap_unordered with - appropriate handling for iterables which throw exceptions during - iteration.""" - try: - i = -1 - enumerated_iter = iter(enumerate(iterable)) - while True: - backpressure_sema.acquire() - try: - i, x = next(enumerated_iter) - except StopIteration: - break - yield (result_job, i, func, (x,), {}) + else: + enumerated_iter = iter(enumerate(iterable)) + while True: + buffersize_sema.acquire() + try: + i, x = next(enumerated_iter) + except StopIteration: + break + yield (result_job, i, func, (x,), {}) except Exception as e: yield (result_job, i+1, _helper_reraises_exception, (e,), {}) @@ -428,19 +428,11 @@ def imap(self, func, iterable, chunksize=1, buffersize=None): raise ValueError("buffersize must be None or > 0") result = IMapIterator(self, buffersize) - - if result._backpressure_sema is None: - task_generation = self._guarded_task_generation - else: - task_generation = functools.partial( - self._guarded_task_generation_lazy, - backpressure_sema=result._backpressure_sema, - ) - if chunksize == 1: self._taskqueue.put( ( - task_generation(result._job, func, iterable), + self._guarded_task_generation(result._job, func, iterable, + result._buffersize_sema), result._set_length, ) ) @@ -449,7 +441,8 @@ def imap(self, func, iterable, chunksize=1, buffersize=None): task_batches = Pool._get_tasks(func, iterable, chunksize) self._taskqueue.put( ( - task_generation(result._job, mapstar, task_batches), + self._guarded_task_generation(result._job, mapstar, task_batches, + result._buffersize_sema), result._set_length, ) ) @@ -471,19 +464,11 @@ def imap_unordered(self, func, iterable, chunksize=1, buffersize=None): raise ValueError("buffersize must be None or > 0") result = IMapUnorderedIterator(self, buffersize) - - if result._backpressure_sema is None: - task_generation = self._guarded_task_generation - else: - task_generation = functools.partial( - self._guarded_task_generation_lazy, - backpressure_sema=result._backpressure_sema, - ) - if chunksize == 1: self._taskqueue.put( ( - task_generation(result._job, func, iterable), + self._guarded_task_generation(result._job, func, iterable, + result._buffersize_sema), result._set_length, ) ) @@ -492,7 +477,8 @@ def imap_unordered(self, func, iterable, 
chunksize=1, buffersize=None): task_batches = Pool._get_tasks(func, iterable, chunksize) self._taskqueue.put( ( - task_generation(result._job, mapstar, task_batches), + self._guarded_task_generation(result._job, mapstar, task_batches, + result._buffersize_sema), result._set_length, ) ) @@ -727,7 +713,8 @@ def _help_stuff_finish(inqueue, task_handler, size): @classmethod def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool, change_notifier, - worker_handler, task_handler, result_handler, cache): + worker_handler, task_handler, result_handler, cache, + taskqueue_buffersize_semaphores): # this is guaranteed to only be called once util.debug('finalizing pool') @@ -738,6 +725,10 @@ def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool, change_notifier, change_notifier.put(None) task_handler._state = TERMINATE + # Release all semaphores to wake up task_handler to stop. + for job_id, sema in tuple(taskqueue_buffersize_semaphores.items()): + taskqueue_buffersize_semaphores.pop(job_id) + sema.release() util.debug('helping task handler/workers to finish') cls._help_stuff_finish(inqueue, task_handler, len(pool)) @@ -893,11 +884,13 @@ def __init__(self, pool, buffersize): self._length = None self._unsorted = {} self._cache[self._job] = self - if buffersize is None: - self._backpressure_sema = None + self._buffersize_sema = None else: - self._backpressure_sema = threading.Semaphore(buffersize) + self._buffersize_sema = threading.Semaphore(buffersize) + self._pool._taskqueue_buffersize_semaphores[self] = ( + self._buffersize_sema + ) def __iter__(self): return self @@ -908,25 +901,30 @@ def next(self, timeout=None): item = self._items.popleft() except IndexError: if self._index == self._length: - self._pool = None - raise StopIteration from None + self._stop_iterator() self._cond.wait(timeout) try: item = self._items.popleft() except IndexError: if self._index == self._length: - self._pool = None - raise StopIteration from None + self._stop_iterator() raise TimeoutError from None - if self._backpressure_sema is not None: - self._backpressure_sema.release() + if self._buffersize_sema is not None: + self._buffersize_sema.release() success, value = item if success: return value raise value + def _stop_iterator(self): + if self._pool is not None: + # could be deleted in previous `.next()` calls + self._pool._taskqueue_buffersize_semaphores.pop(self._job) + self._pool = None + raise StopIteration from None + __next__ = next # XXX def _set(self, i, obj): From 0b6ba419c226eea338ef56cc1e2d1f91199c883b Mon Sep 17 00:00:00 2001 From: Oleksandr Baltian Date: Tue, 22 Jul 2025 01:34:03 +0200 Subject: [PATCH 05/21] Add 2 basic `ThreadPool.imap()` tests w/ and w/o buffersize --- Lib/test/_test_multiprocessing.py | 58 +++++++++++++++++++++++++++++++ 1 file changed, 58 insertions(+) diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index a1259ff1d63d18..414c90725c4391 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -2929,6 +2929,64 @@ def test_imap(self): self.assertEqual(next(it), i*i) self.assertRaises(StopIteration, it.__next__) + def test_imap_inf_iterable_with_slow_task(self): + if self.TYPE in ("processes", "manager"): + self.skipTest("test not appropriate for {}".format(self.TYPE)) + + processes = 4 + p = self.Pool(processes) + + tasks_started_later = 2 + last_produced_task_arg = Value("i") + + def produce_args(): + for arg in range(1, processes + tasks_started_later + 1): + last_produced_task_arg.value = arg + yield arg + + 
it = p.imap(functools.partial(sqr, wait=0.2), produce_args()) + + next(it) + time.sleep(0.2) + # `iterable` should've been advanced only up by `processes` times, + # but in fact advances further (by `>=processes+1`). + # In this case, it advances to the maximum value. + self.assertGreater(last_produced_task_arg.value, processes + 1) + + p.terminate() + p.join() + + def test_imap_inf_iterable_with_slow_task_and_buffersize(self): + if self.TYPE in ("processes", "manager"): + self.skipTest("test not appropriate for {}".format(self.TYPE)) + + processes = 4 + p = self.Pool(processes) + + tasks_started_later = 2 + last_produced_task_arg = Value("i") + + def produce_args(): + for arg in range(1, processes + tasks_started_later + 1): + last_produced_task_arg.value = arg + yield arg + + it = p.imap( + functools.partial(sqr, wait=0.2), + produce_args(), + buffersize=processes, + ) + + time.sleep(0.2) + self.assertEqual(last_produced_task_arg.value, processes) + + next(it) + time.sleep(0.2) + self.assertEqual(last_produced_task_arg.value, processes + 1) + + p.terminate() + p.join() + def test_imap_handle_iterable_exception(self): if self.TYPE == 'manager': self.skipTest('test not appropriate for {}'.format(self.TYPE)) From aade15e629ccbeae2d12818b0cf8cfc4e41b39bd Mon Sep 17 00:00:00 2001 From: Oleksandr Baltian Date: Tue, 22 Jul 2025 01:37:56 +0200 Subject: [PATCH 06/21] Fix accidental swap in imports --- Lib/multiprocessing/pool.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index b8caac82e00b0c..836846d00c3ebc 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -27,7 +27,7 @@ # If threading is available then ThreadPool should be provided. Therefore # we avoid top-level imports which are liable to fail on some systems. from . import util -from . import TimeoutError, get_context +from . import get_context, TimeoutError from .connection import wait # From fb38a72056a2b1c1de4ee55e8063c85cadf8fed8 Mon Sep 17 00:00:00 2001 From: Oleksandr Baltian Date: Tue, 22 Jul 2025 01:41:48 +0200 Subject: [PATCH 07/21] clear Pool._taskqueue_buffersize_semaphores safely --- Lib/multiprocessing/pool.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index 836846d00c3ebc..79148f835b4b2c 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -727,7 +727,7 @@ def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool, change_notifier, task_handler._state = TERMINATE # Release all semaphores to wake up task_handler to stop. 
for job_id, sema in tuple(taskqueue_buffersize_semaphores.items()): - taskqueue_buffersize_semaphores.pop(job_id) + taskqueue_buffersize_semaphores.pop(job_id, None) sema.release() util.debug('helping task handler/workers to finish') @@ -920,8 +920,8 @@ def next(self, timeout=None): def _stop_iterator(self): if self._pool is not None: - # could be deleted in previous `.next()` calls - self._pool._taskqueue_buffersize_semaphores.pop(self._job) + # `self._pool` could be set to `None` in previous `.next()` calls + self._pool._taskqueue_buffersize_semaphores.pop(self._job, None) self._pool = None raise StopIteration from None From 6ef488b51446f5dd253962cb3f6ace0968649db9 Mon Sep 17 00:00:00 2001 From: Oleksandr Baltian Date: Tue, 22 Jul 2025 01:51:23 +0200 Subject: [PATCH 08/21] Slightly optimize Pool._taskqueue_buffersize_semaphores terminate --- Lib/multiprocessing/pool.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index 79148f835b4b2c..36951cfae9ed31 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -726,9 +726,10 @@ def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool, change_notifier, task_handler._state = TERMINATE # Release all semaphores to wake up task_handler to stop. - for job_id, sema in tuple(taskqueue_buffersize_semaphores.items()): - taskqueue_buffersize_semaphores.pop(job_id, None) - sema.release() + for job_id in tuple(taskqueue_buffersize_semaphores.keys()): + sema = taskqueue_buffersize_semaphores.pop(job_id, None) + if sema is not None: + sema.release() util.debug('helping task handler/workers to finish') cls._help_stuff_finish(inqueue, task_handler, len(pool)) From 1716725d4af6a30a648ef27c02b5d523718f7866 Mon Sep 17 00:00:00 2001 From: Oleksandr Baltian Date: Tue, 22 Jul 2025 01:55:01 +0200 Subject: [PATCH 09/21] Rename `Pool.imap()` buffersize-related tests --- Lib/test/_test_multiprocessing.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 414c90725c4391..8eebe100fc3adf 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -2929,7 +2929,7 @@ def test_imap(self): self.assertEqual(next(it), i*i) self.assertRaises(StopIteration, it.__next__) - def test_imap_inf_iterable_with_slow_task(self): + def test_imap_fast_iterable_with_slow_task(self): if self.TYPE in ("processes", "manager"): self.skipTest("test not appropriate for {}".format(self.TYPE)) @@ -2956,7 +2956,7 @@ def produce_args(): p.terminate() p.join() - def test_imap_inf_iterable_with_slow_task_and_buffersize(self): + def test_imap_fast_iterable_with_slow_task_and_buffersize(self): if self.TYPE in ("processes", "manager"): self.skipTest("test not appropriate for {}".format(self.TYPE)) From 9b43cd086b38d3d39c9446ad8c0a07524c16d037 Mon Sep 17 00:00:00 2001 From: Oleksandr Baltian Date: Tue, 22 Jul 2025 08:05:41 +0200 Subject: [PATCH 10/21] Fix typo in `IMapIterator.__init__()` --- Lib/multiprocessing/pool.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index 36951cfae9ed31..caaf9497b1f169 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -889,7 +889,7 @@ def __init__(self, pool, buffersize): self._buffersize_sema = None else: self._buffersize_sema = threading.Semaphore(buffersize) - self._pool._taskqueue_buffersize_semaphores[self] = ( + 
self._pool._taskqueue_buffersize_semaphores[self._job] = ( self._buffersize_sema ) From 2d8934123e563379dc3b1ab578e62beae38c089f Mon Sep 17 00:00:00 2001 From: Oleksandr Baltian Date: Tue, 22 Jul 2025 09:06:09 +0200 Subject: [PATCH 11/21] Add tests for buffersize combinations with other kwargs --- Lib/test/_test_multiprocessing.py | 79 ++++++++++++++++++++++++------- 1 file changed, 62 insertions(+), 17 deletions(-) diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 8eebe100fc3adf..0c67f625643b1b 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -2916,21 +2916,45 @@ def test_async_timeout(self): p.join() def test_imap(self): - it = self.pool.imap(sqr, list(range(10))) - self.assertEqual(list(it), list(map(sqr, list(range(10))))) - - it = self.pool.imap(sqr, list(range(10))) - for i in range(10): - self.assertEqual(next(it), i*i) - self.assertRaises(StopIteration, it.__next__) + optimal_buffersize = 4 # `self.pool` size + buffersize_variants = [ + {"buffersize": None}, + {"buffersize": 1}, + {"buffersize": optimal_buffersize}, + {"buffersize": optimal_buffersize * 2}, + ] - it = self.pool.imap(sqr, list(range(1000)), chunksize=100) - for i in range(1000): - self.assertEqual(next(it), i*i) - self.assertRaises(StopIteration, it.__next__) + for kwargs in ({}, *buffersize_variants): + with self.subTest(**kwargs): + iterable = range(10) + if self.TYPE != "threads": + iterable = list(iterable) + it = self.pool.imap(sqr, iterable, **kwargs) + self.assertEqual(list(it), list(map(sqr, list(range(10))))) + + iterable = range(10) + if self.TYPE != "threads": + iterable = list(iterable) + it = self.pool.imap(sqr, iterable, **kwargs) + for i in range(10): + self.assertEqual(next(it), i * i) + self.assertRaises(StopIteration, it.__next__) + + for kwargs in ( + {"chunksize": 100}, + {"chunksize": 100, "buffersize": optimal_buffersize}, + ): + with self.subTest(**kwargs): + iterable = range(1000) + if self.TYPE != "threads": + iterable = list(iterable) + it = self.pool.imap(sqr, iterable, **kwargs) + for i in range(1000): + self.assertEqual(next(it), i * i) + self.assertRaises(StopIteration, it.__next__) def test_imap_fast_iterable_with_slow_task(self): - if self.TYPE in ("processes", "manager"): + if self.TYPE != "threads": self.skipTest("test not appropriate for {}".format(self.TYPE)) processes = 4 @@ -2957,7 +2981,7 @@ def produce_args(): p.join() def test_imap_fast_iterable_with_slow_task_and_buffersize(self): - if self.TYPE in ("processes", "manager"): + if self.TYPE != "threads": self.skipTest("test not appropriate for {}".format(self.TYPE)) processes = 4 @@ -3014,11 +3038,32 @@ def test_imap_handle_iterable_exception(self): self.assertRaises(SayWhenError, it.__next__) def test_imap_unordered(self): - it = self.pool.imap_unordered(sqr, list(range(10))) - self.assertEqual(sorted(it), list(map(sqr, list(range(10))))) + optimal_buffersize = 4 # `self.pool` size + buffersize_variants = [ + {"buffersize": None}, + {"buffersize": 1}, + {"buffersize": optimal_buffersize}, + {"buffersize": optimal_buffersize * 2}, + ] - it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=100) - self.assertEqual(sorted(it), list(map(sqr, list(range(1000))))) + for kwargs in ({}, *buffersize_variants): + with self.subTest(**kwargs): + iterable = range(10) + if self.TYPE != "threads": + iterable = list(iterable) + it = self.pool.imap_unordered(sqr, iterable, **kwargs) + self.assertEqual(sorted(it), list(map(sqr, list(range(10))))) + + for 
kwargs in ( + {"chunksize": 100}, + {"chunksize": 100, "buffersize": optimal_buffersize}, + ): + with self.subTest(**kwargs): + iterable = range(1000) + if self.TYPE != "threads": + iterable = list(iterable) + it = self.pool.imap_unordered(sqr, iterable, **kwargs) + self.assertEqual(sorted(it), list(map(sqr, list(range(1000))))) def test_imap_unordered_handle_iterable_exception(self): if self.TYPE == 'manager': From 9ab27055e3e5d71f8cd967baf0906bc1ecd5eab8 Mon Sep 17 00:00:00 2001 From: Oleksandr Baltian Date: Sun, 27 Jul 2025 18:04:15 +0200 Subject: [PATCH 12/21] Remove if-branch in `_terminate_pool` --- Lib/multiprocessing/pool.py | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index caaf9497b1f169..e7f40207156d5e 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -726,10 +726,11 @@ def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool, change_notifier, task_handler._state = TERMINATE # Release all semaphores to wake up task_handler to stop. - for job_id in tuple(taskqueue_buffersize_semaphores.keys()): - sema = taskqueue_buffersize_semaphores.pop(job_id, None) - if sema is not None: - sema.release() + for job_id, buffersize_sema in tuple( + taskqueue_buffersize_semaphores.items() + ): + buffersize_sema.release() + taskqueue_buffersize_semaphores.pop(job_id, None) util.debug('helping task handler/workers to finish') cls._help_stuff_finish(inqueue, task_handler, len(pool)) From a95500340fe193e84a31c13168763737dbb7d4f7 Mon Sep 17 00:00:00 2001 From: Oleksandr Baltian Date: Sun, 27 Jul 2025 18:11:03 +0200 Subject: [PATCH 13/21] Add more edge-case tests for `imap` and `imap_unodered` These tests mostly come from a similar PR adding `buffersize` param to `concurrent.futures.Executor.map` - https://github.com/python/cpython/pull/125663/files --- Lib/test/_test_multiprocessing.py | 180 ++++++++++++++++++++---------- 1 file changed, 122 insertions(+), 58 deletions(-) diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 0c67f625643b1b..17531a00479164 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -2953,64 +2953,6 @@ def test_imap(self): self.assertEqual(next(it), i * i) self.assertRaises(StopIteration, it.__next__) - def test_imap_fast_iterable_with_slow_task(self): - if self.TYPE != "threads": - self.skipTest("test not appropriate for {}".format(self.TYPE)) - - processes = 4 - p = self.Pool(processes) - - tasks_started_later = 2 - last_produced_task_arg = Value("i") - - def produce_args(): - for arg in range(1, processes + tasks_started_later + 1): - last_produced_task_arg.value = arg - yield arg - - it = p.imap(functools.partial(sqr, wait=0.2), produce_args()) - - next(it) - time.sleep(0.2) - # `iterable` should've been advanced only up by `processes` times, - # but in fact advances further (by `>=processes+1`). - # In this case, it advances to the maximum value. 
- self.assertGreater(last_produced_task_arg.value, processes + 1) - - p.terminate() - p.join() - - def test_imap_fast_iterable_with_slow_task_and_buffersize(self): - if self.TYPE != "threads": - self.skipTest("test not appropriate for {}".format(self.TYPE)) - - processes = 4 - p = self.Pool(processes) - - tasks_started_later = 2 - last_produced_task_arg = Value("i") - - def produce_args(): - for arg in range(1, processes + tasks_started_later + 1): - last_produced_task_arg.value = arg - yield arg - - it = p.imap( - functools.partial(sqr, wait=0.2), - produce_args(), - buffersize=processes, - ) - - time.sleep(0.2) - self.assertEqual(last_produced_task_arg.value, processes) - - next(it) - time.sleep(0.2) - self.assertEqual(last_produced_task_arg.value, processes + 1) - - p.terminate() - p.join() - def test_imap_handle_iterable_exception(self): if self.TYPE == 'manager': self.skipTest('test not appropriate for {}'.format(self.TYPE)) @@ -3101,6 +3043,128 @@ def test_imap_unordered_handle_iterable_exception(self): self.assertIn(value, expected_values) expected_values.remove(value) + def test_imap_and_imap_unordered_buffersize_type_validation(self): + for method_name in ("imap", "imap_unordered"): + for buffersize in ("foo", 2.0): + with ( + self.subTest(method=method_name, buffersize=buffersize), + self.assertRaisesRegex( + TypeError, "buffersize must be an integer or None" + ), + ): + method = getattr(self.pool, method_name) + method(str, range(4), buffersize=buffersize) + + def test_imap_and_imap_unordered_buffersize_value_validation(self): + for method_name in ("imap", "imap_unordered"): + for buffersize in (0, -1): + with ( + self.subTest(method=method_name, buffersize=buffersize), + self.assertRaisesRegex( + ValueError, "buffersize must be None or > 0" + ), + ): + method = getattr(self.pool, method_name) + method(str, range(4), buffersize=buffersize) + + def test_imap_and_imap_unordered_when_buffer_is_full(self): + if self.TYPE != "threads": + self.skipTest("test not appropriate for {}".format(self.TYPE)) + + for method_name in ("imap", "imap_unordered"): + with self.subTest(method=method_name): + processes = 4 + p = self.Pool(processes) + last_produced_task_arg = Value("i") + + def produce_args(): + for arg in itertools.count(1): + last_produced_task_arg.value = arg + yield arg + + method = getattr(p, method_name) + it = method(functools.partial(sqr, wait=0.2), produce_args()) + + time.sleep(0.2) + # `iterable` could've been advanced only `processes` times, + # but in fact it advances further (`> processes`) because of + # not waiting for workers or user code to catch up. 
+ self.assertGreater(last_produced_task_arg.value, processes) + + next(it) + time.sleep(0.2) + self.assertGreater(last_produced_task_arg.value, processes + 1) + + next(it) + time.sleep(0.2) + self.assertGreater(last_produced_task_arg.value, processes + 2) + + p.terminate() + p.join() + + def test_imap_and_imap_unordered_buffersize_when_buffer_is_full(self): + if self.TYPE != "threads": + self.skipTest("test not appropriate for {}".format(self.TYPE)) + + for method_name in ("imap", "imap_unordered"): + with self.subTest(method=method_name): + processes = 4 + p = self.Pool(processes) + last_produced_task_arg = Value("i") + + def produce_args(): + for arg in itertools.count(1): + last_produced_task_arg.value = arg + yield arg + + method = getattr(p, method_name) + it = method( + functools.partial(sqr, wait=0.2), + produce_args(), + buffersize=processes, + ) + + time.sleep(0.2) + self.assertEqual(last_produced_task_arg.value, processes) + + next(it) + time.sleep(0.2) + self.assertEqual(last_produced_task_arg.value, processes + 1) + + next(it) + time.sleep(0.2) + self.assertEqual(last_produced_task_arg.value, processes + 2) + + p.terminate() + p.join() + + def test_imap_and_imap_unordered_buffersize_on_infinite_iterable(self): + if self.TYPE != "threads": + self.skipTest("test not appropriate for {}".format(self.TYPE)) + + for method_name in ("imap", "imap_unordered"): + with self.subTest(method=method_name): + p = self.Pool(4) + method = getattr(p, method_name) + + res = method(str, itertools.count(), buffersize=2) + + self.assertEqual(next(res, None), "0") + self.assertEqual(next(res, None), "1") + self.assertEqual(next(res, None), "2") + + p.terminate() + p.join() + + def test_imap_and_imap_unordered_buffersize_on_empty_iterable(self): + for method_name in ("imap", "imap_unordered"): + with self.subTest(method=method_name): + method = getattr(self.pool, method_name) + + res = method(str, [], buffersize=2) + + self.assertIsNone(next(res, None)) + def test_make_pool(self): expected_error = (RemoteError if self.TYPE == 'manager' else ValueError) From 80efd6e56ed45aed25211b15fca0ff591c32ac3c Mon Sep 17 00:00:00 2001 From: Oleksandr Baltian Date: Sun, 27 Jul 2025 23:31:10 +0200 Subject: [PATCH 14/21] Split inf iterable test for `imap` and `imap_unordered` --- Lib/test/_test_multiprocessing.py | 43 +++++++++++++++++++------------ 1 file changed, 26 insertions(+), 17 deletions(-) diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index 17531a00479164..d85582fb528c4b 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -3138,32 +3138,41 @@ def produce_args(): p.terminate() p.join() - def test_imap_and_imap_unordered_buffersize_on_infinite_iterable(self): + def test_imap_and_imap_unordered_buffersize_on_empty_iterable(self): + for method_name in ("imap", "imap_unordered"): + with self.subTest(method=method_name): + method = getattr(self.pool, method_name) + + res = method(str, [], buffersize=2) + + self.assertIsNone(next(res, None)) + + def test_imap_buffersize_on_infinite_iterable(self): if self.TYPE != "threads": self.skipTest("test not appropriate for {}".format(self.TYPE)) - for method_name in ("imap", "imap_unordered"): - with self.subTest(method=method_name): - p = self.Pool(4) - method = getattr(p, method_name) + p = self.Pool(4) + res = p.imap(str, itertools.count(), buffersize=2) - res = method(str, itertools.count(), buffersize=2) + self.assertEqual(next(res, None), "0") + self.assertEqual(next(res, None), "1") + 
self.assertEqual(next(res, None), "2") - self.assertEqual(next(res, None), "0") - self.assertEqual(next(res, None), "1") - self.assertEqual(next(res, None), "2") + p.terminate() + p.join() - p.terminate() - p.join() + def test_imap_unordered_buffersize_on_infinite_iterable(self): + if self.TYPE != "threads": + self.skipTest("test not appropriate for {}".format(self.TYPE)) - def test_imap_and_imap_unordered_buffersize_on_empty_iterable(self): - for method_name in ("imap", "imap_unordered"): - with self.subTest(method=method_name): - method = getattr(self.pool, method_name) + p = self.Pool(4) + res = p.imap(str, itertools.count(), buffersize=2) + first_three_results = sorted(next(res, None) for _ in range(3)) - res = method(str, [], buffersize=2) + self.assertEqual(first_three_results, ["0", "1", "2"]) - self.assertIsNone(next(res, None)) + p.terminate() + p.join() def test_make_pool(self): expected_error = (RemoteError if self.TYPE == 'manager' From 83d69306d1b6d0828c783e8072f4d4baf1ab4cd1 Mon Sep 17 00:00:00 2001 From: Oleksandr Baltian Date: Mon, 28 Jul 2025 00:19:46 +0200 Subject: [PATCH 15/21] Add doc for `buffersize` argument of `imap` and `imap_unordered` --- Doc/library/multiprocessing.rst | 15 +++++++++++++-- .../2025-07-28-00-37-22.gh-issue-64192.iCGeQ5.rst | 8 ++++++++ 2 files changed, 21 insertions(+), 2 deletions(-) create mode 100644 Misc/NEWS.d/next/Library/2025-07-28-00-37-22.gh-issue-64192.iCGeQ5.rst diff --git a/Doc/library/multiprocessing.rst b/Doc/library/multiprocessing.rst index d18ada3511d891..35c8c25d51fe4a 100644 --- a/Doc/library/multiprocessing.rst +++ b/Doc/library/multiprocessing.rst @@ -2448,7 +2448,7 @@ with the :class:`Pool` class. Callbacks should complete immediately since otherwise the thread which handles the results will get blocked. - .. method:: imap(func, iterable[, chunksize]) + .. method:: imap(func, iterable[, chunksize[, buffersize]]) A lazier version of :meth:`.map`. @@ -2462,7 +2462,18 @@ with the :class:`Pool` class. ``next(timeout)`` will raise :exc:`multiprocessing.TimeoutError` if the result cannot be returned within *timeout* seconds. - .. method:: imap_unordered(func, iterable[, chunksize]) + The *iterable* is collected immediately rather than lazily, unless a + *buffersize* is specified to limit the number of submitted tasks whose + results have not yet been yielded. If the buffer is full, iteration over + the *iterables* pauses until a result is yielded from the buffer. + To fully utilize pool's capacity, set *buffersize* to the number of + processes in pool (to consume *iterable* as you go) or even higher + (to prefetch *buffersize - processes* arguments). + + .. versionadded:: 3.15 + Added the *buffersize* parameter. + + .. method:: imap_unordered(func, iterable[, chunksize[, buffersize]]) The same as :meth:`imap` except that the ordering of the results from the returned iterator should be considered arbitrary. (Only when there is diff --git a/Misc/NEWS.d/next/Library/2025-07-28-00-37-22.gh-issue-64192.iCGeQ5.rst b/Misc/NEWS.d/next/Library/2025-07-28-00-37-22.gh-issue-64192.iCGeQ5.rst new file mode 100644 index 00000000000000..f9716bd0326e17 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2025-07-28-00-37-22.gh-issue-64192.iCGeQ5.rst @@ -0,0 +1,8 @@ +Add the optional ``buffersize`` parameter to +:meth:`multiprocessing.pool.Pool.imap` and +:meth:`multiprocessing.pool.Pool.imap_unordered` to limit the number of +submitted tasks whose results have not yet been yielded. 
If the buffer is +full, iteration over the *iterables* pauses until a result is yielded from +the buffer. To fully utilize pool's capacity, set *buffersize* to the number +of processes in pool (to consume *iterable* as you go) or even higher (to +prefetch *buffersize - processes* arguments). From 995ad8c298677e81bdb586ebefa7360fbdfe223e Mon Sep 17 00:00:00 2001 From: Oleksandr Baltian Date: Mon, 28 Jul 2025 10:55:08 +0200 Subject: [PATCH 16/21] add *versionadded* for `imap_unordered` --- Doc/library/multiprocessing.rst | 3 +++ 1 file changed, 3 insertions(+) diff --git a/Doc/library/multiprocessing.rst b/Doc/library/multiprocessing.rst index 35c8c25d51fe4a..288f9606bf1319 100644 --- a/Doc/library/multiprocessing.rst +++ b/Doc/library/multiprocessing.rst @@ -2479,6 +2479,9 @@ with the :class:`Pool` class. returned iterator should be considered arbitrary. (Only when there is only one worker process is the order guaranteed to be "correct".) + .. versionadded:: 3.15 + Added the *buffersize* parameter. + .. method:: starmap(func, iterable[, chunksize]) Like :meth:`~multiprocessing.pool.Pool.map` except that the From 3b6ad65b71a93da3a770f6a4c79921aa14457a5c Mon Sep 17 00:00:00 2001 From: Oleksandr Baltian Date: Mon, 28 Jul 2025 11:01:55 +0200 Subject: [PATCH 17/21] Remove ambiguity in `buffersize` description. Previously it was a bit ambigious to say "pass this to fully utilize pool's capacity" as the fastest way would still be not passing `buffersize` at all. Now this section clearly says "... when using this feature". --- Doc/library/multiprocessing.rst | 7 ++++--- ...5.rst => 2025-07-28-11-00-49.gh-issue-64192.7htLtg.rst} | 7 ++++--- 2 files changed, 8 insertions(+), 6 deletions(-) rename Misc/NEWS.d/next/Library/{2025-07-28-00-37-22.gh-issue-64192.iCGeQ5.rst => 2025-07-28-11-00-49.gh-issue-64192.7htLtg.rst} (56%) diff --git a/Doc/library/multiprocessing.rst b/Doc/library/multiprocessing.rst index 288f9606bf1319..def4aafd110c62 100644 --- a/Doc/library/multiprocessing.rst +++ b/Doc/library/multiprocessing.rst @@ -2466,9 +2466,10 @@ with the :class:`Pool` class. *buffersize* is specified to limit the number of submitted tasks whose results have not yet been yielded. If the buffer is full, iteration over the *iterables* pauses until a result is yielded from the buffer. - To fully utilize pool's capacity, set *buffersize* to the number of - processes in pool (to consume *iterable* as you go) or even higher - (to prefetch *buffersize - processes* arguments). + To fully utilize pool's capacity when using this feature, + set *buffersize* at least to the number of processes in pool + (to consume *iterable* as you go), or even higher + (to prefetch the next ``N=buffersize-processes`` arguments). .. versionadded:: 3.15 Added the *buffersize* parameter. diff --git a/Misc/NEWS.d/next/Library/2025-07-28-00-37-22.gh-issue-64192.iCGeQ5.rst b/Misc/NEWS.d/next/Library/2025-07-28-11-00-49.gh-issue-64192.7htLtg.rst similarity index 56% rename from Misc/NEWS.d/next/Library/2025-07-28-00-37-22.gh-issue-64192.iCGeQ5.rst rename to Misc/NEWS.d/next/Library/2025-07-28-11-00-49.gh-issue-64192.7htLtg.rst index f9716bd0326e17..ca40bc111176e0 100644 --- a/Misc/NEWS.d/next/Library/2025-07-28-00-37-22.gh-issue-64192.iCGeQ5.rst +++ b/Misc/NEWS.d/next/Library/2025-07-28-11-00-49.gh-issue-64192.7htLtg.rst @@ -3,6 +3,7 @@ Add the optional ``buffersize`` parameter to :meth:`multiprocessing.pool.Pool.imap_unordered` to limit the number of submitted tasks whose results have not yet been yielded. 
If the buffer is full, iteration over the *iterables* pauses until a result is yielded from -the buffer. To fully utilize pool's capacity, set *buffersize* to the number -of processes in pool (to consume *iterable* as you go) or even higher (to -prefetch *buffersize - processes* arguments). +the buffer. To fully utilize pool's capacity when using this feature, set +*buffersize* at least to the number of processes in pool (to consume +*iterable* as you go), or even higher (to prefetch the next +``N=buffersize-processes`` arguments). From c941c16ddee10614a427e655d71b4974c8468c3d Mon Sep 17 00:00:00 2001 From: Oleksandr Baltian Date: Mon, 28 Jul 2025 23:53:29 +0200 Subject: [PATCH 18/21] Set *versionadded* as next in docs --- Doc/library/multiprocessing.rst | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Doc/library/multiprocessing.rst b/Doc/library/multiprocessing.rst index def4aafd110c62..0ce5800806a166 100644 --- a/Doc/library/multiprocessing.rst +++ b/Doc/library/multiprocessing.rst @@ -2471,7 +2471,7 @@ with the :class:`Pool` class. (to consume *iterable* as you go), or even higher (to prefetch the next ``N=buffersize-processes`` arguments). - .. versionadded:: 3.15 + .. versionadded:: next Added the *buffersize* parameter. .. method:: imap_unordered(func, iterable[, chunksize[, buffersize]]) @@ -2480,7 +2480,7 @@ with the :class:`Pool` class. returned iterator should be considered arbitrary. (Only when there is only one worker process is the order guaranteed to be "correct".) - .. versionadded:: 3.15 + .. versionadded:: next Added the *buffersize* parameter. .. method:: starmap(func, iterable[, chunksize]) From d09e891d2f887691fc0543c4e437b05e26e943d8 Mon Sep 17 00:00:00 2001 From: Oleksandr Baltian Date: Tue, 29 Jul 2025 00:01:18 +0200 Subject: [PATCH 19/21] Add whatsnew entry --- Doc/whatsnew/3.15.rst | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/Doc/whatsnew/3.15.rst b/Doc/whatsnew/3.15.rst index 9f01b52f1aff3b..177a0fe7eb88df 100644 --- a/Doc/whatsnew/3.15.rst +++ b/Doc/whatsnew/3.15.rst @@ -273,6 +273,22 @@ math (Contributed by Bénédikt Tran in :gh:`135853`.) +multiprocessing +--------------- + +* Add the optional ``buffersize`` parameter to + :meth:`multiprocessing.pool.Pool.imap` and + :meth:`multiprocessing.pool.Pool.imap_unordered` to limit the number of + submitted tasks whose results have not yet been yielded. If the buffer is + full, iteration over the *iterables* pauses until a result is yielded from + the buffer. To fully utilize pool's capacity when using this feature, set + *buffersize* at least to the number of processes in pool (to consume + *iterable* as you go), or even higher (to prefetch the next + ``N=buffersize-processes`` arguments). + + (Contributed by Oleksandr Baltian in :gh:`136871`.) 
+ + os.path ------- From 9c6d89de5f66308882659fe6f614c3f66ad943e6 Mon Sep 17 00:00:00 2001 From: Oleksandr Baltian Date: Tue, 29 Jul 2025 00:34:55 +0200 Subject: [PATCH 20/21] Fix aggreed comments on code formatting/minor refactoring --- Lib/multiprocessing/pool.py | 62 ++++++++++++++++++++++--------------- 1 file changed, 37 insertions(+), 25 deletions(-) diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index e7f40207156d5e..fc5e68541b2fa6 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -389,22 +389,21 @@ def starmap_async(self, func, iterable, chunksize=None, callback=None, return self._map_async(func, iterable, starmapstar, chunksize, callback, error_callback) - def _guarded_task_generation(self, result_job, func, iterable, - buffersize_sema=None): + def _guarded_task_generation(self, result_job, func, iterable, sema=None): '''Provides a generator of tasks for imap and imap_unordered with appropriate handling for iterables which throw exceptions during iteration.''' try: i = -1 - if buffersize_sema is None: + if sema is None: for i, x in enumerate(iterable): yield (result_job, i, func, (x,), {}) else: enumerated_iter = iter(enumerate(iterable)) while True: - buffersize_sema.acquire() + sema.acquire() try: i, x = next(enumerated_iter) except StopIteration: @@ -419,13 +418,8 @@ def imap(self, func, iterable, chunksize=1, buffersize=None): Equivalent of `map()` -- can be MUCH slower than `Pool.map()`. ''' self._check_running() - if chunksize < 1: - raise ValueError("Chunksize must be 1+, not {0:n}".format(chunksize)) - if buffersize is not None: - if not isinstance(buffersize, int): - raise TypeError("buffersize must be an integer or None") - if buffersize < 1: - raise ValueError("buffersize must be None or > 0") + self._check_chunksize(chunksize) + self._check_buffersize(buffersize) result = IMapIterator(self, buffersize) if chunksize == 1: @@ -441,8 +435,12 @@ def imap(self, func, iterable, chunksize=1, buffersize=None): task_batches = Pool._get_tasks(func, iterable, chunksize) self._taskqueue.put( ( - self._guarded_task_generation(result._job, mapstar, task_batches, - result._buffersize_sema), + self._guarded_task_generation( + result._job, + mapstar, + task_batches, + result._buffersize_sema, + ), result._set_length, ) ) @@ -453,15 +451,8 @@ def imap_unordered(self, func, iterable, chunksize=1, buffersize=None): Like `imap()` method but ordering of results is arbitrary. 
''' self._check_running() - if chunksize < 1: - raise ValueError( - "Chunksize must be 1+, not {0!r}".format(chunksize) - ) - if buffersize is not None: - if not isinstance(buffersize, int): - raise TypeError("buffersize must be an integer or None") - if buffersize < 1: - raise ValueError("buffersize must be None or > 0") + self._check_chunksize(chunksize) + self._check_buffersize(buffersize) result = IMapUnorderedIterator(self, buffersize) if chunksize == 1: @@ -477,8 +468,12 @@ def imap_unordered(self, func, iterable, chunksize=1, buffersize=None): task_batches = Pool._get_tasks(func, iterable, chunksize) self._taskqueue.put( ( - self._guarded_task_generation(result._job, mapstar, task_batches, - result._buffersize_sema), + self._guarded_task_generation( + result._job, + mapstar, + task_batches, + result._buffersize_sema, + ), result._set_length, ) ) @@ -531,6 +526,22 @@ def _map_async(self, func, iterable, mapper, chunksize=None, callback=None, ) return result + @staticmethod + def _check_chunksize(chunksize): + if chunksize < 1: + raise ValueError( + "Chunksize must be 1+, not {0:n}".format(chunksize) + ) + + @staticmethod + def _check_buffersize(buffersize): + if buffersize is None: + return + if not isinstance(buffersize, int): + raise TypeError("buffersize must be an integer or None") + if buffersize < 1: + raise ValueError("buffersize must be None or > 0") + @staticmethod def _wait_for_updates(sentinels, change_notifier, timeout=None): wait(sentinels, timeout=timeout) @@ -876,7 +887,8 @@ def _set(self, i, success_result): # class IMapIterator(object): - def __init__(self, pool, buffersize): + + def __init__(self, pool, buffersize=None): self._pool = pool self._cond = threading.Condition(threading.Lock()) self._job = next(job_counter) From 4550a01bec9470991ce241a397ef5cf199aee3a1 Mon Sep 17 00:00:00 2001 From: Oleksandr Baltian Date: Tue, 29 Jul 2025 00:45:01 +0200 Subject: [PATCH 21/21] Remove `imap` and `imap_unordered` body code duplication --- Lib/multiprocessing/pool.py | 87 +++++++++++++------------------------ 1 file changed, 31 insertions(+), 56 deletions(-) diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index fc5e68541b2fa6..5c5930a0d7aa4d 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -417,67 +417,14 @@ def imap(self, func, iterable, chunksize=1, buffersize=None): ''' Equivalent of `map()` -- can be MUCH slower than `Pool.map()`. ''' - self._check_running() - self._check_chunksize(chunksize) - self._check_buffersize(buffersize) - - result = IMapIterator(self, buffersize) - if chunksize == 1: - self._taskqueue.put( - ( - self._guarded_task_generation(result._job, func, iterable, - result._buffersize_sema), - result._set_length, - ) - ) - return result - else: - task_batches = Pool._get_tasks(func, iterable, chunksize) - self._taskqueue.put( - ( - self._guarded_task_generation( - result._job, - mapstar, - task_batches, - result._buffersize_sema, - ), - result._set_length, - ) - ) - return (item for chunk in result for item in chunk) + return self._imap(IMapIterator, func, iterable, chunksize, buffersize) def imap_unordered(self, func, iterable, chunksize=1, buffersize=None): ''' Like `imap()` method but ordering of results is arbitrary. 
''' - self._check_running() - self._check_chunksize(chunksize) - self._check_buffersize(buffersize) - - result = IMapUnorderedIterator(self, buffersize) - if chunksize == 1: - self._taskqueue.put( - ( - self._guarded_task_generation(result._job, func, iterable, - result._buffersize_sema), - result._set_length, - ) - ) - return result - else: - task_batches = Pool._get_tasks(func, iterable, chunksize) - self._taskqueue.put( - ( - self._guarded_task_generation( - result._job, - mapstar, - task_batches, - result._buffersize_sema, - ), - result._set_length, - ) - ) - return (item for chunk in result for item in chunk) + return self._imap(IMapUnorderedIterator, func, iterable, chunksize, + buffersize) def apply_async(self, func, args=(), kwds={}, callback=None, error_callback=None): @@ -526,6 +473,34 @@ def _map_async(self, func, iterable, mapper, chunksize=None, callback=None, ) return result + def _imap(self, iterator_cls, func, iterable, chunksize=1, + buffersize=None): + self._check_running() + self._check_chunksize(chunksize) + self._check_buffersize(buffersize) + + result = iterator_cls(self, buffersize) + if chunksize == 1: + self._taskqueue.put( + ( + self._guarded_task_generation(result._job, func, iterable, + result._buffersize_sema), + result._set_length, + ) + ) + return result + else: + task_batches = Pool._get_tasks(func, iterable, chunksize) + self._taskqueue.put( + ( + self._guarded_task_generation(result._job, mapstar, + task_batches, + result._buffersize_sema), + result._set_length, + ) + ) + return (item for chunk in result for item in chunk) + @staticmethod def _check_chunksize(chunksize): if chunksize < 1:
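
Usage sketch for the parameter added by this series (illustrative only, assuming the Pool.imap(func, iterable, chunksize=1, buffersize=None) signature implemented in the patches above; sqr is a stand-in workload): an infinite iterable is consumed lazily, with the number of submitted-but-unyielded tasks bounded by buffersize.

    import itertools
    from multiprocessing.pool import ThreadPool

    def sqr(x):
        return x * x

    with ThreadPool(4) as pool:
        # Without buffersize, the task-generation thread would try to
        # exhaust the infinite iterable eagerly; with buffersize=8, at most
        # 8 tasks whose results have not been yielded are submitted at once.
        results = pool.imap(sqr, itertools.count(), buffersize=8)
        for _ in range(5):
            print(next(results))  # prints 0, 1, 4, 9, 16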