Examples¶
Fetching URLs concurrently, and processing the responses as they complete, is used as an example. This simple task is nonetheless surprisingly tedious, especially using `asyncio`.
[2]:
# Only needed when running inside a notebook.
# NOTE(review): presumably the notebook kernel already runs an event loop and
# nest_asyncio.apply() patches asyncio so that run_until_complete can be
# called re-entrantly - confirm against the nest_asyncio documentation.
import nest_asyncio
nest_asyncio.apply()
# One endpoint per delay value, listed with the largest delay first.
urls = [f'http://httpbin.org/delay/{delay}' for delay in (0.2, 0.1, 0.0)]
Threaded¶
[3]:
from concurrent import futures
import requests
def fetch_all(urls):
    """Yield the response for each url as soon as its request completes.

    Every request is submitted to a thread pool up front and goes through a
    single shared ``requests.Session``. Responses are yielded in completion
    order, which may differ from the order of ``urls``.
    """
    with requests.Session() as session:
        with futures.ThreadPoolExecutor() as pool:
            pending = [pool.submit(session.get, url) for url in urls]
            yield from (done.result() for done in futures.as_completed(pending))
# Print each response's url as it arrives (completion order, not input order).
for response in fetch_all(urls):
    print(response.url)
http://httpbin.org/delay/0.0
http://httpbin.org/delay/0.1
http://httpbin.org/delay/0.2
`futured.threaded` abstracts away the boilerplate.
[4]:
from futured import threaded
# Wrap the session's ``get`` once; ``map`` then takes care of submitting the
# calls and iterating the results in completion order.
fetch = threaded(requests.Session().get)
for response in fetch.map(urls, as_completed=True):
    print(response.url)
http://httpbin.org/delay/0.0
http://httpbin.org/delay/0.1
http://httpbin.org/delay/0.2
Asynced¶
[5]:
import asyncio
import httpx
async def fetch_all(urls):
    """Asynchronously yield the response for each url in completion order.

    Because this is an ``async def`` containing ``yield``, calling it returns
    an async generator: it must be consumed with ``async for`` (or an adapter
    that drives the event loop), not with a plain ``for`` loop.
    """
    # httpx.Client is the synchronous client, so its ``get`` returns a
    # response rather than an awaitable; as_completed needs the async client.
    # ``async with`` also closes the client once the generator is finished.
    async with httpx.AsyncClient() as client:
        for future in asyncio.as_completed(map(client.get, urls)):
            yield await future
# Deliberately a plain ``for`` loop: iterating an async generator this way
# raises the TypeError shown in the output below.
for resp in fetch_all(urls):
print(resp.url)
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-5-6d89a3ce2c7a> in <module>
7 yield await future
8
----> 9 for resp in fetch_all(urls):
10 print(resp.url)
TypeError: 'async_generator' object is not iterable
The problem is that coroutines support the `yield` keyword, but only to create async iterators. Even though `asyncio.as_completed` is itself a normal iterator, there is no way to write this generator as intended. Additionally, there is no iterator equivalent of `loop.run_until_complete` to mitigate the viral nature of the `async` keyword.
So `futured.asynced` provides one.
[6]:
from futured import asynced
# asynced.run adapts the async generator so that a plain ``for`` loop can
# consume it; each step runs the event loop until the next item is ready.
for resp in asynced.run(fetch_all, urls):
print(resp.url)
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-6-1b7c3bc88fc0> in <module>
1 from futured import asynced
2
----> 3 for resp in asynced.run(fetch_all, urls):
4 print(resp.url)
~/checkouts/readthedocs.org/user_builds/futured/checkouts/latest/futured.py in __next__(self)
197 def __next__(self):
198 try:
--> 199 result = self.loop.run_until_complete(self.future)
200 except StopAsyncIteration:
201 raise StopIteration
~/checkouts/readthedocs.org/user_builds/futured/envs/latest/lib/python3.7/site-packages/nest_asyncio.py in run_until_complete(self, future)
57 if not f.done():
58 raise RuntimeError('Event loop stopped before Future completed.')
---> 59 return f.result()
60
61 def _run_once(self):
~/.pyenv/versions/3.7.3/lib/python3.7/asyncio/futures.py in result(self)
176 self.__log_traceback = False
177 if self._exception is not None:
--> 178 raise self._exception
179 return self._result
180
~/.pyenv/versions/3.7.3/lib/python3.7/asyncio/tasks.py in __step(***failed resolving arguments***)
221 # We use the `send` method directly, because coroutines
222 # don't have `__iter__` and `__next__` methods.
--> 223 result = coro.send(None)
224 else:
225 result = coro.throw(exc)
<ipython-input-5-6d89a3ce2c7a> in fetch_all(urls)
4 async def fetch_all(urls):
5 client = httpx.Client()
----> 6 for future in asyncio.as_completed(map(client.get, urls)):
7 yield await future
8
~/.pyenv/versions/3.7.3/lib/python3.7/asyncio/tasks.py in as_completed(fs, loop, timeout)
507 raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
508 loop = loop if loop is not None else events.get_event_loop()
--> 509 todo = {ensure_future(f, loop=loop) for f in set(fs)}
510 from .queues import Queue # Import here to avoid circular import problem.
511 done = Queue(loop=loop)
~/.pyenv/versions/3.7.3/lib/python3.7/asyncio/tasks.py in <setcomp>(.0)
507 raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
508 loop = loop if loop is not None else events.get_event_loop()
--> 509 todo = {ensure_future(f, loop=loop) for f in set(fs)}
510 from .queues import Queue # Import here to avoid circular import problem.
511 done = Queue(loop=loop)
~/.pyenv/versions/3.7.3/lib/python3.7/asyncio/tasks.py in ensure_future(coro_or_future, loop)
590 return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)
591 else:
--> 592 raise TypeError('An asyncio.Future, a coroutine or an awaitable is '
593 'required')
594
TypeError: An asyncio.Future, a coroutine or an awaitable is required
The alternative approach is to explicitly handle the loop in the implementation.
[7]:
def fetch_all(urls):
    """Yield the response for each url in completion order, driving the loop.

    This is an ordinary generator: it runs the event loop itself for each
    result, so callers can consume it with a plain ``for`` loop.
    """
    loop = asyncio.get_event_loop()
    # httpx.Client is the synchronous client, so its ``get`` returns a
    # response rather than an awaitable; as_completed needs the async client.
    client = httpx.AsyncClient()
    try:
        for future in asyncio.as_completed(map(client.get, urls)):
            yield loop.run_until_complete(future)
    finally:
        # Release the client's connections once iteration ends.
        loop.run_until_complete(client.aclose())
# A plain ``for`` loop suffices here because fetch_all runs the event loop
# itself for each result.
for resp in fetch_all(urls):
print(resp.url)
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-7-10078f038fe1> in <module>
5 yield loop.run_until_complete(future)
6
----> 7 for resp in fetch_all(urls):
8 print(resp.url)
<ipython-input-7-10078f038fe1> in fetch_all(urls)
2 loop = asyncio.get_event_loop()
3 client = httpx.Client()
----> 4 for future in asyncio.as_completed(map(client.get, urls)):
5 yield loop.run_until_complete(future)
6
~/.pyenv/versions/3.7.3/lib/python3.7/asyncio/tasks.py in as_completed(fs, loop, timeout)
507 raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
508 loop = loop if loop is not None else events.get_event_loop()
--> 509 todo = {ensure_future(f, loop=loop) for f in set(fs)}
510 from .queues import Queue # Import here to avoid circular import problem.
511 done = Queue(loop=loop)
~/.pyenv/versions/3.7.3/lib/python3.7/asyncio/tasks.py in <setcomp>(.0)
507 raise TypeError(f"expect a list of futures, not {type(fs).__name__}")
508 loop = loop if loop is not None else events.get_event_loop()
--> 509 todo = {ensure_future(f, loop=loop) for f in set(fs)}
510 from .queues import Queue # Import here to avoid circular import problem.
511 done = Queue(loop=loop)
~/.pyenv/versions/3.7.3/lib/python3.7/asyncio/tasks.py in ensure_future(coro_or_future, loop)
590 return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)
591 else:
--> 592 raise TypeError('An asyncio.Future, a coroutine or an awaitable is '
593 'required')
594
TypeError: An asyncio.Future, a coroutine or an awaitable is required
For this case, `asynced` provides the same abstraction as `threaded`.
[8]:
# httpx.Client is the synchronous client; wrap the async client's ``get`` so
# that each call returns an awaitable that can be scheduled on the event loop.
fetch = asynced(httpx.AsyncClient().get)
for resp in fetch.map(urls, as_completed=True):
    print(resp.url)
---------------------------------------------------------------------------
TypeError Traceback (most recent call last)
<ipython-input-8-ee137be73eb3> in <module>
1 fetch = asynced(httpx.Client().get)
----> 2 for resp in fetch.map(urls, as_completed=True):
3 print(resp.url)
~/checkouts/readthedocs.org/user_builds/futured/checkouts/latest/futured.py in map(self, *iterables, **kwargs)
49 :param kwargs: keyword options for :meth:`results`
50 """
---> 51 return self.results(map(self, *iterables), **kwargs)
52
53 def starmap(self, iterable: Iterable, **kwargs) -> Iterator:
~/checkouts/readthedocs.org/user_builds/futured/checkouts/latest/futured.py in results(cls, fs, loop, as_completed, **kwargs)
150 def results(cls, fs: Iterable, *, loop=None, as_completed=False, **kwargs) -> Iterator:
151 loop = loop or asyncio.get_event_loop()
--> 152 fs = [asyncio.ensure_future(future, loop=loop) for future in fs]
153 if as_completed or kwargs:
154 fs = asyncio.as_completed(fs, **kwargs)
~/checkouts/readthedocs.org/user_builds/futured/checkouts/latest/futured.py in <listcomp>(.0)
150 def results(cls, fs: Iterable, *, loop=None, as_completed=False, **kwargs) -> Iterator:
151 loop = loop or asyncio.get_event_loop()
--> 152 fs = [asyncio.ensure_future(future, loop=loop) for future in fs]
153 if as_completed or kwargs:
154 fs = asyncio.as_completed(fs, **kwargs)
~/.pyenv/versions/3.7.3/lib/python3.7/asyncio/tasks.py in ensure_future(coro_or_future, loop)
590 return ensure_future(_wrap_awaitable(coro_or_future), loop=loop)
591 else:
--> 592 raise TypeError('An asyncio.Future, a coroutine or an awaitable is '
593 'required')
594
TypeError: An asyncio.Future, a coroutine or an awaitable is required