
Source code for pulsar.apps.tasks.backend

'''
The :class:`TaskBackend` is at the heart of the
:ref:`task queue application <apps-taskqueue>`. It exposes
all the functionality for running new tasks, scheduling periodic tasks
and retrieving task information. Pulsar ships with two backends: one which uses
pulsar internals and stores tasks in the arbiter domain, and another which
stores tasks in redis_.

The backend is created by the :class:`.TaskQueue`
as soon as it starts. It is then passed to all task queue workers
which, in turn, invoke the :class:`TaskBackend.start` method
to start pulling tasks from the distributed task queue.

.. _task-state:

Task states
~~~~~~~~~~~~~

A :class:`Task` can have one of the following :attr:`~.Task.status` values:

* ``QUEUED = 6`` A task queued but not yet executed.
* ``STARTED = 5`` A task whose execution has started.
* ``RETRY = 4`` A task which is being retried.
* ``REVOKED = 3`` The task execution has been revoked (or timed-out).
* ``FAILURE = 2`` Task execution has finished with a failure.
* ``SUCCESS = 1`` Task execution has finished successfully.

.. _task-run-state:

**FULL_RUN_STATES**

The set of states for which a :class:`Task` has run:
``FAILURE`` and ``SUCCESS``

.. _task-ready-state:

**READY_STATES**

The set of states for which a :class:`Task` has finished:
``REVOKED``, ``FAILURE`` and ``SUCCESS``
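
For illustration only, these codes and state sets could be sketched as
follows (the actual definitions live in ``pulsar.apps.tasks.states``; treat
this as an approximation, not the module itself)::

    # illustrative sketch of pulsar.apps.tasks.states
    QUEUED, STARTED, RETRY, REVOKED, FAILURE, SUCCESS = 6, 5, 4, 3, 2, 1

    FULL_RUN_STATES = frozenset((FAILURE, SUCCESS))
    READY_STATES = frozenset((REVOKED, FAILURE, SUCCESS))

    def status_string(status):
        # map a status code to its name, e.g. 6 -> 'QUEUED'
        names = {QUEUED: 'QUEUED', STARTED: 'STARTED', RETRY: 'RETRY',
                 REVOKED: 'REVOKED', FAILURE: 'FAILURE', SUCCESS: 'SUCCESS'}
        return names.get(status, 'UNKNOWN(%s)' % status)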

.. _tasks-pubsub:

Task status broadcasting
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

A :class:`TaskBackend` broadcasts :class:`Task` state changes on three
different channels via a :meth:`~.Store.pubsub` handler, one for each of
the ``task_queued``, ``task_started`` and ``task_done`` events.
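
For example, a client holding a ``backend`` instance could listen for task
completion roughly as follows (a sketch only; ``on_message`` is any callable
accepting ``(channel, message)``, and channel names are built by
:meth:`TaskBackend.channel` as ``<name>_<event>``)::

    pubsub = backend.store.pubsub()
    pubsub.add_client(on_message)                    # called as on_message(channel, task_id)
    pubsub.subscribe(backend.channel('task_done'))   # e.g. 'mytasks_task_done'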


Implementation
~~~~~~~~~~~~~~~~~~~
When creating a new :class:`TaskBackend` there are three methods which must
be implemented:

* The :meth:`~TaskBackend.get_task` method, invoked when retrieving
  a :class:`Task` from the backend server.
* The :meth:`~TaskBackend.maybe_queue_task` method, invoked when a new
  :class:`.Task` is created and ready to be queued.
* The :meth:`~TaskBackend.finish_task` method, invoked when a
  :class:`.Task` reaches a :ref:`ready state <task-ready-state>`.

For example::

    from pulsar.apps import tasks

    class TaskBackend(tasks.TaskBackend):
        ...

Once the custom task backend is implemented it must be registered::

    tasks.task_backends['mybackend'] = TaskBackend

And the backend will be selected via::

    --task-backend mybackend://host:port
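
Putting it together, a minimal skeleton of a custom backend (placeholder
bodies only, shown here for illustration) could look like::

    from pulsar.apps import tasks

    class MyTaskBackend(tasks.TaskBackend):

        def maybe_queue_task(self, task):
            # return the queued task, or None if it could not be queued
            raise NotImplementedError

        def get_task(self, task_id=None):
            # return a Task instance or None
            raise NotImplementedError

        def finish_task(self, task_id, lock_id):
            # backend-specific clean-up once the task reaches a ready state
            raise NotImplementedError

    tasks.task_backends['mybackend'] = MyTaskBackend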

.. _redis: http://redis.io/
'''
import time
from functools import partial
from datetime import datetime, timedelta
from hashlib import sha1

from pulsar import (task, async, EventHandler, PulsarException, yield_from,
                    Future, coroutine_return, future_timeout,
                    ConnectionRefusedError, CANCELLED_ERRORS)
from pulsar.utils.pep import itervalues
from pulsar.utils.security import gen_unique_id
from pulsar.apps.data import odm
from pulsar.utils.log import lazyproperty, LazyString

from .models import JobRegistry
from . import states


__all__ = ['task_backends', 'Task', 'TaskBackend', 'TaskNotAvailable',
           'nice_task_message']

task_backends = {}


if hasattr(timedelta, "total_seconds"):
    timedelta_seconds = lambda delta: max(delta.total_seconds(), 0)

else:   # pragma    nocover
    def timedelta_seconds(delta):
        if delta.days < 0:
            return 0
        return delta.days * 86400 + delta.seconds + (delta.microseconds / 10e5)


def get_time(expiry, start):
    if isinstance(expiry, timedelta):
        return (start + 86400*expiry.days + expiry.seconds +
                0.000001*expiry.microseconds)
    else:
        return start + expiry


def format_time(dt):
    if isinstance(dt, (float, int)):
        dt = datetime.fromtimestamp(dt)
    return dt.isoformat() if dt else '?'


def nice_task_message(req, smart_time=None):
    smart_time = smart_time or format_time
    status = states.status_string(req.get('status'))
    user = req.get('user')
    ti = req.get('time_start', req.get('time_executed'))
    name = '%s (%s) ' % (req['name'], req['id'][:8])
    msg = '%s %s at %s' % (name, status, smart_time(ti))
    return '%s by %s' % (msg, user) if user else msg


class TaskNotAvailable(PulsarException):
    MESSAGE = 'Task {0} is not registered'

    def __init__(self, task_name):
        self.task_name = task_name
        super(TaskNotAvailable, self).__init__(self.MESSAGE.format(task_name))


class TaskTimeout(PulsarException):
    pass


class TaskConsumer(object):
    '''A context manager for consuming tasks.

    Instances of this consumer are created by the :class:`TaskBackend`
    when a task is executed.

    .. attribute:: task_id

        the :attr:`Task.id` being consumed.

    .. attribute:: job

        the :class:`.Job` which generated the task.

    .. attribute:: worker

        the :class:`.Actor` executing the task.

    .. attribute:: backend

        The :class:`.TaskBackend`. This is useful when creating tasks
        from within a :ref:`job callable <job-callable>`.
    '''
    def __init__(self, backend, worker, task_id, job):
        self.logger = worker.logger
        self.backend = backend
        self.worker = worker
        self.job = job
        self.task_id = task_id


class Task(odm.Model):
    '''A data :class:`.Model` containing task execution data.
    '''
    id = odm.CharField(primary_key=True)
    '''Task unique identifier.'''

    lock_id = odm.CharField(required=False)

    name = odm.CharField(index=True)

    time_queued = odm.FloatField(default=time.time)

    time_started = odm.FloatField(required=False)

    time_ended = odm.FloatField(required=False)
    '''The timestamp indicating when this task has finished.'''

    expiry = odm.FloatField(required=False)
    '''The timestamp indicating when this task expires.

    If the task is not started before this value it is ``REVOKED``.
    '''

    status = odm.IntegerField(index=True, default=states.QUEUED)
    '''Flag indicating the :ref:`task status <task-state>`.'''

    kwargs = odm.PickleField()
    '''Key-valued parameters used by this task.'''

    result = odm.PickleField()
    '''Result as a json object.'''

    def done(self):
        '''Return ``True`` if the :class:`Task` has finished.

        Its status is one of :ref:`READY_STATES <task-ready-state>`.
        '''
        return self.get('status') in states.READY_STATES

    def status_string(self):
        '''A string representation of the :attr:`status` code.
        '''
        return states.status_string(self.get('status'))

    def info(self):
        return 'task.%s(%s)' % (self.get('name'), self.get('id'))

    def lazy_info(self):
        return LazyString(self.info)


class TaskBackend(EventHandler):
    '''A backend class for running :class:`.Task`.

    A :class:`TaskBackend` is responsible for creating tasks and putting
    them into the distributed queue. It also schedules the run of periodic
    tasks if enabled to do so.

    .. attribute:: task_paths

        List of paths where to upload :ref:`jobs <app-taskqueue-job>`
        which are factories of tasks.

        Passed by the task-queue application
        :ref:`task paths setting <setting-task_paths>`.

    .. attribute:: schedule_periodic

        ``True`` if this :class:`TaskBackend` can schedule periodic tasks.

        Passed by the task-queue application
        :ref:`schedule-periodic setting <setting-schedule_periodic>`.

    .. attribute:: backlog

        The maximum number of concurrent tasks running on a task-queue
        for an :class:`.Actor`. A number in the order of 5 to 10 is
        normally used.

        Passed by the task-queue application
        :ref:`concurrent tasks setting <setting-concurrent_tasks>`.

    .. attribute:: max_tasks

        The maximum number of tasks a worker will process before
        restarting.

        Passed by the task-queue application
        :ref:`max requests setting <setting-max_requests>`.

    .. attribute:: poll_timeout

        The (asynchronous) timeout for polling tasks from the task queue.

        It is always a positive number and it can be specified via the
        backend connection string::

            local://?poll_timeout=3

        There shouldn't be any reason to modify the default value.

        Default: ``2``.

    .. attribute:: processed

        The number of tasks processed (so far) by the worker running this
        backend.

        This value is important in connection with the :attr:`max_tasks`
        attribute.
    '''
    task_poller = None

    def __init__(self, store, logger=None, task_paths=None,
                 schedule_periodic=False, backlog=1, max_tasks=0,
                 name=None, poll_timeout=None):
        super(TaskBackend, self).__init__(
            store._loop,
            many_times_events=('task_queued', 'task_started', 'task_done'))
        self.store = store
        self._logger = logger
        self.name = name
        self.task_paths = task_paths
        self.backlog = backlog
        self.max_tasks = max_tasks
        self.poll_timeout = max(poll_timeout or 0, 2)
        self.concurrent_tasks = set()
        self.processed = 0
        self.schedule_periodic = schedule_periodic
        self.next_run = time.time()
        self.callbacks = {}
        self.models = odm.Mapper(self.store)
        self.models.register(Task)
        self._pubsub = self.get_pubsub()

    def __repr__(self):
        if self.schedule_periodic:
            return 'task scheduler %s' % self.store.dns
        else:
            return 'task consumer %s' % self.store.dns
    __str__ = __repr__

    @property
    def num_concurrent_tasks(self):
        '''The number of :attr:`concurrent_tasks`.

        This number is never greater than the :attr:`backlog` attribute.
        '''
        return len(self.concurrent_tasks)

    @lazyproperty
    def entries(self):
        return self._setup_schedule()

    @lazyproperty
    def registry(self):
        '''The :class:`.JobRegistry` for this backend.
        '''
        return JobRegistry.load(self.task_paths)

    def channel(self, name):
        '''Given an event ``name``, return the corresponding channel name.

        The event ``name`` is one of ``task_queued``, ``task_started``
        or ``task_done``.
        '''
        return '%s_%s' % (self.name, name)

    def event_name(self, channel):
        return channel[len(self.name)+1:]

    @task
    def queue_task(self, jobname, meta_params=None, expiry=None, **kwargs):
        '''Try to queue a new :class:`Task`.

        This method returns a :class:`.Future` which results in the task
        ``id`` created. If ``jobname`` is not a valid :attr:`.Job.name`,
        a ``TaskNotAvailable`` exception occurs.

        :param jobname: the name of a :class:`.Job` registered
            with the :class:`.TaskQueue` application.
        :param meta_params: Additional parameters to be passed to the
            :class:`Task` constructor (not its callable function).
        :param expiry: optional expiry timestamp to override the default
            expiry of a task.
        :param kwargs: optional dictionary used for the key-valued
            arguments in the task callable.
        :return: a :class:`.Future` resulting in a task id on success.
        '''
        pubsub = self._pubsub
        if jobname in self.registry:
            job = self.registry[jobname]
            task_id, lock_id = self.generate_task_ids(job, kwargs)
            queued = time.time()
            if expiry is not None:
                expiry = get_time(expiry, queued)
            elif job.timeout:
                expiry = get_time(job.timeout, queued)
            meta_params = meta_params or {}
            task = self.models.task(id=task_id,
                                    lock_id=lock_id,
                                    name=job.name,
                                    time_queued=queued,
                                    expiry=expiry,
                                    kwargs=kwargs,
                                    status=states.QUEUED)
            if meta_params:
                task.update(meta_params)
            task = yield self.maybe_queue_task(task)
            if task:
                pubsub.publish(self.channel('task_queued'), task['id'])
                scheduled = self.entries.get(job.name)
                if scheduled:
                    scheduled.next()
                self.logger.debug('queued %s', task.lazy_info())
                coroutine_return(task['id'])
            else:
                self.logger.debug('%s cannot queue new task. Locked',
                                  jobname)
                coroutine_return()
        else:
            raise TaskNotAvailable(jobname)

    def wait_for_task(self, task_id, timeout=None):
        '''Asynchronously wait for a task with ``task_id`` to have finished
        its execution.
        '''
        # This coroutine is run on the worker event loop
        def _(task_id):
            task = yield self.get_task(task_id)
            if task:
                task_id = task['id']
                callbacks = self.callbacks
                if task.done():     # task done, simply return it
                    done = callbacks.pop(task_id, None)
                    if done:
                        done.set_result(task_id)
                else:
                    done = callbacks.get(task_id)
                    if not done:
                        # No future, create one
                        callbacks[task_id] = done = Future(loop=self._loop)
                    yield done
                    task = yield self.get_task(task_id)
            coroutine_return(task)

        fut = async(yield_from(_(task_id)), loop=self._loop)
        return future_timeout(fut, timeout) if timeout else fut

    def get_tasks(self, ids):
        return self.models.task.filter(id=ids).all()

    def get_pubsub(self):
        '''Create a publish/subscribe handler from the backend
        :attr:`store`.
        '''
        pubsub = self.store.pubsub()
        pubsub.add_client(self)
        # pubsub channel names from event names
        channels = tuple((self.channel(name) for name in self.events))
        pubsub.subscribe(*channels)
        self.bind_event('task_done', self._task_done_callback)
        return pubsub

    # #######################################################################
    # #    ABSTRACT METHODS
    # #######################################################################
    def maybe_queue_task(self, task):
        '''Actually queue a :class:`.Task` if possible.
        '''
        raise NotImplementedError

    def get_task(self, task_id=None):
        '''Asynchronously retrieve a :class:`Task` from a ``task_id``.

        :param task_id: the ``id`` of the task to retrieve.
        :return: a :class:`Task` or ``None``.
        '''
        raise NotImplementedError

    def finish_task(self, task_id, lock_id):
        '''Invoked at the end of task execution.

        The :class:`.Task` with ``task_id`` has been executed (either
        successfully or not) or has been revoked. This method performs
        backend-specific operations.

        Must be implemented by subclasses.
        '''
        raise NotImplementedError

    def flush(self):
        '''Remove all queued :class:`.Task`.
        '''
        raise NotImplementedError()

    # #######################################################################
    # #    START/CLOSE METHODS FOR TASK WORKERS
    # #######################################################################
    def start(self, worker):
        '''Invoked by the task queue ``worker`` when it starts.
        '''
        self._may_pool_task(worker)
        self.logger.debug('started polling tasks')

    def close(self):
        '''Close this :class:`TaskBackend`.

        Invoked by the :class:`.Actor` when stopping.
        '''
        task = self.task_poller
        if task:
            task.cancel()
            self.task_poller = None
        self._pubsub.close()
        return task

    def generate_task_ids(self, job, kwargs):
        '''An internal method to generate task unique identifiers.

        :parameter job: The :class:`.Job` creating the task.
        :parameter kwargs: dictionary of key-valued parameters passed to the
            :ref:`job callable <job-callable>` method.
        :return: a two-element tuple containing the unique id and an
            identifier for overlapping tasks if the :attr:`.Job.can_overlap`
            results in ``False``.

        Called by the :ref:`TaskBackend <apps-taskqueue-backend>` when
        creating a new task.
        '''
        can_overlap = job.can_overlap
        if hasattr(can_overlap, '__call__'):
            can_overlap = can_overlap(**kwargs)
        tid = gen_unique_id()
        if can_overlap:
            return tid, None
        else:
            if kwargs:
                kw = ('%s=%s' % (k, kwargs[k]) for k in sorted(kwargs))
                name = '%s %s' % (self.name, ', '.join(kw))
            else:
                name = self.name
            return tid, sha1(name.encode('utf-8')).hexdigest()

    # #######################################################################
    # #    PRIVATE METHODS
    # #######################################################################
    def tick(self, now=None):
        # Run a tick, that is one iteration of the scheduler.
        if not self.schedule_periodic:
            return
        remaining_times = []
        for entry in itervalues(self.entries):
            is_due, next_time_to_run = entry.is_due(now=now)
            if is_due:
                self.queue_task(entry.name)
            if next_time_to_run:
                remaining_times.append(next_time_to_run)
        self.next_run = now or time.time()
        if remaining_times:
            self.next_run += min(remaining_times)

    def job_list(self, jobnames=None):
        registry = self.registry
        jobnames = jobnames or registry
        all = []
        for name in jobnames:
            if name not in registry:
                continue
            job = registry[name]
            can_overlap = job.can_overlap
            if hasattr(can_overlap, '__call__'):
                can_overlap = 'maybe'
            d = {'doc': job.__doc__,
                 'doc_syntax': job.doc_syntax,
                 'type': job.type,
                 'can_overlap': can_overlap}
            if self.entries and name in self.entries:
                entry = self.entries[name]
                _, next_time_to_run = self.next_scheduled((name,))
                run_every = 86400*job.run_every.days + job.run_every.seconds
                d.update({'next_run': next_time_to_run,
                          'run_every': run_every,
                          'runs_count': entry.total_run_count})
            all.append((name, d))
        return all

    def next_scheduled(self, jobnames=None):
        if not self.schedule_periodic:
            return
        if jobnames:
            entries = (self.entries.get(name, None) for name in jobnames)
        else:
            entries = itervalues(self.entries)
        next_entry = None
        next_time = None
        for entry in entries:
            if entry is None:
                continue
            is_due, next_time_to_run = entry.is_due()
            if is_due:
                next_time = 0
                next_entry = entry
                break
            elif next_time_to_run is not None:
                if next_time is None or next_time_to_run < next_time:
                    next_time = next_time_to_run
                    next_entry = entry
        if next_entry:
            return (next_entry.name, max(next_time, 0))
        else:
            return (jobnames, None)

    @task
    def may_pool_task(self, worker):
        # Called in the ``worker`` event loop.
        #
        # It polls a new task if possible, and adds it to the queue of
        # tasks consumed by the ``worker`` CPU-bound thread.
        next_time = 0
        if worker.is_running():
            executor = worker.executor()
            if self.num_concurrent_tasks < self.backlog:
                if self.max_tasks and self.processed >= self.max_tasks:
                    if not self.num_concurrent_tasks:
                        self.logger.warning('Processed %s tasks. Restarting.',
                                            self.processed)
                        worker._loop.stop()
                        coroutine_return()
                else:
                    try:
                        task = yield self.get_task()
                    except ConnectionRefusedError:
                        if worker.is_running():
                            raise
                        else:
                            coroutine_return()
                    except CANCELLED_ERRORS:
                        self.logger.debug('stopped polling tasks')
                        raise
                    if task:    # Got a new task
                        self.processed += 1
                        self.concurrent_tasks.add(task['id'])
                        coro = self._execute_task(worker, task)
                        executor.submit(yield_from, coro)
            else:
                self.logger.debug('%s concurrent requests. Cannot poll.',
                                  self.num_concurrent_tasks)
                next_time = 1
        self.task_poller = None
        worker._loop.call_later(next_time, self._may_pool_task, worker)

    def _may_pool_task(self, worker):
        assert self.task_poller is None
        if worker.is_running():
            self.task_poller = self.may_pool_task(worker)
        else:
            worker._loop.call_soon(self._may_pool_task, worker)

    def _execute_task(self, worker, task):
        # Asynchronous execution of a Task. This method is called
        # on a separate thread of execution from the worker event loop thread.
        logger = worker.logger
        pubsub = self._pubsub
        task_id = task.id
        lock_id = task.get('lock_id')
        time_ended = time.time()
        job = self.registry.get(task.get('name'))
        consumer = TaskConsumer(self, worker, task_id, job)
        task_info = task.lazy_info()
        try:
            if not consumer.job:
                raise RuntimeError('%s not in registry' % task_info)
            if task['status'] > states.STARTED:
                expiry = task.get('expiry')
                if expiry and time_ended > expiry:
                    raise TaskTimeout
                else:
                    logger.info('starting %s', task_info)
                    kwargs = task.get('kwargs') or {}
                    self.models.task.update(task, status=states.STARTED,
                                            time_started=time_ended,
                                            worker=worker.aid)
                    pubsub.publish(self.channel('task_started'), task_id)
                    # This may block for a while
                    result = yield job(consumer, **kwargs)
                    status = states.SUCCESS
            else:
                logger.error('invalid status for %s', task_info)
                self.concurrent_tasks.discard(task_id)
                coroutine_return(task_id)
        except TaskTimeout:
            logger.warning('%s timed-out', task_info)
            result = None
            status = states.REVOKED
        except Exception as exc:
            logger.exception('failure in %s', task_info)
            result = str(exc)
            status = states.FAILURE
        #
        try:
            yield self.models.task.update(task, time_ended=time.time(),
                                          status=status, result=result)
        finally:
            self.concurrent_tasks.discard(task_id)
            self.finish_task(task_id, lock_id)
        #
        logger.info('finished %s', task_info)
        # publish into the task_done channel
        pubsub.publish(self.channel('task_done'), task_id)
        coroutine_return(task_id)

    def _setup_schedule(self):
        entries = {}
        if not self.schedule_periodic:
            return entries
        for name, t in self.registry.filter_types('periodic'):
            every = t.run_every
            if isinstance(every, int):
                every = timedelta(seconds=every)
            if not isinstance(every, timedelta):
                raise ValueError('Schedule %s is not a timedelta' % every)
            entries[name] = SchedulerEntry(name, every, t.anchor)
        return entries

    def _task_done_callback(self, task_id, exc=None):
        # Got a task_id from the ``<name>_task_done`` channel.
        # Check if a ``callback`` is available in the :attr:`callbacks`
        # dictionary. If so fire the callback with the ``task`` instance
        # corresponding to the input ``task_id``.
        # If a callback is not available, it must have been fired already
        done = self.callbacks.pop(task_id, None)
        if done:
            done.set_result(task_id)

    def __call__(self, channel, message):
        # PubSub callback
        name = self.event_name(channel)
        self.fire_event(name, message)


class SchedulerEntry(object):
    '''A class used as a schedule entry by the :class:`.TaskBackend`.

    .. attribute:: name

        Task name

    .. attribute:: run_every

        Interval in seconds

    .. attribute:: anchor

        Datetime anchor

    .. attribute:: last_run_at

        last run datetime

    .. attribute:: total_run_count

        Total number of times this periodic task has been executed by the
        :class:`.TaskBackend`.
    '''
    def __init__(self, name, run_every, anchor=None):
        self.name = name
        self.run_every = run_every
        self.anchor = anchor
        self.last_run_at = datetime.now()
        self.total_run_count = 0

    def __repr__(self):
        return self.name
    __str__ = __repr__

    @property
    def scheduled_last_run_at(self):
        '''The scheduled last run datetime.

        This is different from :attr:`last_run_at` only when
        :attr:`anchor` is set.
        '''
        last_run_at = self.last_run_at
        anchor = self.anchor
        if last_run_at and anchor:
            run_every = self.run_every
            times = int(timedelta_seconds(last_run_at - anchor)
                        / timedelta_seconds(run_every))
            if times:
                anchor += times*run_every
                while anchor <= last_run_at:
                    anchor += run_every
                while anchor > last_run_at:
                    anchor -= run_every
                self.anchor = anchor
            return anchor
        else:
            return last_run_at

    def next(self, now=None):
        '''Increase the :attr:`total_run_count` attribute by one and set the
        value of :attr:`last_run_at` to ``now``.
        '''
        self.last_run_at = now or datetime.now()
        self.total_run_count += 1

    def is_due(self, now=None):
        '''Return a tuple of two items ``(is_due, next_time_to_run)``, where
        next time to run is in seconds.

        See :meth:`unuk.contrib.tasks.models.PeriodicTask.is_due` for more
        information.
        '''
        last_run_at = self.scheduled_last_run_at
        now = now or datetime.now()
        rem_delta = last_run_at + self.run_every - now
        rem = timedelta_seconds(rem_delta)
        if rem == 0:
            return True, timedelta_seconds(self.run_every)
        return False, rem


class PulsarTaskBackend(TaskBackend):

    @lazyproperty
    def store_client(self):
        return self.store.client()

    @task
    def maybe_queue_task(self, task):
        free = True
        store = self.store
        c = self.channel
        if task['lock_id']:
            free = yield store.execute('hsetnx', c('locks'),
                                       task['lock_id'], task['id'])
        if free:
            with self.models.begin() as t:
                t.add(task)
                t.execute('lpush', c('inqueue'), task.id)
            yield t.wait()
            coroutine_return(task)
        else:
            coroutine_return()

    @task
    def get_task(self, task_id=None):
        store = self.store
        if not task_id:
            inq = self.channel('inqueue')
            ouq = self.channel('outqueue')
            task_id = yield store.execute('brpoplpush', inq, ouq,
                                          self.poll_timeout)
            if not task_id:
                coroutine_return()
        task = yield self.models.task.get(task_id)
        coroutine_return(task or None)

    def finish_task(self, task_id, lock_id):
        store = self.store
        pipe = store.pipeline()
        if lock_id:
            pipe.hdel(self.channel('locks'), lock_id)
        # Remove the task_id from the inqueue list
        pipe.lrem(self.channel('inqueue'), 0, task_id)
        return pipe.commit()

    def get_tasks(self, ids):
        base = self.models.task._meta.table_name
        store = self.models.task._read_store
        pipeline = store.pipeline()
        for pk in ids:
            pipeline.hgetall('%s:%s' % (base, pk),
                             factory=partial(store.build_model, Task))
        return pipeline.commit()

    def flush(self):
        return self.store.flush()


task_backends['pulsar'] = PulsarTaskBackend
task_backends['redis'] = PulsarTaskBackend