Examples

Fetching urls concurrently, and processing the responses as completed, is used as an example. This simple task is nonetheless surprisingly tedious, especially using asyncio.

[2]:
# only needed for notebook
import nest_asyncio
nest_asyncio.apply()

# The delay in each url path decreases (0.2, 0.1, 0.0), so results printed in
# completion order are visibly different from the order of this list.
urls = [f'http://httpbin.org/delay/{d}' for d in (0.2, 0.1, 0.0)]

Threaded

[3]:
from concurrent import futures
import requests

def fetch_all(urls):
    """Yield the response for each url as soon as its request completes.

    Every request is submitted to a thread pool up front and all of them share
    a single ``requests.Session``. Responses are yielded in completion order,
    not in the order of ``urls``. The session and the pool are released when
    the generator is exhausted or closed.
    """
    with requests.Session() as session:
        with futures.ThreadPoolExecutor() as pool:
            pending = [pool.submit(session.get, url) for url in urls]
            yield from (done.result() for done in futures.as_completed(pending))

# Consume the generator; urls are printed in the order the requests finished.
for resp in fetch_all(urls):
    print(resp.url)
http://httpbin.org/delay/0.0
http://httpbin.org/delay/0.1
http://httpbin.org/delay/0.2

futured.threaded abstracts away the boilerplate.

[4]:
from futured import threaded

# Wrap the session's ``get`` so that it can be mapped over many urls at once.
fetch = threaded(requests.Session().get)
# as_completed=True yields each response as it finishes, not in input order.
for resp in fetch.map(urls, as_completed=True):
    print(resp.url)
http://httpbin.org/delay/0.0
http://httpbin.org/delay/0.1
http://httpbin.org/delay/0.2

Asynced

[5]:
import asyncio
import httpx

async def fetch_all(urls):
    """Yield the response for each url as soon as its request completes.

    ``httpx.AsyncClient`` is required here: its ``get`` returns a coroutine,
    which is what ``asyncio.as_completed`` expects. The plain ``httpx.Client``
    is synchronous and returns a finished response, which ``as_completed``
    rejects with a ``TypeError``.

    The client is closed when the generator finishes. Being an async
    generator, this can only be consumed with ``async for`` or by a helper
    that drives it on an event loop.
    """
    async with httpx.AsyncClient() as client:
        # Start every request before waiting on any of them.
        pending = [client.get(url) for url in urls]
        for future in asyncio.as_completed(pending):
            yield await future

# Intentionally fails with TypeError: an async generator is not iterable by a
# plain ``for`` loop.
for resp in fetch_all(urls):
    print(resp.url)
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-5-6d89a3ce2c7a> in <module>
      7         yield await future
      8
----> 9 for resp in fetch_all(urls):
     10     print(resp.url)

TypeError: 'async_generator' object is not iterable

The problem is that coroutines support the yield keyword, but only to create async iterators. Even though asyncio.as_completed is itself a normal iterator, there is no way to write this generator as intended. Additionally, there is no iterator equivalent of loop.run_until_complete to mitigate the viral nature of the async keyword.

So futured.asynced provides one.

[6]:
from futured import asynced

# asynced.run lets synchronous code iterate over the async generator.
for resp in asynced.run(fetch_all, urls):
    print(resp.url)
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-6-1b7c3bc88fc0> in <module>
      1 from futured import asynced
      2
----> 3 for resp in asynced.run(fetch_all, urls):
      4     print(resp.url)

~/checkouts/readthedocs.org/user_builds/futured/checkouts/latest/futured.py in __next__(self)
    197     def __next__(self):
    198         try:
--> 199             result = self.loop.run_until_complete(self.future)
    200         except StopAsyncIteration:
    201             raise StopIteration

~/checkouts/readthedocs.org/user_builds/futured/envs/latest/lib/python3.7/site-packages/nest_asyncio.py in run_until_complete(self, future)
     57         if not f.done():
     58             raise RuntimeError('Event loop stopped before Future completed.')
---> 59         return f.result()
     60
     61     def _run_once(self):

~/.pyenv/versions/3.7.3/lib/python3.7/asyncio/futures.py in result(self)
    176         self.__log_traceback = False
    177         if self._exception is not None:
--> 178             raise self._exception
    179         return self._result
    180

~/.pyenv/versions/3.7.3/lib/python3.7/asyncio/tasks.py in __step(***failed resolving arguments***)
    221                 # We use the `send` method directly, because coroutines
    222                 # don't have `__iter__` and `__next__` methods.
--> 223                 result = coro.send(None)
    224             else:
    225                 result = coro.throw(exc)

<ipython-input-5-6d89a3ce2c7a> in fetch_all(urls)
      4 async def fetch_all(urls):
      5     client = httpx.Client()
----> 6     for future in asyncio.as_completed(map(client.get, urls)):
      7         yield await future
      8

~/.pyenv/versions/3.7.3/lib/python3.7/asyncio/tasks.py in as_completed(fs, loop, timeout)
    507         raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
    508     loop = loop if loop is not None else events.get_event_loop()
--> 509     todo = {ensure_future(f, loop=loop) for f in set(fs)}
    510     from .queues import Queue  # Import here to avoid circular import problem.
    511     done = Queue(loop=loop)

~/.pyenv/versions/3.7.3/lib/python3.7/asyncio/tasks.py in <setcomp>(.0)
    507         raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
    508     loop = loop if loop is not None else events.get_event_loop()
--> 509     todo = {ensure_future(f, loop=loop) for f in set(fs)}
    510     from .queues import Queue  # Import here to avoid circular import problem.
    511     done = Queue(loop=loop)

~/.pyenv/versions/3.7.3/lib/python3.7/asyncio/tasks.py in ensure_future(coro_or_future, loop)
    590         return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)
    591     else:
--> 592         raise TypeError('An asyncio.Future, a coroutine or an awaitable is '
    593                         'required')
    594

TypeError: An asyncio.Future, a coroutine or an awaitable is required

The alternative approach is to explicitly handle the loop in the implementation.

[7]:
def fetch_all(urls):
    """Yield the response for each url as soon as its request completes.

    This is an ordinary (synchronous) generator that drives the event loop
    itself: each future produced by ``asyncio.as_completed`` is run to
    completion with ``loop.run_until_complete`` before being yielded.

    ``httpx.AsyncClient`` is required so that ``client.get`` returns a
    coroutine; the synchronous ``httpx.Client`` returns a finished response,
    which ``as_completed`` rejects with a ``TypeError``.
    """
    loop = asyncio.get_event_loop()
    client = httpx.AsyncClient()
    try:
        # Start every request before waiting on any of them.
        pending = [client.get(url) for url in urls]
        for future in asyncio.as_completed(pending):
            yield loop.run_until_complete(future)
    finally:
        # Release the client's connections even if iteration stops early.
        loop.run_until_complete(client.aclose())

# fetch_all is an ordinary generator here, so a plain ``for`` loop can consume it.
for resp in fetch_all(urls):
    print(resp.url)
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-7-10078f038fe1> in <module>
      5         yield loop.run_until_complete(future)
      6
----> 7 for resp in fetch_all(urls):
      8     print(resp.url)

<ipython-input-7-10078f038fe1> in fetch_all(urls)
      2     loop = asyncio.get_event_loop()
      3     client = httpx.Client()
----> 4     for future in asyncio.as_completed(map(client.get, urls)):
      5         yield loop.run_until_complete(future)
      6

~/.pyenv/versions/3.7.3/lib/python3.7/asyncio/tasks.py in as_completed(fs, loop, timeout)
    507         raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
    508     loop = loop if loop is not None else events.get_event_loop()
--> 509     todo = {ensure_future(f, loop=loop) for f in set(fs)}
    510     from .queues import Queue  # Import here to avoid circular import problem.
    511     done = Queue(loop=loop)

~/.pyenv/versions/3.7.3/lib/python3.7/asyncio/tasks.py in <setcomp>(.0)
    507         raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
    508     loop = loop if loop is not None else events.get_event_loop()
--> 509     todo = {ensure_future(f, loop=loop) for f in set(fs)}
    510     from .queues import Queue  # Import here to avoid circular import problem.
    511     done = Queue(loop=loop)

~/.pyenv/versions/3.7.3/lib/python3.7/asyncio/tasks.py in ensure_future(coro_or_future, loop)
    590         return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)
    591     else:
--> 592         raise TypeError('An asyncio.Future, a coroutine or an awaitable is '
    593                         'required')
    594

TypeError: An asyncio.Future, a coroutine or an awaitable is required

For this case, asynced provides the same abstraction as threaded.

[8]:
# ``asynced`` passes each call's result to ``asyncio.ensure_future``, so the
# wrapped callable must return an awaitable. Use the async client: the
# synchronous ``httpx.Client().get`` returns a finished response instead.
fetch = asynced(httpx.AsyncClient().get)
# as_completed=True yields each response as it finishes, not in input order.
for resp in fetch.map(urls, as_completed=True):
    print(resp.url)
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-8-ee137be73eb3> in <module>
      1 fetch = asynced(httpx.Client().get)
----> 2 for resp in fetch.map(urls, as_completed=True):
      3     print(resp.url)

~/checkouts/readthedocs.org/user_builds/futured/checkouts/latest/futured.py in map(self, *iterables, **kwargs)
     49         :param kwargs: keyword options for :meth:`results`
     50         """
---> 51         return self.results(map(self, *iterables), **kwargs)
     52
     53     def starmap(self, iterable: Iterable, **kwargs) -> Iterator:

~/checkouts/readthedocs.org/user_builds/futured/checkouts/latest/futured.py in results(cls, fs, loop, as_completed, **kwargs)
    150     def results(cls, fs: Iterable, *, loop=None, as_completed=False, **kwargs) -> Iterator:
    151         loop = loop or asyncio.get_event_loop()
--> 152         fs = [asyncio.ensure_future(future, loop=loop) for future in fs]
    153         if as_completed or kwargs:
    154             fs = asyncio.as_completed(fs, **kwargs)

~/checkouts/readthedocs.org/user_builds/futured/checkouts/latest/futured.py in <listcomp>(.0)
    150     def results(cls, fs: Iterable, *, loop=None, as_completed=False, **kwargs) -> Iterator:
    151         loop = loop or asyncio.get_event_loop()
--> 152         fs = [asyncio.ensure_future(future, loop=loop) for future in fs]
    153         if as_completed or kwargs:
    154             fs = asyncio.as_completed(fs, **kwargs)

~/.pyenv/versions/3.7.3/lib/python3.7/asyncio/tasks.py in ensure_future(coro_or_future, loop)
    590         return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)
    591     else:
--> 592         raise TypeError('An asyncio.Future, a coroutine or an awaitable is '
    593                         'required')
    594

TypeError: An asyncio.Future, a coroutine or an awaitable is required