import asyncio
import contextlib
import itertools
import operator
import os
import queue
import subprocess
import types
from concurrent import futures
from functools import partial
from typing import AnyStr, AsyncIterable, Callable, Iterable, Iterator, MutableSet, Sequence
__version__ = '1.1'
class futured(partial):
"""A partial function which returns futures."""
as_completed = wait = NotImplemented
def __get__(self, instance, owner):
return self if instance is None else types.MethodType(self, instance)
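# Because __get__ binds like an ordinary function, futured subclasses also work
# as method decorators. A minimal sketch (hypothetical class, using the threaded
# subclass defined below):
#
#   class Client:
#       @threaded
#       def fetch(self, key):
#           return key  # placeholder for blocking I/O
#
#   future = Client().fetch('item')  # returns a Future, not the value
#   future.result()                  # 'item'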
@classmethod
def results(cls, fs: Iterable, *, as_completed=False, **kwargs) -> Iterator:
"""Generate results concurrently from futures, by default in order.
:param fs: iterable of futures
:param as_completed: generate results as completed instead of in order
:param kwargs: as completed options, e.g., timeout
"""
fs = list(fs) # ensure futures are executing
if as_completed or kwargs:
fs = cls.as_completed(fs, **kwargs)
return map(operator.methodcaller('result'), fs)
@classmethod
def items(cls, iterable: Iterable, **kwargs) -> Iterator:
"""Generate key, result pairs as completed from futures.
:param iterable: key, future pairs
:param kwargs: as completed options, e.g., timeout
"""
keys = dict(map(reversed, iterable)) # type: ignore
return ((keys[future], future.result()) for future in cls.as_completed(keys, **kwargs))
def map(self, *iterables: Iterable, **kwargs) -> Iterator:
"""Asynchronously map function.
:param kwargs: keyword options for :meth:`results`
"""
return self.results(map(self, *iterables), **kwargs)
def starmap(self, iterable: Iterable, **kwargs) -> Iterator:
"""Asynchronously starmap function.
:param kwargs: keyword options for :meth:`results`
"""
return self.results(itertools.starmap(self, iterable), **kwargs)
def mapzip(self, iterable: Iterable, **kwargs) -> Iterator:
"""Generate arg, result pairs as completed.
:param kwargs: keyword options for :meth:`items`
"""
return self.items(((arg, self(arg)) for arg in iterable), **kwargs)
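# Usage sketch for map, starmap, and mapzip, assuming the threaded subclass
# defined below; the generators yield plain results, not futures:
#
#   mapped = threaded(pow)
#   list(mapped.map([2, 3], [5, 5]))        # [32, 243], in input order
#   list(mapped.starmap([(2, 5), (3, 5)]))  # [32, 243]
#   inc = threaded(lambda x: x + 1)
#   dict(inc.mapzip([1, 2, 3]))             # {1: 2, 2: 3, 3: 4}, as completed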
@classmethod
@contextlib.contextmanager
def waiting(cls, *fs, **kwargs):
"""Return context manager which waits on :meth:`results`."""
fs = list(fs)
try:
yield fs
finally:
fs[:] = cls.results(fs, **kwargs)
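# Sketch of the waiting context manager: futures appended inside the block are
# replaced in place by their results on exit (threaded subclass assumed):
#
#   fetch = threaded(pow)
#   with threaded.waiting() as results:
#       results.append(fetch(2, 5))
#       results.append(fetch(3, 5))
#   results  # [32, 243]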
@classmethod
def stream(cls, fs: MutableSet, **kwargs) -> Iterator:
"""Generate futures as completed from a mutable set.
Iteration will consume futures from the set, and it can be updated while in use.
"""
while fs:
done, pending = cls.wait(fs, return_when='FIRST_COMPLETED', **kwargs)
fs.__init__(pending) # type: ignore
yield from done
task = partial.__call__  # plain call; used by streamzip and overridden in asynced to schedule tasks
def streamzip(self, queue: Sequence, **kwargs) -> Iterator:
"""Generate arg, future pairs as completed by mapping the function over the queue.
The queue can be extended while in use.
"""
pool, start = {}, 0 # type: ignore
while pool or queue[start:]:
pool.update({self.task(arg, **kwargs): arg for arg in queue[start:]})
start = len(queue)
done, _ = type(self).wait(pool, return_when='FIRST_COMPLETED', **kwargs)
for future in done:
yield pool.pop(future), future
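# Sketch of streaming over a growing workload (threaded subclass assumed): the
# set or sequence may gain new work while iteration is in progress:
#
#   fetch = threaded(pow)
#   fs = {fetch(2, 5), fetch(3, 5)}
#   for future in threaded.stream(fs):
#       ...  # fs may be updated with more futures here
#
#   inc = threaded(lambda x: x + 1)
#   args = [2, 3]
#   for arg, future in inc.streamzip(args):
#       ...  # args may be appended to here; future.result() is already done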
class executed(futured):
"""Extensible base class for callables which require a ``submit`` method."""
as_completed = futures.as_completed
wait = futures.wait
Executor = NotImplemented
def __new__(cls, *args, **kwargs):
if args:
return futured.__new__(cls, cls.Executor().submit, *args, **kwargs)
return partial(futured.__new__, cls, cls.Executor(**kwargs).submit)
def __enter__(self):
return self
def __exit__(self, *args):
self.func.__self__.__exit__(*args)
class threaded(executed):
"""A partial function executed in its own thread pool."""
Executor = futures.ThreadPoolExecutor
class processed(executed):
"""A partial function executed in its own process pool."""
Executor = futures.ProcessPoolExecutor
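# Usage sketch for the executor-backed subclasses: called with a function they
# wrap it with a fresh pool; called with only keyword options they return a
# configured decorator; as context managers they shut the pool down on exit.
#
#   compute = processed(pow)              # backed by its own ProcessPoolExecutor
#   log = threaded(max_workers=4)(print)  # ThreadPoolExecutor with 4 workers
#   with threaded(max_workers=1)(len) as size:
#       size('abc').result()              # 3; the pool is shut down on exit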
with contextlib.suppress(ImportError):
class distributed(executed):
"""A partial function executed by a dask distributed client."""
from distributed import as_completed, wait, Client as Executor # type: ignore
class asynced(futured):
"""A partial coroutine.
Anywhere futures are expected, coroutines are also supported.
"""
@classmethod
def results(cls, fs: Iterable, *, loop=None, as_completed=False, **kwargs) -> Iterator:
loop = loop or asyncio.get_event_loop()
fs = [asyncio.ensure_future(future, loop=loop) for future in fs]
if as_completed or kwargs:
fs = asyncio.as_completed(fs, **kwargs)
return map(loop.run_until_complete, fs)
@staticmethod
async def pair(key, future):
return key, await future
@classmethod
def items(cls, iterable: Iterable, **kwargs) -> Iterator:
return cls.results(itertools.starmap(cls.pair, iterable), as_completed=True, **kwargs)
def run(self: Callable, *args, **kwargs):
"""Synchronously call and run coroutine or asynchronous iterator."""
coro = self(*args, **kwargs)
if isinstance(coro, AsyncIterable):
return looped(coro)
return asyncio.get_event_loop().run_until_complete(coro)
@classmethod
def wait(cls, fs: Iterable, **kwargs) -> tuple:
return cls.run(asyncio.wait, fs, **kwargs)
def task(self, arg, **kwargs):
return asyncio.ensure_future(self(arg), **kwargs)
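# Usage sketch for asynced with a hypothetical coroutine: the same map/items
# interface, driven by the event loop instead of an executor.
#
#   async def fetch(key):
#       return key  # placeholder for real async I/O
#
#   list(asynced(fetch).map(['a', 'b']))  # ['a', 'b'], run on the event loop
#   asynced(fetch).run('a')               # 'a', a single synchronous call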
class looped:
"""Wrap an asynchronous iterable into an iterator.
Analogous to loop.run_until_complete for coroutines.
"""
def __init__(self, aiterable: AsyncIterable, *, loop=None):
self.anext = aiterable.__aiter__().__anext__
self.loop = loop or asyncio.get_event_loop()
self.future = asyncio.ensure_future(self.anext(), loop=self.loop)
def __del__(self): # suppress warning
self.future.cancel() # pragma: no cover
def __iter__(self):
return self
def __next__(self):
try:
result = self.loop.run_until_complete(self.future)
except StopAsyncIteration:
raise StopIteration
self.future = asyncio.ensure_future(self.anext(), loop=self.loop)
return result
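# Sketch of looped with a hypothetical async generator: each __next__ drives
# the event loop until the next item is ready.
#
#   async def ticks(n):
#       for i in range(n):
#           yield i
#
#   list(looped(ticks(3)))        # [0, 1, 2]
#   list(asynced(ticks).run(3))   # equivalent, since run wraps async iterators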
class command(subprocess.Popen):
"""Asynchronous subprocess with a future-compatible interface."""
def __init__(self, *args, **kwargs):
super().__init__(args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, **kwargs)
def check(self, args, stdout, stderr):
if self.returncode:
raise subprocess.CalledProcessError(self.returncode, args, stdout, stderr)
return stdout
@classmethod
async def coroutine(cls, *args, shell=False, **kwargs):
"""Create a subprocess coroutine, suitable for timeouts."""
create = asyncio.create_subprocess_shell if shell else asyncio.create_subprocess_exec
self = await create(*args, stdout=subprocess.PIPE, stderr=subprocess.PIPE, **kwargs)
return cls.check(self, args, *(await self.communicate()))
def result(self, **kwargs) -> AnyStr:
"""Return stdout or raise stderr."""
return self.check(self.args, *self.communicate(**kwargs))
def pipe(self, *args, **kwargs) -> 'command':
"""Pipe stdout to the next command's stdin."""
return type(self)(*args, stdin=self.stdout, **kwargs)
def __or__(self, other: Iterable) -> 'command':
"""Alias of :meth:`pipe`."""
return self.pipe(*other)
def __iter__(self):
"""Return output lines."""
return iter(self.result().splitlines())
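# Usage sketch for command, assuming POSIX shell utilities are available: it
# behaves like a future whose result is stdout, and supports piping.
#
#   command('echo', 'hello').result()            # b'hello\n', or CalledProcessError
#   lines = list(command('ls') | ('wc', '-l'))   # pipe stdout into the next command
#   # inside a coroutine: await command.coroutine('sleep', '1')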
class Results(queue.Queue):
def put(self, pid, value):
pid, status = os.waitpid(pid, 0)
super().put((status, value))
def get(self):
status, value = super().get()
if status:
raise OSError(status, value)
return not status
def forked(values: Iterable, max_workers: int = None) -> Iterator:
"""Generate each value in its own child process and wait in the parent."""
max_workers = max_workers or os.cpu_count() or 1 # same default as ProcessPoolExecutor
workers, results = 0, Results()
task = threaded(max_workers=max_workers)(results.put)
for value in values:
while workers >= max_workers:
workers -= results.get()
pid = os.fork()
if pid:
workers += bool(task(pid, value))
else: # pragma: no cover
yield value
os._exit(0)
while workers:
workers -= results.get()
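# Sketch of forked (POSIX only, since it relies on os.fork): the loop body runs
# once per value in a child process, while the parent waits and re-raises failures.
#
#   for value in forked(range(3), max_workers=2):
#       print(os.getpid(), value)  # runs in a child, which exits after the body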
def decorated(base: type, **decorators: Callable) -> type:
"""Return subclass with decorated methods."""
namespace = {name: decorators[name](getattr(base, name)) for name in decorators}
return type(base.__name__, (base,), namespace)
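# Sketch of decorated with a hypothetical client class: the chosen methods are
# wrapped so they return futures.
#
#   class Client:
#       def request(self, key):
#           return key  # placeholder for blocking I/O
#
#   AsyncClient = decorated(Client, request=threaded)
#   AsyncClient().request('item').result()  # 'item'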