Documentation for pulsar 0.9.2. For development docs, go here.
'''
A :ref:`task queue <apps-taskqueue>` application implements several
:class:`Job` classes which specify the way a :class:`.Task` is run.
Each :class:`Job` class is a :class:`.Task` factory, therefore,
a :class:`.Task` is always associated
with one :class:`Job`, which can be of two types:
* standard (:class:`.Job`)
* periodic (:class:`.PeriodicJob`), a generator of scheduled tasks.
.. _job-callable:
Job callable method
~~~~~~~~~~~~~~~~~~~~~~~~~~
Defining a job is simple: subclass :class:`.Job` and implement the
**job callable method**::

    from pulsar.apps import tasks

    class Addition(tasks.Job):

        def __call__(self, consumer, a=0, b=0):
            "Add two numbers"
            return a+b

The ``consumer``, an instance of :class:`.TaskConsumer`,
is passed by the :ref:`Task backend <apps-taskqueue-backend>` and should
always be the first positional parameter in the callable method.
The remaining parameters (optional, key-valued only!) are those needed by
your job implementation.
A :ref:`job callable <job-callable>` can also return a
:ref:`coroutine <coroutine>` if it needs to perform asynchronous IO during its
execution::

    class Crawler(tasks.Job):

        def __call__(self, consumer, sample=100, size=10):
            response = yield http.request(...)
            content = response.content
            ...

This allows for cooperative task execution.
.. _job-non-overlap:
Non overlapping Jobs
~~~~~~~~~~~~~~~~~~~~~~~~~~
The :attr:`~.Job.can_overlap` attribute controls the way tasks are generated
by a specific :class:`.Job`. By default, a :class:`.Job` creates a new task
every time the :class:`.TaskBackend` requests it.
However, when setting the :attr:`~.Job.can_overlap` attribute to ``False``,
a new task cannot be started unless a previous task of the same job
is done.
'''
from datetime import datetime, date
import logging
from pulsar.utils.pep import iteritems
from pulsar.utils.importer import import_modules
__all__ = ['JobMetaClass', 'Job', 'PeriodicJob',
'anchorDate', 'JobRegistry']
class JobRegistry(dict):
    """Site registry for tasks.

    A plain ``dict`` subclass mapping a job name to a job *instance*.
    """

    def regular(self):
        """Return a generator of ``(name, job)`` pairs for regular jobs."""
        return self.filter_types("regular")

    def periodic(self):
        """Return a generator of ``(name, job)`` pairs for periodic jobs."""
        return self.filter_types("periodic")

    def register(self, job):
        """Register a job class in the job registry.

        ``job`` is instantiated and stored under its ``name`` when it is a
        class created by :class:`JobMetaClass` whose ``can_register`` flag
        is true. Any other object is silently ignored.
        """
        if not isinstance(job, JobMetaClass):
            return
        if not job.can_register:
            return
        key = job.name
        self[key] = job()

    def filter_types(self, type):
        """Return a generator of ``(name, job)`` pairs of a specific type.

        :parameter type: the value compared with each job ``type``
            attribute, for example ``"regular"`` or ``"periodic"``.
        """
        entries = iteritems(self)
        return ((key, entry) for key, entry in entries
                if entry.type == type)

    @classmethod
    def load(cls, paths):
        """Build a registry from the modules imported from ``paths``.

        Every attribute of every module returned by ``import_modules`` is
        offered to :meth:`register`, which keeps only registrable job
        classes.
        """
        registry = cls()
        for module in import_modules(paths):
            for attr_name in dir(module):
                candidate = getattr(module, attr_name)
                registry.register(candidate)
        return registry
class JobMetaClass(type):
    """Metaclass which prepares the class attributes of a job.

    For every class it creates, it:

    * removes ``abstract`` from the class namespace and stores its negation
      as ``can_register`` (the flag is therefore not inherited);
    * sets ``name`` to the lower-cased ``name`` attribute, falling back to
      the class name;
    * sets ``logger`` to the logger called ``<log_prefix>.job.<ClassName>``
      where ``log_prefix`` defaults to ``pulsar`` when missing or empty.
    """

    def __new__(cls, name, bases, attrs):
        is_abstract = attrs.pop('abstract', False)
        attrs['can_register'] = not is_abstract
        # The registry key is always lower case.
        attrs["name"] = attrs.get("name", name).lower()
        # Only a prefix declared on this very class is considered.
        prefix = attrs.get("log_prefix") or "pulsar"
        attrs['logger'] = logging.getLogger(
            '%s.job.%s' % (prefix, name))
        return super(JobMetaClass, cls).__new__(cls, name, bases, attrs)
class Job(JobMetaClass('JobBase', (object,), {'abstract': True})):
    '''The Job class which is used in a distributed task queue.

    .. attribute:: name

        The unique name which defines the Job and which can be used to
        retrieve it from the job registry. This attribute is set to the Job
        class name in lower case by default, unless a ``name`` class
        attribute is defined.

    .. attribute:: abstract

        If set to ``True`` (default is ``False``), the :class:`.Job` won't be
        registered with the :class:`.JobRegistry`. Useful when creating a new
        base class for several other jobs.

    .. attribute:: type

        Type of Job, one of ``regular`` and ``periodic``.

    .. attribute:: timeout

        An instance of a datetime.timedelta or ``None``. If set, it
        represents the time lag after which a task which did not start
        expires.

        Default: ``None``.

    .. attribute:: can_overlap

        Boolean indicating if this job can generate overlapping tasks. It can
        also be a callable which accept the same input parameters as the job
        callable function.

        Default: ``True``.

    .. attribute:: doc_syntax

        The doc string syntax.

        Default: ``markdown``

    .. attribute:: logger

        an instance of a logger. Created at runtime.
    '''
    # This base class is itself abstract: it is never registered.
    abstract = True
    timeout = None
    # NOTE(review): ``expires`` is not described in the class docstring and
    # is not read in this module; presumably consumed elsewhere - confirm.
    expires = None
    doc_syntax = 'markdown'
    can_overlap = True

    def __call__(self, consumer, *args, **kwargs):
        '''The job callable method, to be implemented by subclasses.

        :parameter consumer: first positional parameter of every job
            callable.
        :raise NotImplementedError: always, in this base implementation.
        '''
        raise NotImplementedError("Jobs must implement the __call__ method.")

    @property
    def type(self):
        '''Type of Job, one of ``regular`` and ``periodic``.'''
        return 'regular'

    def queue_task(self, consumer, jobname, meta_params=None, **kwargs):
        '''Queue a new task in the task queue.

        This utility method can be used from within the
        :ref:`job callable <job-callable>` method and it allows tasks to act
        as tasks factories.

        :parameter consumer: the :class:`.TaskConsumer`
            handling the :class:`.Task`.
            Must be the same instance as the one passed to the
            :ref:`job callable <job-callable>` method.
        :parameter jobname: The name of the :class:`.Job` to run.
        :parameter meta_params: optional dictionary of additional parameters
            forwarded as ``meta_params`` to the backend ``queue_task``
            method. The dictionary is copied, the one supplied by the
            caller is never modified.
        :parameter kwargs: key-valued parameters for the
            :ref:`job callable <job-callable>`.
        :return: a :class:`~asyncio.Future` called back with the task id.

        This method invokes the :meth:`.TaskBackend.queue_task`
        method with the additional ``from_task`` argument equal to the
        id of the task invoking the method.
        '''
        # Work on a copy so that adding ``from_task`` does not leak into a
        # dictionary the caller may reuse for other tasks.
        params = {} if meta_params is None else dict(meta_params)
        params['from_task'] = consumer.task_id
        return consumer.backend.queue_task(jobname, meta_params=params,
                                           **kwargs)
class PeriodicJob(Job):
    '''A periodic :class:`.Job` implementation.

    Instances must end up with a ``run_every`` value which is not ``None``,
    either from the constructor argument or from the class attribute.
    '''
    # Abstract base: concrete periodic jobs subclass this class.
    abstract = True
    anchor = None
    '''If specified it must be a :class:`~datetime.datetime` instance.

    It controls when the periodic Job is run.
    '''
    run_every = None
    '''Periodicity as a :class:`~datetime.timedelta` instance.'''

    def __init__(self, run_every=None):
        '''Initialise the job periodicity.

        :parameter run_every: optional periodicity which, when truthy,
            takes precedence over the ``run_every`` class attribute.
        :raise NotImplementedError: if no periodicity is available.
        '''
        # A falsy argument falls back to the value declared on the class.
        self.run_every = run_every if run_every else self.run_every
        if self.run_every is None:
            message = ('Periodic Jobs must have a run_every '
                       'attribute set, "{0}" does not have one')
            raise NotImplementedError(message.format(self.name))

    @property
    def type(self):
        '''Type of Job, always ``periodic``.'''
        return 'periodic'

    def is_due(self, last_run_at):
        """Returns tuple of two items ``(is_due, next_time_to_run)``,
        where next time to run is in seconds. For example:

        * ``(True, 20)``, means the job should be run now, and the next
          time to run is in 20 seconds.
        * ``(False, 12)``, means the job should be run in 12 seconds.

        You can override this to decide the interval at runtime.

        The result is whatever ``run_every.is_due(last_run_at)`` returns.
        """
        # NOTE(review): ``run_every`` is documented as a timedelta, which has
        # no ``is_due`` method; this call needs a schedule-like object -
        # confirm the expected type.
        schedule = self.run_every
        return schedule.is_due(last_run_at)
def anchorDate(hour=0, minute=0, second=0):
    '''Create an anchor date.

    :parameter hour: hour of the day, default ``0``.
    :parameter minute: minute of the hour, default ``0``.
    :parameter second: second of the minute, default ``0``.
    :return: a naive :class:`~datetime.datetime` for today's date, as
        given by ``date.today()``, at the requested time.
    '''
    today = date.today()
    return datetime(today.year, today.month, today.day,
                    hour, minute, second)