diff --git a/Doc/library/multiprocessing.rst b/Doc/library/multiprocessing.rst index d18ada3511d891..0ce5800806a166 100644 --- a/Doc/library/multiprocessing.rst +++ b/Doc/library/multiprocessing.rst @@ -2448,7 +2448,7 @@ with the :class:`Pool` class. Callbacks should complete immediately since otherwise the thread which handles the results will get blocked. - .. method:: imap(func, iterable[, chunksize]) + .. method:: imap(func, iterable[, chunksize[, buffersize]]) A lazier version of :meth:`.map`. @@ -2462,12 +2462,27 @@ with the :class:`Pool` class. ``next(timeout)`` will raise :exc:`multiprocessing.TimeoutError` if the result cannot be returned within *timeout* seconds. - .. method:: imap_unordered(func, iterable[, chunksize]) + The *iterable* is collected immediately rather than lazily, unless a + *buffersize* is specified to limit the number of submitted tasks whose + results have not yet been yielded. If the buffer is full, iteration over + the *iterables* pauses until a result is yielded from the buffer. + To fully utilize pool's capacity when using this feature, + set *buffersize* at least to the number of processes in pool + (to consume *iterable* as you go), or even higher + (to prefetch the next ``N=buffersize-processes`` arguments). + + .. versionadded:: next + Added the *buffersize* parameter. + + .. method:: imap_unordered(func, iterable[, chunksize[, buffersize]]) The same as :meth:`imap` except that the ordering of the results from the returned iterator should be considered arbitrary. (Only when there is only one worker process is the order guaranteed to be "correct".) + .. versionadded:: next + Added the *buffersize* parameter. + .. method:: starmap(func, iterable[, chunksize]) Like :meth:`~multiprocessing.pool.Pool.map` except that the diff --git a/Doc/whatsnew/3.15.rst b/Doc/whatsnew/3.15.rst index 9f01b52f1aff3b..177a0fe7eb88df 100644 --- a/Doc/whatsnew/3.15.rst +++ b/Doc/whatsnew/3.15.rst @@ -273,6 +273,22 @@ math (Contributed by Bénédikt Tran in :gh:`135853`.) +multiprocessing +--------------- + +* Add the optional ``buffersize`` parameter to + :meth:`multiprocessing.pool.Pool.imap` and + :meth:`multiprocessing.pool.Pool.imap_unordered` to limit the number of + submitted tasks whose results have not yet been yielded. If the buffer is + full, iteration over the *iterables* pauses until a result is yielded from + the buffer. To fully utilize pool's capacity when using this feature, set + *buffersize* at least to the number of processes in pool (to consume + *iterable* as you go), or even higher (to prefetch the next + ``N=buffersize-processes`` arguments). + + (Contributed by Oleksandr Baltian in :gh:`136871`.) + + os.path ------- diff --git a/Lib/multiprocessing/pool.py b/Lib/multiprocessing/pool.py index f979890170b1a1..5c5930a0d7aa4d 100644 --- a/Lib/multiprocessing/pool.py +++ b/Lib/multiprocessing/pool.py @@ -14,6 +14,7 @@ # import collections +import functools import itertools import os import queue @@ -190,6 +191,11 @@ def __init__(self, processes=None, initializer=None, initargs=(), self._ctx = context or get_context() self._setup_queues() self._taskqueue = queue.SimpleQueue() + # The _taskqueue_buffersize_semaphores exist to allow calling .release() + # on every active semaphore when the pool is terminating to let task_handler + # wake up to stop. It's a dict so that each iterator object can efficiently + # deregister its semaphore when iterator finishes. + self._taskqueue_buffersize_semaphores = {} # The _change_notifier queue exist to wake up self._handle_workers() # when the cache (self._cache) is empty or when there is a change in # the _state variable of the thread that runs _handle_workers. @@ -256,7 +262,8 @@ def __init__(self, processes=None, initializer=None, initargs=(), self, self._terminate_pool, args=(self._taskqueue, self._inqueue, self._outqueue, self._pool, self._change_notifier, self._worker_handler, self._task_handler, - self._result_handler, self._cache), + self._result_handler, self._cache, + self._taskqueue_buffersize_semaphores), exitpriority=15 ) self._state = RUN @@ -382,73 +389,42 @@ def starmap_async(self, func, iterable, chunksize=None, callback=None, return self._map_async(func, iterable, starmapstar, chunksize, callback, error_callback) - def _guarded_task_generation(self, result_job, func, iterable): + def _guarded_task_generation(self, result_job, func, iterable, sema=None): '''Provides a generator of tasks for imap and imap_unordered with appropriate handling for iterables which throw exceptions during iteration.''' try: i = -1 - for i, x in enumerate(iterable): - yield (result_job, i, func, (x,), {}) + + if sema is None: + for i, x in enumerate(iterable): + yield (result_job, i, func, (x,), {}) + + else: + enumerated_iter = iter(enumerate(iterable)) + while True: + sema.acquire() + try: + i, x = next(enumerated_iter) + except StopIteration: + break + yield (result_job, i, func, (x,), {}) + except Exception as e: yield (result_job, i+1, _helper_reraises_exception, (e,), {}) - def imap(self, func, iterable, chunksize=1): + def imap(self, func, iterable, chunksize=1, buffersize=None): ''' Equivalent of `map()` -- can be MUCH slower than `Pool.map()`. ''' - self._check_running() - if chunksize == 1: - result = IMapIterator(self) - self._taskqueue.put( - ( - self._guarded_task_generation(result._job, func, iterable), - result._set_length - )) - return result - else: - if chunksize < 1: - raise ValueError( - "Chunksize must be 1+, not {0:n}".format( - chunksize)) - task_batches = Pool._get_tasks(func, iterable, chunksize) - result = IMapIterator(self) - self._taskqueue.put( - ( - self._guarded_task_generation(result._job, - mapstar, - task_batches), - result._set_length - )) - return (item for chunk in result for item in chunk) + return self._imap(IMapIterator, func, iterable, chunksize, buffersize) - def imap_unordered(self, func, iterable, chunksize=1): + def imap_unordered(self, func, iterable, chunksize=1, buffersize=None): ''' Like `imap()` method but ordering of results is arbitrary. ''' - self._check_running() - if chunksize == 1: - result = IMapUnorderedIterator(self) - self._taskqueue.put( - ( - self._guarded_task_generation(result._job, func, iterable), - result._set_length - )) - return result - else: - if chunksize < 1: - raise ValueError( - "Chunksize must be 1+, not {0!r}".format(chunksize)) - task_batches = Pool._get_tasks(func, iterable, chunksize) - result = IMapUnorderedIterator(self) - self._taskqueue.put( - ( - self._guarded_task_generation(result._job, - mapstar, - task_batches), - result._set_length - )) - return (item for chunk in result for item in chunk) + return self._imap(IMapUnorderedIterator, func, iterable, chunksize, + buffersize) def apply_async(self, func, args=(), kwds={}, callback=None, error_callback=None): @@ -497,6 +473,50 @@ def _map_async(self, func, iterable, mapper, chunksize=None, callback=None, ) return result + def _imap(self, iterator_cls, func, iterable, chunksize=1, + buffersize=None): + self._check_running() + self._check_chunksize(chunksize) + self._check_buffersize(buffersize) + + result = iterator_cls(self, buffersize) + if chunksize == 1: + self._taskqueue.put( + ( + self._guarded_task_generation(result._job, func, iterable, + result._buffersize_sema), + result._set_length, + ) + ) + return result + else: + task_batches = Pool._get_tasks(func, iterable, chunksize) + self._taskqueue.put( + ( + self._guarded_task_generation(result._job, mapstar, + task_batches, + result._buffersize_sema), + result._set_length, + ) + ) + return (item for chunk in result for item in chunk) + + @staticmethod + def _check_chunksize(chunksize): + if chunksize < 1: + raise ValueError( + "Chunksize must be 1+, not {0:n}".format(chunksize) + ) + + @staticmethod + def _check_buffersize(buffersize): + if buffersize is None: + return + if not isinstance(buffersize, int): + raise TypeError("buffersize must be an integer or None") + if buffersize < 1: + raise ValueError("buffersize must be None or > 0") + @staticmethod def _wait_for_updates(sentinels, change_notifier, timeout=None): wait(sentinels, timeout=timeout) @@ -679,7 +699,8 @@ def _help_stuff_finish(inqueue, task_handler, size): @classmethod def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool, change_notifier, - worker_handler, task_handler, result_handler, cache): + worker_handler, task_handler, result_handler, cache, + taskqueue_buffersize_semaphores): # this is guaranteed to only be called once util.debug('finalizing pool') @@ -690,6 +711,12 @@ def _terminate_pool(cls, taskqueue, inqueue, outqueue, pool, change_notifier, change_notifier.put(None) task_handler._state = TERMINATE + # Release all semaphores to wake up task_handler to stop. + for job_id, buffersize_sema in tuple( + taskqueue_buffersize_semaphores.items() + ): + buffersize_sema.release() + taskqueue_buffersize_semaphores.pop(job_id, None) util.debug('helping task handler/workers to finish') cls._help_stuff_finish(inqueue, task_handler, len(pool)) @@ -836,7 +863,7 @@ def _set(self, i, success_result): class IMapIterator(object): - def __init__(self, pool): + def __init__(self, pool, buffersize=None): self._pool = pool self._cond = threading.Condition(threading.Lock()) self._job = next(job_counter) @@ -846,6 +873,13 @@ def __init__(self, pool): self._length = None self._unsorted = {} self._cache[self._job] = self + if buffersize is None: + self._buffersize_sema = None + else: + self._buffersize_sema = threading.Semaphore(buffersize) + self._pool._taskqueue_buffersize_semaphores[self._job] = ( + self._buffersize_sema + ) def __iter__(self): return self @@ -856,22 +890,30 @@ def next(self, timeout=None): item = self._items.popleft() except IndexError: if self._index == self._length: - self._pool = None - raise StopIteration from None + self._stop_iterator() self._cond.wait(timeout) try: item = self._items.popleft() except IndexError: if self._index == self._length: - self._pool = None - raise StopIteration from None + self._stop_iterator() raise TimeoutError from None + if self._buffersize_sema is not None: + self._buffersize_sema.release() + success, value = item if success: return value raise value + def _stop_iterator(self): + if self._pool is not None: + # `self._pool` could be set to `None` in previous `.next()` calls + self._pool._taskqueue_buffersize_semaphores.pop(self._job, None) + self._pool = None + raise StopIteration from None + __next__ = next # XXX def _set(self, i, obj): diff --git a/Lib/test/_test_multiprocessing.py b/Lib/test/_test_multiprocessing.py index a1259ff1d63d18..d85582fb528c4b 100644 --- a/Lib/test/_test_multiprocessing.py +++ b/Lib/test/_test_multiprocessing.py @@ -2916,18 +2916,42 @@ def test_async_timeout(self): p.join() def test_imap(self): - it = self.pool.imap(sqr, list(range(10))) - self.assertEqual(list(it), list(map(sqr, list(range(10))))) - - it = self.pool.imap(sqr, list(range(10))) - for i in range(10): - self.assertEqual(next(it), i*i) - self.assertRaises(StopIteration, it.__next__) + optimal_buffersize = 4 # `self.pool` size + buffersize_variants = [ + {"buffersize": None}, + {"buffersize": 1}, + {"buffersize": optimal_buffersize}, + {"buffersize": optimal_buffersize * 2}, + ] - it = self.pool.imap(sqr, list(range(1000)), chunksize=100) - for i in range(1000): - self.assertEqual(next(it), i*i) - self.assertRaises(StopIteration, it.__next__) + for kwargs in ({}, *buffersize_variants): + with self.subTest(**kwargs): + iterable = range(10) + if self.TYPE != "threads": + iterable = list(iterable) + it = self.pool.imap(sqr, iterable, **kwargs) + self.assertEqual(list(it), list(map(sqr, list(range(10))))) + + iterable = range(10) + if self.TYPE != "threads": + iterable = list(iterable) + it = self.pool.imap(sqr, iterable, **kwargs) + for i in range(10): + self.assertEqual(next(it), i * i) + self.assertRaises(StopIteration, it.__next__) + + for kwargs in ( + {"chunksize": 100}, + {"chunksize": 100, "buffersize": optimal_buffersize}, + ): + with self.subTest(**kwargs): + iterable = range(1000) + if self.TYPE != "threads": + iterable = list(iterable) + it = self.pool.imap(sqr, iterable, **kwargs) + for i in range(1000): + self.assertEqual(next(it), i * i) + self.assertRaises(StopIteration, it.__next__) def test_imap_handle_iterable_exception(self): if self.TYPE == 'manager': @@ -2956,11 +2980,32 @@ def test_imap_handle_iterable_exception(self): self.assertRaises(SayWhenError, it.__next__) def test_imap_unordered(self): - it = self.pool.imap_unordered(sqr, list(range(10))) - self.assertEqual(sorted(it), list(map(sqr, list(range(10))))) + optimal_buffersize = 4 # `self.pool` size + buffersize_variants = [ + {"buffersize": None}, + {"buffersize": 1}, + {"buffersize": optimal_buffersize}, + {"buffersize": optimal_buffersize * 2}, + ] - it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=100) - self.assertEqual(sorted(it), list(map(sqr, list(range(1000))))) + for kwargs in ({}, *buffersize_variants): + with self.subTest(**kwargs): + iterable = range(10) + if self.TYPE != "threads": + iterable = list(iterable) + it = self.pool.imap_unordered(sqr, iterable, **kwargs) + self.assertEqual(sorted(it), list(map(sqr, list(range(10))))) + + for kwargs in ( + {"chunksize": 100}, + {"chunksize": 100, "buffersize": optimal_buffersize}, + ): + with self.subTest(**kwargs): + iterable = range(1000) + if self.TYPE != "threads": + iterable = list(iterable) + it = self.pool.imap_unordered(sqr, iterable, **kwargs) + self.assertEqual(sorted(it), list(map(sqr, list(range(1000))))) def test_imap_unordered_handle_iterable_exception(self): if self.TYPE == 'manager': @@ -2998,6 +3043,137 @@ def test_imap_unordered_handle_iterable_exception(self): self.assertIn(value, expected_values) expected_values.remove(value) + def test_imap_and_imap_unordered_buffersize_type_validation(self): + for method_name in ("imap", "imap_unordered"): + for buffersize in ("foo", 2.0): + with ( + self.subTest(method=method_name, buffersize=buffersize), + self.assertRaisesRegex( + TypeError, "buffersize must be an integer or None" + ), + ): + method = getattr(self.pool, method_name) + method(str, range(4), buffersize=buffersize) + + def test_imap_and_imap_unordered_buffersize_value_validation(self): + for method_name in ("imap", "imap_unordered"): + for buffersize in (0, -1): + with ( + self.subTest(method=method_name, buffersize=buffersize), + self.assertRaisesRegex( + ValueError, "buffersize must be None or > 0" + ), + ): + method = getattr(self.pool, method_name) + method(str, range(4), buffersize=buffersize) + + def test_imap_and_imap_unordered_when_buffer_is_full(self): + if self.TYPE != "threads": + self.skipTest("test not appropriate for {}".format(self.TYPE)) + + for method_name in ("imap", "imap_unordered"): + with self.subTest(method=method_name): + processes = 4 + p = self.Pool(processes) + last_produced_task_arg = Value("i") + + def produce_args(): + for arg in itertools.count(1): + last_produced_task_arg.value = arg + yield arg + + method = getattr(p, method_name) + it = method(functools.partial(sqr, wait=0.2), produce_args()) + + time.sleep(0.2) + # `iterable` could've been advanced only `processes` times, + # but in fact it advances further (`> processes`) because of + # not waiting for workers or user code to catch up. + self.assertGreater(last_produced_task_arg.value, processes) + + next(it) + time.sleep(0.2) + self.assertGreater(last_produced_task_arg.value, processes + 1) + + next(it) + time.sleep(0.2) + self.assertGreater(last_produced_task_arg.value, processes + 2) + + p.terminate() + p.join() + + def test_imap_and_imap_unordered_buffersize_when_buffer_is_full(self): + if self.TYPE != "threads": + self.skipTest("test not appropriate for {}".format(self.TYPE)) + + for method_name in ("imap", "imap_unordered"): + with self.subTest(method=method_name): + processes = 4 + p = self.Pool(processes) + last_produced_task_arg = Value("i") + + def produce_args(): + for arg in itertools.count(1): + last_produced_task_arg.value = arg + yield arg + + method = getattr(p, method_name) + it = method( + functools.partial(sqr, wait=0.2), + produce_args(), + buffersize=processes, + ) + + time.sleep(0.2) + self.assertEqual(last_produced_task_arg.value, processes) + + next(it) + time.sleep(0.2) + self.assertEqual(last_produced_task_arg.value, processes + 1) + + next(it) + time.sleep(0.2) + self.assertEqual(last_produced_task_arg.value, processes + 2) + + p.terminate() + p.join() + + def test_imap_and_imap_unordered_buffersize_on_empty_iterable(self): + for method_name in ("imap", "imap_unordered"): + with self.subTest(method=method_name): + method = getattr(self.pool, method_name) + + res = method(str, [], buffersize=2) + + self.assertIsNone(next(res, None)) + + def test_imap_buffersize_on_infinite_iterable(self): + if self.TYPE != "threads": + self.skipTest("test not appropriate for {}".format(self.TYPE)) + + p = self.Pool(4) + res = p.imap(str, itertools.count(), buffersize=2) + + self.assertEqual(next(res, None), "0") + self.assertEqual(next(res, None), "1") + self.assertEqual(next(res, None), "2") + + p.terminate() + p.join() + + def test_imap_unordered_buffersize_on_infinite_iterable(self): + if self.TYPE != "threads": + self.skipTest("test not appropriate for {}".format(self.TYPE)) + + p = self.Pool(4) + res = p.imap(str, itertools.count(), buffersize=2) + first_three_results = sorted(next(res, None) for _ in range(3)) + + self.assertEqual(first_three_results, ["0", "1", "2"]) + + p.terminate() + p.join() + def test_make_pool(self): expected_error = (RemoteError if self.TYPE == 'manager' else ValueError) diff --git a/Misc/NEWS.d/next/Library/2025-07-28-11-00-49.gh-issue-64192.7htLtg.rst b/Misc/NEWS.d/next/Library/2025-07-28-11-00-49.gh-issue-64192.7htLtg.rst new file mode 100644 index 00000000000000..ca40bc111176e0 --- /dev/null +++ b/Misc/NEWS.d/next/Library/2025-07-28-11-00-49.gh-issue-64192.7htLtg.rst @@ -0,0 +1,9 @@ +Add the optional ``buffersize`` parameter to +:meth:`multiprocessing.pool.Pool.imap` and +:meth:`multiprocessing.pool.Pool.imap_unordered` to limit the number of +submitted tasks whose results have not yet been yielded. If the buffer is +full, iteration over the *iterables* pauses until a result is yielded from +the buffer. To fully utilize pool's capacity when using this feature, set +*buffersize* at least to the number of processes in pool (to consume +*iterable* as you go), or even higher (to prefetch the next +``N=buffersize-processes`` arguments).