From e09e762e0560473af1c35e5b50d1d6e3c3f8c6d0 Mon Sep 17 00:00:00 2001 From: Roman Matveev Date: Sat, 23 Nov 2024 19:44:11 +0400 Subject: [PATCH 1/9] wip --- returns/methods/gather.py | 45 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) create mode 100644 returns/methods/gather.py diff --git a/returns/methods/gather.py b/returns/methods/gather.py new file mode 100644 index 000000000..5fa12b059 --- /dev/null +++ b/returns/methods/gather.py @@ -0,0 +1,45 @@ +from typing import Any + +from typing import Awaitable, Iterable + +import anyio +from returns.future import Future, FutureResult +from returns.io import IOResult + + + +async def gather( + containers: Iterable[ + Awaitable, + ], +) -> tuple[IOResult, ...]: + """ + Execute multiple coroutines concurrently and return their wrapped results. + + .. code:: python + + >>> import anyio + >>> from returns.methods.gather import gather + >>> from returns.io import IOSuccess + + >>> async def coro(): + ... return 1 + >>> assert anyio.run(gather([coro()])) == (IOSuccess(1),) + >>> container = FutureResult(coro()) + >>> assert anyio.run(gather([container.awaitable])) == (IOSuccess(1),) + + """ + + async with anyio.create_task_group() as tg: + containers_t = tuple(containers) + results: list[IOResult | None] = len(containers_t)*[IOResult(None)] + + async def run_task(coro: Awaitable, index: int): + results[index] = await FutureResult(coro) + + for i, coro in enumerate(containers_t): + if i >= len(results): + results = results + 2*len(results)*[None] + tg.start_soon(run_task, coro, i) + return tuple(results) + From dffe4f7b80d622ef6a820afb8e329aee0eb6474b Mon Sep 17 00:00:00 2001 From: Roman Matveev Date: Sat, 23 Nov 2024 20:44:58 +0400 Subject: [PATCH 2/9] concept --- returns/methods/gather.py | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/returns/methods/gather.py b/returns/methods/gather.py index 5fa12b059..f12f73167 100644 --- a/returns/methods/gather.py +++ b/returns/methods/gather.py @@ -24,22 +24,20 @@ async def gather( >>> async def coro(): ... return 1 - >>> assert anyio.run(gather([coro()])) == (IOSuccess(1),) + >>> assert anyio.run(gather([coro()])) == (IOSuccess(1), ) >>> container = FutureResult(coro()) - >>> assert anyio.run(gather([container.awaitable])) == (IOSuccess(1),) + >>> assert anyio.run(gather([container.awaitable])) == (IOSuccess(1), ) """ async with anyio.create_task_group() as tg: containers_t = tuple(containers) - results: list[IOResult | None] = len(containers_t)*[IOResult(None)] + results: list[IOResult] = len(containers_t)*[IOResult(None)] async def run_task(coro: Awaitable, index: int): results[index] = await FutureResult(coro) for i, coro in enumerate(containers_t): - if i >= len(results): - results = results + 2*len(results)*[None] tg.start_soon(run_task, coro, i) return tuple(results) From e858b8b863cb1230c54bb5a378c26a1bb531ed32 Mon Sep 17 00:00:00 2001 From: Roman Matveev Date: Sun, 24 Nov 2024 14:49:48 +0400 Subject: [PATCH 3/9] wip --- CHANGELOG.md | 3 ++- docs/pages/methods.rst | 37 ++++++++++++++++++++++++++----- returns/methods/__init__.py | 1 + returns/methods/gather.py | 27 ++++++++++++---------- tests/test_methods/test_gather.py | 22 ++++++++++++++++++ 5 files changed, 71 insertions(+), 19 deletions(-) create mode 100644 tests/test_methods/test_gather.py diff --git a/CHANGELOG.md b/CHANGELOG.md index 4e390a0af..6e2db3c46 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,9 +14,10 @@ See [0Ver](https://0ver.org/). - Improve inference of `ResultLike` objects when exception catching decorator is applied with explicit exception types - Add picky exceptions to `impure_safe` decorator like `safe` has. Issue #1543 -- Add partition function to result module. Issue #1905 +- Add `partition` function to methods module. Issue #1905 - Adds `default_error` parameter to `returns.converters.maybe_to_result`, which provides a default error value for `Failure` +- Add `returns.methods.gather` utility method ### Misc diff --git a/docs/pages/methods.rst b/docs/pages/methods.rst index ce7cfa64d..12c705998 100644 --- a/docs/pages/methods.rst +++ b/docs/pages/methods.rst @@ -79,7 +79,7 @@ Here's a full example: partition ~~~~~~~~~ -:func:`partition ` is used to convert +:func:`partition ` is used to convert list of :class:`~returns.interfaces.Unwrappable` instances like :class:`~returns.result.Result`, :class:`~returns.io.IOResult`, and :class:`~returns.maybe.Maybe` @@ -87,11 +87,36 @@ to a tuple of two lists: successes and failures. .. code:: python - >>> from returns.result import Failure, Success - >>> from returns.methods import partition - >>> results = [Success(1), Failure(2), Success(3), Failure(4)] - >>> partition(results) - ([1, 3], [2, 4]) + >>> from returns.result import Failure, Success + >>> from returns.methods import partition + >>> results = [Success(1), Failure(2), Success(3), Failure(4)] + >>> partition(results) + ([1, 3], [2, 4]) + +gather +~~~~~~ + +:func:`gather ` is used to safely concurrently +execute multiple awaitable objects(any object with ``__await__`` method, +included function marked with async keyword) and return a tuple of wrapped results +:class: `~returns.io.IOResult`. +Embrace railway-oriented programming princple of executing as many IO operations +as possible before synchrounous computations. + +.. code:: python + + >>> import anyio + >>> from returns.io import IO, IOSuccess, IOFailure + >>> from returns.methods import gather + + >>> async def coro(): + ... return 1 + >>> async def coro_raise(): + ... raise ValueError(2) + >>> anyio.run(gather,[coro(), coro_raise()]) + (IOSuccess(1), IOFailure(ValueError(2))) + + API Reference ------------- diff --git a/returns/methods/__init__.py b/returns/methods/__init__.py index 0613f1f5c..e3b4d3508 100644 --- a/returns/methods/__init__.py +++ b/returns/methods/__init__.py @@ -1,4 +1,5 @@ from returns.methods.cond import cond as cond +from returns.methods.gather import gather as gather from returns.methods.partition import partition as partition from returns.methods.unwrap_or_failure import ( unwrap_or_failure as unwrap_or_failure, diff --git a/returns/methods/gather.py b/returns/methods/gather.py index f12f73167..420fe37e1 100644 --- a/returns/methods/gather.py +++ b/returns/methods/gather.py @@ -1,11 +1,10 @@ -from typing import Any from typing import Awaitable, Iterable import anyio -from returns.future import Future, FutureResult -from returns.io import IOResult +from returns.future import FutureResult +from returns.io import IOResult async def gather( @@ -24,20 +23,24 @@ async def gather( >>> async def coro(): ... return 1 - >>> assert anyio.run(gather([coro()])) == (IOSuccess(1), ) + >>> assert anyio.run(gather, [coro()]) == (IOSuccess(1), ) >>> container = FutureResult(coro()) - >>> assert anyio.run(gather([container.awaitable])) == (IOSuccess(1), ) + >>> assert anyio.run(gather, [container.awaitable]) == (IOSuccess(1), ) """ - async with anyio.create_task_group() as tg: containers_t = tuple(containers) - results: list[IOResult] = len(containers_t)*[IOResult(None)] + ioresults: dict[int, IOResult] = {} - async def run_task(coro: Awaitable, index: int): - results[index] = await FutureResult(coro) + async def _coro_wrapper(coro: Awaitable): # noqa: WPS430 + try: + return IOResult.from_value(await coro) + except Exception as exc: + return IOResult.from_failure(exc) - for i, coro in enumerate(containers_t): - tg.start_soon(run_task, coro, i) - return tuple(results) + async def _run_task(coro: Awaitable, index: int): # noqa: WPS430 + ioresults[index] = await _coro_wrapper(coro) + for coro_index, coro in enumerate(containers_t): + tg.start_soon(_run_task, coro, coro_index) + return tuple([ioresults[key] for key in sorted(ioresults.keys())]) diff --git a/tests/test_methods/test_gather.py b/tests/test_methods/test_gather.py new file mode 100644 index 000000000..d79c90ff9 --- /dev/null +++ b/tests/test_methods/test_gather.py @@ -0,0 +1,22 @@ + +import pytest + +from returns.future import Future, FutureResult +from returns.io import IO, IOResult +from returns.methods import gather + + +@pytest.mark.parametrize(('containers', 'expected'), [ + ( + ( + Future.from_value(1), + FutureResult.from_value(2), + FutureResult.from_failure(None), + ), + (IO(1), IOResult.from_value(2), FutureResult.from_failure(None)), + ), + ((), ()), +]) +def test_gather(containers, expected): + """Test partition function.""" + assert gather(containers) == expected From d1920b09ef1dc173c92a28fe4831cd8d6fff0053 Mon Sep 17 00:00:00 2001 From: RomanMIzulin Date: Sun, 24 Nov 2024 17:17:25 +0400 Subject: [PATCH 4/9] Update returns/methods/gather.py Co-authored-by: sobolevn --- returns/methods/gather.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/returns/methods/gather.py b/returns/methods/gather.py index 420fe37e1..1fc598f90 100644 --- a/returns/methods/gather.py +++ b/returns/methods/gather.py @@ -43,4 +43,4 @@ async def _run_task(coro: Awaitable, index: int): # noqa: WPS430 for coro_index, coro in enumerate(containers_t): tg.start_soon(_run_task, coro, coro_index) - return tuple([ioresults[key] for key in sorted(ioresults.keys())]) + return tuple(ioresults[key] for key in sorted(ioresults.keys())) From 4b0f7072c5a3e2bf322df796ee14339c94d5cd65 Mon Sep 17 00:00:00 2001 From: Roman Matveev Date: Sun, 24 Nov 2024 20:10:11 +0400 Subject: [PATCH 5/9] wip: linting and little updates --- docs/pages/methods.rst | 3 ++- returns/methods/__init__.py | 3 ++- returns/methods/{gather.py => async_.py} | 20 +++++++------------- tests/test_methods/test_gather.py | 6 +++--- 4 files changed, 14 insertions(+), 18 deletions(-) rename returns/methods/{gather.py => async_.py} (60%) diff --git a/docs/pages/methods.rst b/docs/pages/methods.rst index 12c705998..e06ac333c 100644 --- a/docs/pages/methods.rst +++ b/docs/pages/methods.rst @@ -107,6 +107,7 @@ as possible before synchrounous computations. >>> import anyio >>> from returns.io import IO, IOSuccess, IOFailure + >>> from returns.result import Failure, Success >>> from returns.methods import gather >>> async def coro(): @@ -114,7 +115,7 @@ as possible before synchrounous computations. >>> async def coro_raise(): ... raise ValueError(2) >>> anyio.run(gather,[coro(), coro_raise()]) - (IOSuccess(1), IOFailure(ValueError(2))) + (>, >) diff --git a/returns/methods/__init__.py b/returns/methods/__init__.py index e3b4d3508..f5b390aca 100644 --- a/returns/methods/__init__.py +++ b/returns/methods/__init__.py @@ -1,5 +1,6 @@ +"""Set of various utility functions.""" +from returns.methods.async_ import gather as gather from returns.methods.cond import cond as cond -from returns.methods.gather import gather as gather from returns.methods.partition import partition as partition from returns.methods.unwrap_or_failure import ( unwrap_or_failure as unwrap_or_failure, diff --git a/returns/methods/gather.py b/returns/methods/async_.py similarity index 60% rename from returns/methods/gather.py rename to returns/methods/async_.py index 1fc598f90..ea78fc177 100644 --- a/returns/methods/gather.py +++ b/returns/methods/async_.py @@ -3,7 +3,6 @@ import anyio -from returns.future import FutureResult from returns.io import IOResult @@ -18,29 +17,24 @@ async def gather( .. code:: python >>> import anyio - >>> from returns.methods.gather import gather + >>> from returns.methods import gather >>> from returns.io import IOSuccess >>> async def coro(): ... return 1 >>> assert anyio.run(gather, [coro()]) == (IOSuccess(1), ) - >>> container = FutureResult(coro()) - >>> assert anyio.run(gather, [container.awaitable]) == (IOSuccess(1), ) - """ async with anyio.create_task_group() as tg: - containers_t = tuple(containers) ioresults: dict[int, IOResult] = {} - async def _coro_wrapper(coro: Awaitable): # noqa: WPS430 + async def _run_task(coro: Awaitable, index: int): # noqa: WPS430 + ioresult: IOResult try: - return IOResult.from_value(await coro) + ioresult = IOResult.from_value(await coro) except Exception as exc: - return IOResult.from_failure(exc) - - async def _run_task(coro: Awaitable, index: int): # noqa: WPS430 - ioresults[index] = await _coro_wrapper(coro) + ioresult = IOResult.from_failure(exc) + ioresults[index] = ioresult - for coro_index, coro in enumerate(containers_t): + for coro_index, coro in enumerate(containers): tg.start_soon(_run_task, coro, coro_index) return tuple(ioresults[key] for key in sorted(ioresults.keys())) diff --git a/tests/test_methods/test_gather.py b/tests/test_methods/test_gather.py index d79c90ff9..2f56a1b42 100644 --- a/tests/test_methods/test_gather.py +++ b/tests/test_methods/test_gather.py @@ -13,10 +13,10 @@ FutureResult.from_value(2), FutureResult.from_failure(None), ), - (IO(1), IOResult.from_value(2), FutureResult.from_failure(None)), + (IO(1), IOResult.from_value(2), IOResult.from_failure(None)), ), ((), ()), ]) -def test_gather(containers, expected): +async def test_gather(containers, expected): """Test partition function.""" - assert gather(containers) == expected + assert await gather(containers) == expected From e959624889a85a2237d9714a35d0a4166cca1b9b Mon Sep 17 00:00:00 2001 From: Roman Matveev Date: Sun, 24 Nov 2024 20:13:27 +0400 Subject: [PATCH 6/9] ignore file name violation --- returns/methods/async_.py | 1 + 1 file changed, 1 insertion(+) diff --git a/returns/methods/async_.py b/returns/methods/async_.py index ea78fc177..b2f8965b6 100644 --- a/returns/methods/async_.py +++ b/returns/methods/async_.py @@ -1,3 +1,4 @@ +# flake8: noqa: WPS102 from typing import Awaitable, Iterable From 5bab4bcd8d450bf86870e6f50cc843bf91b8ebd5 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 2 May 2025 07:46:30 +0000 Subject: [PATCH 7/9] [pre-commit.ci] auto fixes from pre-commit.com hooks --- returns/methods/__init__.py | 1 + returns/methods/async_.py | 6 ++---- tests/test_methods/test_gather.py | 22 ++++++++++++---------- 3 files changed, 15 insertions(+), 14 deletions(-) diff --git a/returns/methods/__init__.py b/returns/methods/__init__.py index f5b390aca..1d9e972d8 100644 --- a/returns/methods/__init__.py +++ b/returns/methods/__init__.py @@ -1,4 +1,5 @@ """Set of various utility functions.""" + from returns.methods.async_ import gather as gather from returns.methods.cond import cond as cond from returns.methods.partition import partition as partition diff --git a/returns/methods/async_.py b/returns/methods/async_.py index b2f8965b6..87c3fd898 100644 --- a/returns/methods/async_.py +++ b/returns/methods/async_.py @@ -1,6 +1,6 @@ # flake8: noqa: WPS102 -from typing import Awaitable, Iterable +from collections.abc import Awaitable, Iterable import anyio @@ -8,9 +8,7 @@ async def gather( - containers: Iterable[ - Awaitable, - ], + containers: Iterable[Awaitable,], ) -> tuple[IOResult, ...]: """ Execute multiple coroutines concurrently and return their wrapped results. diff --git a/tests/test_methods/test_gather.py b/tests/test_methods/test_gather.py index 2f56a1b42..084141705 100644 --- a/tests/test_methods/test_gather.py +++ b/tests/test_methods/test_gather.py @@ -1,4 +1,3 @@ - import pytest from returns.future import Future, FutureResult @@ -6,17 +5,20 @@ from returns.methods import gather -@pytest.mark.parametrize(('containers', 'expected'), [ - ( +@pytest.mark.parametrize( + ('containers', 'expected'), + [ ( - Future.from_value(1), - FutureResult.from_value(2), - FutureResult.from_failure(None), + ( + Future.from_value(1), + FutureResult.from_value(2), + FutureResult.from_failure(None), + ), + (IO(1), IOResult.from_value(2), IOResult.from_failure(None)), ), - (IO(1), IOResult.from_value(2), IOResult.from_failure(None)), - ), - ((), ()), -]) + ((), ()), + ], +) async def test_gather(containers, expected): """Test partition function.""" assert await gather(containers) == expected From 231fa1e3a62b263f428bd5c37d27e3e7291c5fb4 Mon Sep 17 00:00:00 2001 From: Roman Matveev Date: Fri, 2 May 2025 12:35:07 +0400 Subject: [PATCH 8/9] wip --- CHANGELOG.md | 8 +++++++- tests/test_methods/test_gather.py | 25 ++++++++++++++++++++----- 2 files changed, 27 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 091da57a6..0bf999229 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,13 @@ incremental in minor, bugfixes only are patches. See [0Ver](https://0ver.org/). +## 0.26.0 + +### Features + +- Add `returns.methods.gather` utility method + + ## 0.25.0 ### Features @@ -33,7 +40,6 @@ See [0Ver](https://0ver.org/). - Add partition function to result module. Issue #1905 - Add `default_error` parameter to `returns.converters.maybe_to_result`, which provides a default error value for `Failure` -- Add `returns.methods.gather` utility method ## 0.24.1 diff --git a/tests/test_methods/test_gather.py b/tests/test_methods/test_gather.py index 2f56a1b42..92f1e4506 100644 --- a/tests/test_methods/test_gather.py +++ b/tests/test_methods/test_gather.py @@ -1,22 +1,37 @@ import pytest +import anyio from returns.future import Future, FutureResult +from returns.result import Result from returns.io import IO, IOResult from returns.methods import gather +async def _helper_func1() -> str: + return 'successful function' + + +async def _helper_func2() -> str: + return 'failed function' + @pytest.mark.parametrize(('containers', 'expected'), [ ( ( - Future.from_value(1), - FutureResult.from_value(2), + FutureResult.from_value(1), FutureResult.from_failure(None), ), - (IO(1), IOResult.from_value(2), IOResult.from_failure(None)), + (IOResult.from_value(1), IOResult.from_failure(None)), ), ((), ()), + ( + ( + _helper_func1(), + _helper_func2(), + ), + (IOResult.from_result(Result.from_value("successful function")), IOResult.from_result(Result.from_failure("failed function"))) + ) ]) -async def test_gather(containers, expected): +def test_gather(containers, expected): """Test partition function.""" - assert await gather(containers) == expected + assert anyio.run(gather,containers) == expected From 965ee43466507306fe16359fe1ce70d81fb94f4b Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Fri, 2 May 2025 08:37:36 +0000 Subject: [PATCH 9/9] [pre-commit.ci] auto fixes from pre-commit.com hooks --- tests/test_methods/test_gather.py | 43 ++++++++++++++++++------------- 1 file changed, 25 insertions(+), 18 deletions(-) diff --git a/tests/test_methods/test_gather.py b/tests/test_methods/test_gather.py index 3bebc01d2..574aa4167 100644 --- a/tests/test_methods/test_gather.py +++ b/tests/test_methods/test_gather.py @@ -1,10 +1,10 @@ +import anyio import pytest -import anyio -from returns.future import Future, FutureResult -from returns.result import Result -from returns.io import IO, IOResult +from returns.future import FutureResult +from returns.io import IOResult from returns.methods import gather +from returns.result import Result async def _helper_func1() -> str: @@ -14,23 +14,30 @@ async def _helper_func1() -> str: async def _helper_func2() -> str: return 'failed function' -@pytest.mark.parametrize(('containers', 'expected'), [ - ( + +@pytest.mark.parametrize( + ('containers', 'expected'), + [ ( - FutureResult.from_value(1), - FutureResult.from_failure(None), + ( + FutureResult.from_value(1), + FutureResult.from_failure(None), + ), + (IOResult.from_value(1), IOResult.from_failure(None)), ), - (IOResult.from_value(1), IOResult.from_failure(None)), - ), - ((), ()), - ( + ((), ()), ( - _helper_func1(), - _helper_func2(), + ( + _helper_func1(), + _helper_func2(), + ), + ( + IOResult.from_result(Result.from_value('successful function')), + IOResult.from_result(Result.from_failure('failed function')), + ), ), - (IOResult.from_result(Result.from_value("successful function")), IOResult.from_result(Result.from_failure("failed function"))) - ) -]) + ], +) def test_gather(containers, expected): """Test partition function.""" - assert anyio.run(gather,containers) == expected + assert anyio.run(gather, containers) == expected