Examples

Fetching urls concurrently, and processing the responses as completed, is used as an example. This simple task is nonetheless suprisingly tedious, especially using asyncio.

[2]:
# only needed for notebook
import nest_asyncio
nest_asyncio.apply()

urls = [f'http://httpbin.org/delay/{d}' for d in (0.2, 0.1, 0.0)]

Threaded

[3]:
from concurrent import futures
import requests

def fetch_all(urls):
    with requests.Session() as session, futures.ThreadPoolExecutor() as executor:
        fs = [executor.submit(session.get, url) for url in urls]
        for future in futures.as_completed(fs):
            yield future.result()

for resp in fetch_all(urls):
    print(resp.url)
http://httpbin.org/delay/0.0
http://httpbin.org/delay/0.1
http://httpbin.org/delay/0.2

futured.threaded abstracts away the boilerplate.

[4]:
from futured import threaded

fetch = threaded(requests.Session().get)
for resp in fetch.map(urls, as_completed=True):
    print(resp.url)
http://httpbin.org/delay/0.0
http://httpbin.org/delay/0.1
http://httpbin.org/delay/0.2

Asynced

[5]:
import asyncio
import httpx

async def fetch_all(urls):
    client = httpx.Client()
    for future in asyncio.as_completed(map(client.get, urls)):
        yield await future

for resp in fetch_all(urls):
    print(resp.url)
---------------------------------------------------------------------------
TypeError                                 Traceback (most recent call last)
<ipython-input-5-6d89a3ce2c7a> in <module>
      7         yield await future
      8
----> 9 for resp in fetch_all(urls):
     10     print(resp.url)

TypeError: 'async_generator' object is not iterable

The problem is coroutines support the yield keyword, but only to create async iterators. Even though asyncio.as_completed is itself a normal iterator, there is no way to write this generator as intended. Additionally there is no iterator equivalent of loop.run_until_complete, to mitigate the viral nature of the async keyword.

So futured.asynced provides one.

[6]:
from futured import asynced

for resp in asynced.run(fetch_all, urls):
    print(resp.url)
http://httpbin.org/delay/0.0
http://httpbin.org/delay/0.1
http://httpbin.org/delay/0.2

The alternative approach is to explicitly handle the loop in the implementation.

[7]:
def fetch_all(urls):
    loop = asyncio.get_event_loop()
    client = httpx.Client()
    for future in asyncio.as_completed(map(client.get, urls)):
        yield loop.run_until_complete(future)

for resp in fetch_all(urls):
    print(resp.url)
http://httpbin.org/delay/0.0
http://httpbin.org/delay/0.1
http://httpbin.org/delay/0.2

For this case, asynced provides the same abstraction as threaded.

[8]:
fetch = asynced(httpx.Client().get)
for resp in fetch.map(urls, as_completed=True):
    print(resp.url)
http://httpbin.org/delay/0.0
http://httpbin.org/delay/0.1
http://httpbin.org/delay/0.2