Documentation for pulsar 0.9.2. For development docs, go here.

Source code for pulsar.apps.tasks

'''To get started, follow these guidelines:

* Create a script which runs your application; in the
  :ref:`taskqueue tutorial <tutorials-taskqueue>` the script is called
  ``manage.py``.
* Create the modules where :ref:`jobs <app-taskqueue-job>` are implemented. It
  can be a directory containing several submodules as explained in the
  :ref:`task paths parameter <app-tasks_path>`.
* Run your script, sit back and relax.

.. _app-taskqueue-config:

Configuration
~~~~~~~~~~~~~~~~
A :class:`TaskQueue` accepts several configuration parameters on top of the
standard :ref:`application settings <settings>`:

.. _app-tasks_path:

* The :ref:`task_paths <setting-task_paths>` parameter specifies
  a list of python paths from which to collect :class:`.Job` classes::

      task_paths = ['myjobs','another.moduledir.*']

  The ``*`` at the end of the second module indicates to collect
  :class:`.Job` from all submodules of ``another.moduledir``.

* The :ref:`schedule_periodic <setting-schedule_periodic>` flag indicates
  if the :class:`TaskQueue` can schedule :class:`.PeriodicJob`. Usually,
  only one running :class:`TaskQueue` application is responsible for
  scheduling tasks.

  It can be specified in the command line via the
  ``--schedule-periodic`` flag.

  Default: ``False``.

* The :ref:`task_backend <setting-task_backend>` parameter is a url
  type string which specifies the :ref:`task backend <apps-taskqueue-backend>`
  to use.

  It can be specified in the command line via the
  ``--task-backend ...`` option.

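  Default: empty. When it is not set, the
  :ref:`data_store <setting-data_store>` value is used instead and, if that
  is not set either, a ``pulsar://`` connection string built from the
  default pulsar store address.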

* The :ref:`concurrent_tasks <setting-concurrent_tasks>` parameter controls
  the maximum number of concurrent tasks for a given task worker.
  This parameter is important when tasks are asynchronous, that is when
  they perform some sort of I/O and the :ref:`job callable <job-callable>`
  returns an :ref:`asynchronous component <tutorials-coroutine>`.

  It can be specified in the command line via the
  ``--concurrent-tasks ...`` option.

  Default: ``5``.

.. _celery: http://celeryproject.org/
'''
import time

import pulsar
from pulsar import command, task
from pulsar.utils.config import section_docs
from pulsar.apps.data import create_store
from pulsar.apps.data.stores import start_store
from pulsar.apps.ds import DEFAULT_PULSAR_STORE_ADDRESS

from .models import *
from .backend import *
from .rpc import *
from .states import *


# Connection string used as the last resort when neither the ``task_backend``
# nor the ``data_store`` setting is configured (see TaskQueue.monitor_start).
DEFAULT_TASK_BACKEND = 'pulsar://%s/1' % DEFAULT_PULSAR_STORE_ADDRESS

# Introductory text registered for the "Task Consumer" settings section, the
# section name used by TaskSetting.
section_docs['Task Consumer'] = '''
This section covers configuration parameters used by CPU bound type
applications such as the :ref:`distributed task queue <apps-taskqueue>` and
the :ref:`test suite <apps-test>`.'''


# Common base for the task-queue settings defined in this module: it pins the
# ``app`` and ``section`` values they share.
# NOTE(review): ``virtual = True`` presumably stops this base class from being
# registered as a setting itself -- confirm against ``pulsar.Setting``.
class TaskSetting(pulsar.Setting):
    virtual = True
    app = 'tasks'
    section = "Task Consumer"


# ``concurrent_tasks`` setting (``--concurrent-tasks``): an integer, 5 by
# default. TaskQueue.get_backend passes its value to the task backend as the
# ``backlog`` argument.
class ConcurrentTasks(TaskSetting):
    name = "concurrent_tasks"
    flags = ["--concurrent-tasks"]
    validator = pulsar.validate_pos_int
    type = int
    default = 5
    desc = """\
        The maximum number of concurrent tasks for a worker.

        When a task worker reach this number it stops polling for more tasks
        until one or more task finish. It should only affect task queues under
        significant load.
        Must be a positive integer. Generally set in the range of 5-10.
        """


# ``task_backend`` setting (``--task-backend``): connection string of the
# store holding the tasks. The empty default means "not set"; the fallback to
# ``data_store`` and then DEFAULT_TASK_BACKEND is resolved in
# TaskQueue.monitor_start.
class TaskBackendConnection(TaskSetting):
    name = "task_backend"
    flags = ["--task-backend"]
    default = ""
    meta = 'CONNECTION_STRING'
    # DEFAULT_TASK_BACKEND is interpolated into the help text once, when the
    # class body is executed.
    desc = '''\
        Connection string for the backend storing tasks.

        If the value is not available (default) it uses as fallback the
        :ref:`data_store <setting-data_store>` value. If still not
        set, it uses the ``%s`` value.
        ''' % DEFAULT_TASK_BACKEND


# ``task_paths`` setting: list of dotted python paths where tasks are located.
# It defines no ``flags``, consistent with the description below which limits
# it to initialization parameters and config files. TaskQueue.get_backend
# forwards the value to the task backend as ``task_paths``.
# NOTE(review): the ``[]`` default is one list object held by the class
# attribute -- confirm pulsar copies it rather than mutating it in place.
class TaskPaths(TaskSetting):
    name = "task_paths"
    validator = pulsar.validate_list
    default = []
    desc = """\
        List of python dotted paths where tasks are located.

        This parameter can only be specified during initialization or in a
        :ref:`config file <setting-config>`.
        """


# ``schedule_periodic`` flag (``--schedule-periodic``), ``False`` by default.
# TaskQueue.actorparams forces it to ``False`` in the configuration handed to
# workers, so that workers only consume tasks and never schedule them.
class SchedulePeriodic(TaskSetting):
    name = 'schedule_periodic'
    flags = ["--schedule-periodic"]
    validator = pulsar.validate_bool
    action = "store_true"
    default = False
    desc = '''\
        Enable scheduling of periodic tasks.

        If enabled, :class:`.PeriodicJob` will produce
        tasks according to their schedule.
        '''


class TaskQueue(pulsar.Application):
    '''A pulsar :class:`.Application` for consuming :class:`.Task`.

    This application can also schedule periodic tasks when the
    :ref:`schedule_periodic <setting-schedule_periodic>` flag is ``True``.
    '''
    backend = None
    '''The :class:`.TaskBackend` for this task queue.

    Available once the :class:`.TaskQueue` has started.
    '''
    name = 'tasks'
    cfg = pulsar.Config(apps=('tasks',), timeout=600)

    @task
    def monitor_start(self, monitor):
        '''Starts running the task queue in ``monitor``.

        It calls the :attr:`.Application.callable` (if available), starts
        the store selected by the backend connection string on the monitor
        event loop and creates the :attr:`~.TaskQueue.backend`.

        This is a generator based coroutine (it yields the result of
        ``start_store``) wrapped by the ``task`` decorator.
        '''
        if self.cfg.callable:
            self.cfg.callable()
        connection_string = self._backend_connection_string()
        store = yield start_store(connection_string, loop=monitor._loop)
        self.get_backend(store)

    def monitor_task(self, monitor):
        '''Override the :meth:`~.Application.monitor_task` callback.

        Check if the :attr:`~.TaskQueue.backend` needs to schedule new tasks:
        when the monitor is running and the backend ``next_run`` time has
        been reached, the backend ``tick`` method is invoked.
        '''
        backend = self.backend
        if backend and monitor.is_running():
            if backend.next_run <= time.time():
                backend.tick()

    def monitor_stopping(self, monitor, exc=None):
        '''Close the :attr:`~.TaskQueue.backend`, if one was created.'''
        # NOTE(review): unlike ``worker_stopping`` the result of ``close()``
        # is not returned here -- confirm whether that is intentional.
        if self.backend:
            self.backend.close()

    def worker_start(self, worker, exc=None):
        '''Start the backend in ``worker`` unless ``exc`` is set.

        The backend is created on demand via :meth:`get_backend`.
        '''
        if not exc:
            self.get_backend().start(worker)

    def worker_stopping(self, worker, exc=None):
        '''Close the :attr:`~.TaskQueue.backend`, if one was created, and
        return the result of its ``close`` method.'''
        if self.backend:
            return self.backend.close()

    def actorparams(self, monitor, params):
        '''Adjust the parameters used to spawn a worker.'''
        # makes sure workers are only consuming tasks, not scheduling.
        cfg = params['cfg']
        cfg.set('schedule_periodic', False)

    def worker_info(self, worker, info=None):
        '''Add a ``tasks`` entry to ``info`` with the backend concurrent
        tasks and the number of processed tasks.

        Nothing is done when there is no backend or no ``info`` container.
        '''
        be = self.backend
        # ``info`` defaults to ``None``: without this guard the item
        # assignment below would raise ``TypeError``.
        if be and info is not None:
            info['tasks'] = {'concurrent': list(be.concurrent_tasks),
                             'processed': be.processed}

    def get_backend(self, store=None):
        '''Return the :attr:`~.TaskQueue.backend`, creating it if needed.

        :param store: optional store to build the backend on. When given,
            its ``dns`` is saved in the ``task_backend`` setting. When
            omitted, a store is created from the backend connection string
            (``task_backend``, then ``data_store``, then the module default).
        :raise: :class:`pulsar.ImproperlyConfigured` if ``task_backends``
            has no entry for the store name.

        The ``store`` argument is ignored once the backend exists.
        '''
        if self.backend is None:
            if store is None:
                # Same fallback chain as ``monitor_start``: the
                # ``task_backend`` setting is empty by default.
                store = create_store(self._backend_connection_string())
            else:
                self.cfg.set('task_backend', store.dns)
            task_backend = task_backends.get(store.name)
            if not task_backend:
                raise pulsar.ImproperlyConfigured(
                    'Task backend for %s not available' % store.name)
            self.backend = task_backend(
                store,
                logger=self.logger,
                name=self.name,
                task_paths=self.cfg.task_paths,
                schedule_periodic=self.cfg.schedule_periodic,
                max_tasks=self.cfg.max_requests,
                backlog=self.cfg.concurrent_tasks)
            self.logger.debug('created %s', self.backend)
        return self.backend

    def _backend_connection_string(self):
        # First non-empty value among the ``task_backend`` setting, the
        # ``data_store`` setting and the module level default.
        return (self.cfg.task_backend or self.cfg.data_store or
                DEFAULT_TASK_BACKEND)
@command()
def next_scheduled(request, jobnames=None):
    '''Remote command returning the result of the ``next_scheduled`` method
    of the task backend, called with ``jobnames``.

    The backend is the one of the application running on the actor which
    received the ``request``.
    '''
    backend = request.actor.app.backend
    return backend.next_scheduled(jobnames)