Skip to content
43 changes: 43 additions & 0 deletions returns/methods/gather.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
from typing import Any

from typing import Awaitable, Iterable

import anyio
from returns.future import Future, FutureResult
from returns.io import IOResult



async def gather(
containers: Iterable[
Awaitable,
],
) -> tuple[IOResult, ...]:
"""
Execute multiple coroutines concurrently and return their wrapped results.

.. code:: python

>>> import anyio
>>> from returns.methods.gather import gather
>>> from returns.io import IOSuccess

>>> async def coro():
... return 1
>>> assert anyio.run(gather([coro()])) == (IOSuccess(1), )
>>> container = FutureResult(coro())
>>> assert anyio.run(gather([container.awaitable])) == (IOSuccess(1), )

"""

async with anyio.create_task_group() as tg:
containers_t = tuple(containers)
results: list[IOResult] = len(containers_t)*[IOResult(None)]
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you can use a dict[int, IOResult[R, E]] here instead. Where int is the index of the running task.


async def run_task(coro: Awaitable, index: int):
results[index] = await FutureResult(coro)

for i, coro in enumerate(containers_t):
tg.start_soon(run_task, coro, i)
return tuple(results)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and return a tuple sorted by keys of the this dict.


Loading