Skip to content

Just a concept of gather-like method #1966

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft
wants to merge 11 commits into
base: master
Choose a base branch
from
7 changes: 7 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,13 @@ incremental in minor, bugfixes only are patches.
See [0Ver](https://0ver.org/).


## 0.26.0

### Features

- Add `returns.methods.gather` utility method


## 0.25.0

### Features
Expand Down
38 changes: 32 additions & 6 deletions docs/pages/methods.rst
Original file line number Diff line number Diff line change
Expand Up @@ -79,19 +79,45 @@ Here's a full example:
partition
~~~~~~~~~

:func:`partition <returns.result.partition>` is used to convert
:func:`partition <returns.methods.partition>` is used to convert
list of :class:`~returns.interfaces.Unwrappable`
instances like :class:`~returns.result.Result`,
:class:`~returns.io.IOResult`, and :class:`~returns.maybe.Maybe`
to a tuple of two lists: successes and failures.

.. code:: python

>>> from returns.result import Failure, Success
>>> from returns.methods import partition
>>> results = [Success(1), Failure(2), Success(3), Failure(4)]
>>> partition(results)
([1, 3], [2, 4])
>>> from returns.result import Failure, Success
>>> from returns.methods import partition
>>> results = [Success(1), Failure(2), Success(3), Failure(4)]
>>> partition(results)
([1, 3], [2, 4])

gather
~~~~~~

:func:`gather <returns.methods.gather>` is used to safely concurrently
execute multiple awaitable objects(any object with ``__await__`` method,
included function marked with async keyword) and return a tuple of wrapped results
:class: `~returns.io.IOResult`.
Embrace railway-oriented programming princple of executing as many IO operations
as possible before synchrounous computations.

.. code:: python

>>> import anyio
>>> from returns.io import IO, IOSuccess, IOFailure
>>> from returns.result import Failure, Success
>>> from returns.methods import gather

>>> async def coro():
... return 1
>>> async def coro_raise():
... raise ValueError(2)
>>> anyio.run(gather,[coro(), coro_raise()])
(<IOResult: <Success: 1>>, <IOResult: <Failure: 2>>)



API Reference
-------------
Expand Down
3 changes: 3 additions & 0 deletions returns/methods/__init__.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
"""Set of various utility functions."""

from returns.methods.async_ import gather as gather
from returns.methods.cond import cond as cond
from returns.methods.partition import partition as partition
from returns.methods.unwrap_or_failure import (
Expand Down
39 changes: 39 additions & 0 deletions returns/methods/async_.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
# flake8: noqa: WPS102

from collections.abc import Awaitable, Iterable

import anyio

from returns.io import IOResult


async def gather(
containers: Iterable[Awaitable,],
) -> tuple[IOResult, ...]:
"""
Execute multiple coroutines concurrently and return their wrapped results.

.. code:: python

>>> import anyio
>>> from returns.methods import gather
>>> from returns.io import IOSuccess

>>> async def coro():
... return 1
>>> assert anyio.run(gather, [coro()]) == (IOSuccess(1), )
"""
async with anyio.create_task_group() as tg:
ioresults: dict[int, IOResult] = {}

async def _run_task(coro: Awaitable, index: int): # noqa: WPS430
ioresult: IOResult
try:
ioresult = IOResult.from_value(await coro)
except Exception as exc:
ioresult = IOResult.from_failure(exc)
ioresults[index] = ioresult

for coro_index, coro in enumerate(containers):
tg.start_soon(_run_task, coro, coro_index)
return tuple(ioresults[key] for key in sorted(ioresults.keys()))
43 changes: 43 additions & 0 deletions tests/test_methods/test_gather.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
import anyio
import pytest

from returns.future import FutureResult
from returns.io import IOResult
from returns.methods import gather
from returns.result import Result


async def _helper_func1() -> str:
return 'successful function'


async def _helper_func2() -> str:
return 'failed function'


@pytest.mark.parametrize(
('containers', 'expected'),
[
(
(
FutureResult.from_value(1),
FutureResult.from_failure(None),
),
(IOResult.from_value(1), IOResult.from_failure(None)),
),
((), ()),
(
(
_helper_func1(),
_helper_func2(),
),
(
IOResult.from_result(Result.from_value('successful function')),
IOResult.from_result(Result.from_failure('failed function')),
),
),
],
)
def test_gather(containers, expected):
"""Test partition function."""
assert anyio.run(gather, containers) == expected
Loading