Documentation for pulsar 0.9.2. For development docs, go here.

Source code for pulsar.apps.greenio.pool

import threading
from collections import deque

import greenlet
from greenlet import getcurrent

from pulsar import isfuture, Future, From, get_io_loop, AsyncObject, task


_DEFAULT_WORKERS = 100
_MAX_WORKERS = 1000


class GreenletWorker(greenlet.greenlet):
    pass


[docs]class GreenPool(AsyncObject): '''A pool of running greenlets. This pool maintains a group of greenlets to perform asynchronous tasks via the :meth:`submit` method. ''' worker_name = 'exec' def __init__(self, max_workers=None, loop=None, maxtasks=None): self._loop = get_io_loop(loop) self._max_workers = min(max_workers or _DEFAULT_WORKERS, _MAX_WORKERS) self._greenlets = set() self._available = set() self._maxtasks = maxtasks self._queue = deque() self._shutdown = False self._shutdown_lock = threading.Lock()
[docs] def submit(self, func, *args, **kwargs): '''Equivalent to ``func(*args, **kwargs)``. This method create a new task for function ``func`` and adds it to the queue. Return a :class:`~asyncio.Future` called back once the task has finished. ''' with self._shutdown_lock: if self._shutdown: raise RuntimeError( 'cannot schedule new futures after shutdown') future = Future(loop=self._loop) self._loop.call_soon_threadsafe( self._put, (future, func, args, kwargs)) return future
def shutdown(self, wait=True): with self._shutdown_lock: self._shutdown = True self._loop.call_soon_threadsafe(self._put) # INTERNALS def _adjust_greenlet_count(self): if len(self._greenlets) < self._max_workers: greenlet = GreenletWorker(self._green_run) self._greenlets.add(greenlet) greenlet.switch() def _check_queue(self): # Run in the main greenlet of the evnet-loop thread try: task = self._queue.pop() except IndexError: return if self._available: self._green_task(task) @task def _green_task(self, task): # Run in the main greenlet of the evnet-loop thread greenlet = self._available.pop() result = greenlet.switch(task) while isfuture(result): result = greenlet.switch((yield From(result))) def _put(self, task=None): # Run in the main greenlet of the evnet-loop thread if task: self._adjust_greenlet_count() self._queue.appendleft(task) self._check_queue() def _green_run(self): # The run method of a worker greenlet current = getcurrent() parent = current.parent assert parent task = True while task: self._available.add(current) self._loop.call_soon(self._check_queue) task = parent.switch() # switch back to the main execution if task: future, func, args, kwargs = task try: result = func(*args, **kwargs) except Exception as exc: future.set_exception(exc) else: future.set_result(result) else: self._greenlets.pop(current) self._put(None)
[docs]class RunInPool: '''Utility for running a callable in a :class:`.GreenPool`. :param app: the callable to run on greenlet workers :param max_workers=100: maximum number of workers :param loop: optional event loop THis utility is used by the :mod:`~pulsar.apps.pulse` application. ''' def __init__(self, app, max_workers=None, loop=None): self.pool = GreenPool(max_workers=max_workers, loop=loop) self.app = app def __call__(self, *args, **kwargs): return self.pool.submit(self.app, *args, **kwargs)