Documentation for pulsar 0.9.2. For development docs, go here.

Source code for pulsar.async.threads

import logging
import threading
import weakref
from functools import partial
from multiprocessing import dummy, current_process

try:
    import queue
except ImportError:  # pragma nocover
    import Queue as queue
ThreadQueue = queue.Queue
Empty = queue.Empty
Full = queue.Full

from .access import (asyncio, selectors, get_actor, set_actor,
                     events, thread_data, _StopError, BaseEventLoop,
                     get_event_loop, logger as get_logger)
from .futures import Future, Task, async, AsyncObject
from .consts import ACTOR_STATES


__all__ = ['Thread', 'IOqueue', 'ThreadPool',
           'ThreadQueue', 'Empty', 'Full']

_MAX_WORKERS = 50
_threads_queues = weakref.WeakKeyDictionary()


def get_executor(loop):
    executor = loop._default_executor
    if executor is None:
        executor = ThreadPool(loop=loop)
        loop._default_executor = executor
    return executor


def run_in_executor(loop, executor, callback, *args):
    if isinstance(callback, events.Handle):
        assert not args
        assert not isinstance(callback, events.TimerHandle)
        if callback._cancelled:
            f = Future(loop=loop)
            f.set_result(None)
            return f
        callback, args = callback._callback, callback._args
    if executor is None:
        executor = get_executor(loop)
    return executor.submit(callback, *args)


class Thread(dummy.DummyProcess):
    _loop = None
    _pool_loop = None

    @property
    def pid(self):
        return current_process().pid

    def terminate(self):
        '''Invoke the stop on the event loop method.'''
        if self.is_alive() and self._loop:
            self._loop.call_soon_threadsafe(self._loop.stop)

    def set_loop(self, loop):
        assert self._loop is None
        self._loop = loop


class PoolThread(Thread):
    '''A thread for the :class;`.ThreadPool`.
    '''
    def __init__(self, pool):
        self.pool = pool
        self._pool_loop = pool._loop
        super(PoolThread, self).__init__(name=pool.worker_name)

    def __repr__(self):
        if self.ident:
            return '%s-%s' % (self.name, self.ident)
        else:
            return self.name
    __str__ = __repr__

    def run(self):
        '''Modified run method which set the actor and the event_loop.
        '''
        if self.pool._actor:
            set_actor(self.pool._actor)
        # The run method for the threads in this thread pool
        logger = logging.getLogger('pulsar.%s' % self.name)
        loop = QueueEventLoop(self.pool, logger=logger, iothreadloop=True)
        self.set_loop(loop)
        loop.run_forever()


[docs]class IOqueue(selectors.BaseSelector): '''A selector based on a distributed queue Since there is no way to my knowledge to wake up the queue while getting an item from the task queue, the timeout cannot be larger than a small number which by default is ``0.5`` seconds. ''' max_timeout = 0.5 def __init__(self, executor): super(IOqueue, self).__init__() self._actor = executor._actor self._work_queue = executor._work_queue self._maxtasks = executor._maxtasks self._received = 0 self._completed = 0 def select(self, timeout=None): if self._actor and self._actor.state > ACTOR_STATES.RUN: raise _StopError if self._maxtasks and self._received >= self._maxtasks: if self._completed < self._received: return () else: raise _StopError block = True if timeout is None: timeout = self.max_timeout elif timeout <= 0: timeout = 0 block = False else: timeout = min(self.max_timeout, timeout) try: task = self._work_queue.get(block=block, timeout=timeout) except (Empty, TypeError): return () except (EOFError, IOError): raise _StopError if task is None: # got the sentinel, exit! self._work_queue.put(None) raise _StopError return task def process_task(self, task, loop): self._received += 1 p, func, args, kwargs = task try: result = func(*args, **kwargs) except Exception as exc: self._done_task(p, None, exc) else: try: result = async(result, loop=loop) except TypeError: self._done_task(p, None, result) else: result.add_done_callback(partial(self._done_task, p)) def _done_task(self, p, future, result=None): self._completed += 1 # if future: try: result = future.result() except Exception as exc: result = exc # if isinstance(result, Exception): p._loop.call_soon_threadsafe(lambda: p.set_exception(result)) else: p._loop.call_soon_threadsafe(lambda: p.set_result(result)) def get_map(self): return {} def register(self, fileobj, events, data=None): pass def unregister(self, fileobj): pass
class ThreadSafeLoop(object): def __init__(self, iothreadloop): self._iothreadloop = iothreadloop if self._iothreadloop: self._original_call_soon = self.call_soon self.call_soon = self._threadsafe_call_soon asyncio.set_event_loop(self) def _threadsafe_call_soon(self, callback, *args): if self != get_event_loop(): return self.call_soon_threadsafe(callback, *args) else: return self._original_call_soon(callback, *args)
[docs]class QueueEventLoop(BaseEventLoop, ThreadSafeLoop): '''An :ref:`asyncio event loop <asyncio-event-loop>` which uses :class:`.IOqueue` as its selector. ''' def __init__(self, executor, iothreadloop=False, logger=None): super(QueueEventLoop, self).__init__() ThreadSafeLoop.__init__(self, iothreadloop) self._default_executor = executor self._selector = IOqueue(executor) self.logger = get_logger(logger=logger) def create_task(self, coro): return Task(coro, loop=self) def _write_to_self(self): pass def _process_events(self, task): if task: self._selector.process_task(task, self) def run_in_executor(self, executor, callback, *args): return run_in_executor(self, executor, callback, *args)
[docs]class ThreadPool(AsyncObject): '''A thread pool for an actor. This pool maintains a group of threads to perform asynchronous tasks via the :meth:`submit` method. ''' worker_name = 'exec' def __init__(self, max_workers=None, actor=None, loop=None, maxtasks=None): self._actor = actor = actor or get_actor() if actor: loop = loop or actor._loop if not max_workers: max_workers = actor.cfg.thread_workers self.worker_name = '%s.%s' % (actor.name, self.worker_name) self._loop = loop or get_event_loop() self._max_workers = min(max_workers or _MAX_WORKERS, _MAX_WORKERS) self._threads = set() self._maxtasks = maxtasks self._work_queue = ThreadQueue() self._shutdown = False self._shutdown_lock = threading.Lock()
[docs] def submit(self, func, *args, **kwargs): '''Equivalent to ``func(*args, **kwargs)``. This method create a new task for function ``func`` and adds it to the queue. Return a :class:`~asyncio.Future` called back once the task has finished. ''' with self._shutdown_lock: if self._shutdown: raise RuntimeError( 'cannot schedule new futures after shutdown') future = Future(loop=self._loop) self._work_queue.put((future, func, args, kwargs)) self._adjust_thread_count() return future
def shutdown(self, wait=True): with self._shutdown_lock: self._shutdown = True self._work_queue.put(None) if wait: for t in self._threads: t.join() def _adjust_thread_count(self): if len(self._threads) < self._max_workers: t = PoolThread(self) t.daemon = True t.start() self._threads.add(t) _threads_queues[t] = self._work_queue