Examples¶
Fetching URLs concurrently, and processing the responses as they complete, is used as the running example. This simple task is nonetheless surprisingly tedious, especially using asyncio.
[2]:
# only needed for notebook
import nest_asyncio
nest_asyncio.apply()
urls = [f'http://httpbin.org/delay/{d}' for d in (0.2, 0.1, 0.0)]
Threaded¶
[3]:
from concurrent import futures
import requests
def fetch_all(urls):
    with requests.Session() as session, futures.ThreadPoolExecutor() as executor:
        fs = [executor.submit(session.get, url) for url in urls]
        for future in futures.as_completed(fs):
            yield future.result()

for resp in fetch_all(urls):
    print(resp.url)
http://httpbin.org/delay/0.0
http://httpbin.org/delay/0.1
http://httpbin.org/delay/0.2
futured.threaded abstracts away the boilerplate.
[4]:
from futured import threaded
fetch = threaded(requests.Session().get)
for resp in fetch.map(urls, as_completed=True):
    print(resp.url)
http://httpbin.org/delay/0.0
http://httpbin.org/delay/0.1
http://httpbin.org/delay/0.2
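For intuition, a wrapper with this behavior can be sketched in a few lines. This is a simplified, hypothetical sketch (`threaded_sketch` is not futured's actual implementation, which supports far more):

```python
from concurrent import futures

class threaded_sketch:
    """Wrap a callable so that `map` submits each call to a thread pool."""

    def __init__(self, func):
        self.func = func

    def map(self, iterable, as_completed=False):
        with futures.ThreadPoolExecutor() as executor:
            fs = [executor.submit(self.func, arg) for arg in iterable]
            # Yield in completion order if requested, else submission order.
            for future in futures.as_completed(fs) if as_completed else fs:
                yield future.result()
```

The wrapped callable is stored once, so `threaded_sketch(session.get).map(urls, as_completed=True)` mirrors the boilerplate version above.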
Asynced¶
[5]:
import asyncio
import httpx
async def fetch_all(urls):
    client = httpx.AsyncClient()
    for future in asyncio.as_completed(map(client.get, urls)):
        yield await future

for resp in fetch_all(urls):
    print(resp.url)
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-5-6d89a3ce2c7a> in <module>
7 yield await future
8
----> 9 for resp in fetch_all(urls):
10 print(resp.url)
TypeError: 'async_generator' object is not iterable
The problem is that coroutines support the yield keyword, but only to create async iterators. Even though asyncio.as_completed is itself a normal iterator, there is no way to write this generator as intended. Additionally, there is no iterator equivalent of loop.run_until_complete to mitigate the viral nature of the async keyword. So futured.asynced provides one.
[6]:
from futured import asynced
for resp in asynced.run(fetch_all, urls):
    print(resp.url)
http://httpbin.org/delay/0.0
http://httpbin.org/delay/0.1
http://httpbin.org/delay/0.2
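The missing "iterator equivalent of run_until_complete" can be sketched as a helper that steps an async generator from synchronous code, one __anext__ at a time. This is a hypothetical sketch (`iterate` is not futured's actual implementation):

```python
import asyncio

def iterate(async_gen):
    """Drive an async generator from sync code (hypothetical sketch)."""
    loop = asyncio.new_event_loop()
    try:
        while True:
            try:
                # Run the loop just long enough to produce the next item.
                yield loop.run_until_complete(async_gen.__anext__())
            except StopAsyncIteration:
                break
    finally:
        loop.close()
```

Each yielded item only requires running the event loop until that one step completes, which is what lets the async generator be consumed by an ordinary for loop.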
The alternative approach is to explicitly handle the loop in the implementation.
[7]:
def fetch_all(urls):
    loop = asyncio.get_event_loop()
    client = httpx.AsyncClient()
    for future in asyncio.as_completed(map(client.get, urls)):
        yield loop.run_until_complete(future)

for resp in fetch_all(urls):
    print(resp.url)
http://httpbin.org/delay/0.0
http://httpbin.org/delay/0.1
http://httpbin.org/delay/0.2
For this case, asynced provides the same abstraction as threaded.
[8]:
fetch = asynced(httpx.AsyncClient().get)
for resp in fetch.map(urls, as_completed=True):
    print(resp.url)
http://httpbin.org/delay/0.0
http://httpbin.org/delay/0.1
http://httpbin.org/delay/0.2
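The map(..., as_completed=True) behavior for a coroutine function can likewise be approximated directly with asyncio.as_completed. A minimal sketch, where `amap` is a hypothetical helper rather than futured's API:

```python
import asyncio

def amap(func, args):
    """Call a coroutine function per argument; return results as completed."""
    async def gather():
        coros = [func(arg) for arg in args]
        # as_completed yields awaitables in completion order.
        return [await future for future in asyncio.as_completed(coros)]
    return asyncio.run(gather())
```

asyncio.run creates and tears down the event loop, so the caller stays entirely synchronous, just as with the threaded variant.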