Documentation for pulsar 0.9.2.
Pulsar ships with an asynchronous TaskQueue for distributing work across threads, processes or machines. It is highly customisable: it can run in multi-threading or multi-processing (default) mode and can share Task instances across several machines.
By creating Job classes in much the same way as you would for celery, this application gives you everything you need to run them with very little setup effort:
from pulsar.apps import tasks

if __name__ == '__main__':
    # task_paths lists the modules from which Job classes are collected
    tasks.TaskQueue(task_paths=['path.to.tasks.*']).start()
Check the task queue tutorial for a running example with simple tasks.
To get started, follow these guidelines:
manage.py.
A TaskQueue accepts several configuration parameters on top of the standard application settings:
The task_paths parameter specifies a list of Python paths from which to collect Job classes:
task_paths = ['myjobs','another.moduledir.*']
The * at the end of the second path indicates that Job classes are collected from all submodules of another.moduledir.
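How a trailing .* might be expanded can be sketched with the standard library. expand_task_paths below is a hypothetical helper, not part of pulsar; it assumes the wildcard means "every immediate submodule of the package" and lists them with pkgutil, whereas pulsar's own collector may also descend into nested packages or filter modules differently.

```python
import importlib
import pkgutil


def expand_task_paths(task_paths):
    """Expand 'package.*' entries into the dotted names of their submodules.

    Hypothetical helper: entries without a trailing '.*' are returned
    unchanged (and not imported); wildcard entries are imported and their
    immediate submodules are listed with pkgutil.
    """
    modules = []
    for path in task_paths:
        if path.endswith('.*'):
            package_name = path[:-2]
            package = importlib.import_module(package_name)
            # Plain modules have no __path__ and therefore no submodules.
            search_path = getattr(package, '__path__', [])
            for info in pkgutil.iter_modules(search_path):
                modules.append(package_name + '.' + info.name)
        else:
            modules.append(path)
    return modules
```

For example, expand_task_paths(['myjobs', 'json.*']) keeps 'myjobs' unchanged and expands the second entry into names such as 'json.decoder' and 'json.encoder'.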
The schedule_periodic flag indicates whether the TaskQueue can schedule PeriodicJob tasks. Usually, only one running TaskQueue application is responsible for scheduling tasks.
It can be specified on the command line via the --schedule-periodic flag.
Default: False.
The task_backend parameter is a URL-type string which specifies the task backend to use.
It can be specified on the command line via the --task-backend ... option.
Default: local://.
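The URL format can be illustrated with urllib.parse: the scheme selects the backend and the remaining parts carry connection details and options. This is a sketch assuming standard scheme://host:port?key=value syntax; parse_backend_url is a hypothetical helper, and pulsar performs its own parsing internally.

```python
from urllib.parse import urlsplit, parse_qs


def parse_backend_url(url):
    """Split a task backend URL into its parts (illustrative sketch).

    The scheme names the backend (e.g. 'local' or 'redis'); host, port and
    query-string options are whatever the chosen backend needs.
    """
    parts = urlsplit(url)
    # parse_qs returns a list per key; keep the last value for each option.
    options = {key: values[-1] for key, values in parse_qs(parts.query).items()}
    return {
        'backend': parts.scheme,
        'host': parts.hostname,
        'port': parts.port,
        'options': options,
    }
```

parse_backend_url('local://') yields the local backend with no host, port or options, while parse_backend_url('redis://127.0.0.1:6379?poll_timeout=3') yields host '127.0.0.1', port 6379 and the option poll_timeout='3' (still a string at this stage).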
The concurrent_tasks parameter controls the maximum number of concurrent tasks for a given task worker. This parameter is important when tasks are asynchronous, that is when they perform some sort of I/O and the job callable returns an asynchronous component.
It can be specified on the command line via the --concurrent-tasks ... option.
Default: 5.
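The effect of this cap on asynchronous jobs can be pictured with asyncio. The sketch below is an analogy rather than pulsar's scheduler: run_tasks and make_io_job are hypothetical names, an asyncio.Semaphore bounds how many I/O-bound job coroutines a single worker has in flight, and the default of 5 mirrors concurrent_tasks.

```python
import asyncio


async def run_tasks(jobs, concurrent_tasks=5):
    """Run job coroutines with at most `concurrent_tasks` of them in flight.

    Returns the results in input order and the peak concurrency observed.
    """
    semaphore = asyncio.Semaphore(concurrent_tasks)
    in_flight = 0
    peak = 0

    async def guarded(job):
        nonlocal in_flight, peak
        async with semaphore:  # waits while the worker is at capacity
            in_flight += 1
            peak = max(peak, in_flight)
            try:
                return await job()
            finally:
                in_flight -= 1

    results = await asyncio.gather(*(guarded(job) for job in jobs))
    return results, peak


def make_io_job(value):
    """Build a stand-in job that 'performs I/O' by sleeping briefly."""
    async def job():
        await asyncio.sleep(0.01)  # yields control, like a network call
        return value * 2
    return job
```

Running twelve such jobs with asyncio.run(run_tasks([make_io_job(i) for i in range(12)])) returns all twelve results in order, and the reported peak never exceeds five because the remaining jobs wait on the semaphore while the others are suspended in I/O.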
A task queue application implements several Job classes which specify the way a Task is run. Each Job class is a Task factory; therefore, a Task is always associated with one Job, which can be of two types: regular (Job) or periodic (PeriodicJob), a generator of scheduled tasks.
Defining a job is simple: subclass Job and implement the job callable method:
from pulsar.apps import tasks

class Addition(tasks.Job):

    def __call__(self, consumer, a=0, b=0):
        "Add two numbers"
        return a + b
The consumer, an instance of TaskConsumer, is passed by the task backend and should always be the first positional parameter of the callable method. The remaining parameters (optional and keyword-valued only!) are those needed by your job implementation.
A job callable can also return a coroutine if it needs to perform asynchronous I/O during its execution:
class Crawler(tasks.Job):

    def __call__(self, consumer, sample=100, size=10):
        # asynchronous I/O: ``http`` is an HTTP client created elsewhere
        response = yield http.request(...)
        content = response.content
        ...
This allows for cooperative task execution.
The can_overlap attribute controls the way tasks are generated by a specific Job. By default, a Job creates a new task every time the TaskBackend requests it. However, when the can_overlap attribute is set to False, a new task cannot be started until a previous task of the same job is done.
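The overlap rule amounts to a per-job count of running tasks. A minimal sketch under that assumption: OverlapGuard, try_start and finish are hypothetical names, and accepting either a boolean or a callable for can_overlap follows the can_overlap attribute described in the Job reference.

```python
from collections import Counter


class OverlapGuard:
    """Refuse a new task for a job while a previous one is still running,
    unless the job allows overlapping tasks (hypothetical sketch)."""

    def __init__(self):
        self.running = Counter()  # job name -> number of tasks in progress

    def try_start(self, job_name, can_overlap=True, **kwargs):
        # can_overlap may be a plain bool or a callable receiving the task kwargs
        allowed = can_overlap(**kwargs) if callable(can_overlap) else can_overlap
        if not allowed and self.running[job_name]:
            return False  # a previous task of the same job is not done yet
        self.running[job_name] += 1
        return True

    def finish(self, job_name):
        if self.running[job_name]:
            self.running[job_name] -= 1
```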
The TaskBackend is at the heart of the task queue application. It exposes all the functionality for running new tasks, scheduling periodic tasks and retrieving task information. Pulsar ships with two backends: one which uses pulsar internals and stores tasks in the arbiter domain, and another which stores tasks in redis.
The backend is created by the TaskQueue as soon as it starts. It is then passed to all task queue workers which, in turn, invoke the TaskBackend.start method to start pulling tasks from the distributed task queue.
A Task can have one of the following statuses:
QUEUED = 6: a task queued but not yet executed.
STARTED = 5: a task whose execution has started.
RETRY = 4: a task which is retrying its calculation.
REVOKED = 3: the task execution has been revoked (or timed out).
FAILURE = 2: the task execution has finished with failure.
SUCCESS = 1: the task execution has finished with success.
FULL_RUN_STATES: the set of states for which a Task has run, FAILURE and SUCCESS.
READY_STATES: the set of states for which a Task has finished, REVOKED, FAILURE and SUCCESS.
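The two sets and the done() check described for Task can be expressed directly in Python. Packaging the constants as an IntEnum is a choice made here for illustration, not necessarily how pulsar defines them; note that the ready states are exactly the codes less than or equal to REVOKED.

```python
from enum import IntEnum


class Status(IntEnum):
    """Status codes with the values listed above (IntEnum is illustrative)."""
    SUCCESS = 1
    FAILURE = 2
    REVOKED = 3
    RETRY = 4
    STARTED = 5
    QUEUED = 6


FULL_RUN_STATES = frozenset({Status.FAILURE, Status.SUCCESS})
READY_STATES = frozenset({Status.REVOKED, Status.FAILURE, Status.SUCCESS})


def done(status):
    """Mirror of Task.done(): True once the status is one of READY_STATES."""
    return status in READY_STATES
```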
A TaskBackend broadcasts Task state into three different channels via a pubsub() handler.
When creating a new TaskBackend, there are three methods which must be implemented:
get_task(), invoked when retrieving a Task from the backend server;
maybe_queue_task(), invoked when a new Task is created and ready to be queued;
finish_task(), invoked when a Task reaches a ready state.
For example:
from pulsar.apps import tasks

class TaskBackend(tasks.TaskBackend):
    ...
Once the custom task backend is implemented, it must be registered:
tasks.task_backends['mybackend'] = TaskBackend
The backend is then selected via:
--task-backend mybackend://host:port
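The contract can be sketched without pulsar installed. BaseTaskBackend, InMemoryBackend and create_backend are hypothetical names: the three abstract methods mirror the hooks listed above (the single task argument of maybe_queue_task is an assumption), the dictionary stands in for tasks.task_backends, and the URL scheme picks the class in the same spirit as --task-backend.

```python
from abc import ABC, abstractmethod
from urllib.parse import urlsplit


class BaseTaskBackend(ABC):
    """Hypothetical stand-in for tasks.TaskBackend with the three required hooks."""

    def __init__(self, url):
        self.url = url

    @abstractmethod
    def get_task(self, task_id=None):
        """Retrieve a task from the backend server."""

    @abstractmethod
    def maybe_queue_task(self, task):
        """Queue a newly created task that is ready to be queued."""

    @abstractmethod
    def finish_task(self, task_id, lock_id):
        """Handle a task that reached a ready state."""


task_backends = {}  # URL scheme -> backend class


class InMemoryBackend(BaseTaskBackend):
    """Toy backend keeping tasks in a dict (illustration only)."""

    def __init__(self, url):
        super().__init__(url)
        self.tasks = {}

    def get_task(self, task_id=None):
        return self.tasks.get(task_id)

    def maybe_queue_task(self, task):
        self.tasks[task['id']] = task
        return task['id']

    def finish_task(self, task_id, lock_id):
        self.tasks[task_id]['finished'] = True


task_backends['mybackend'] = InMemoryBackend


def create_backend(url):
    """Instantiate the backend class registered for the URL scheme."""
    scheme = urlsplit(url).scheme
    backend_class = task_backends.get(scheme)
    if backend_class is None:
        raise ValueError('unknown task backend %r' % scheme)
    return backend_class(url)
```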
List of all classes used by this application.
pulsar.apps.tasks.TaskQueue(callable=None, load_config=True, **params)
A pulsar Application for consuming Task instances. This application can also schedule periodic tasks when the schedule_periodic flag is True.
backend = None
The TaskBackend for this task queue. Available once the TaskQueue has started.
monitor_start(*args, **kwargs)
Starts running the task queue in the monitor. It calls the Application.callable (if available) and creates the backend.
monitor_task(monitor)
Overrides the monitor_task() callback to check whether the backend needs to schedule new tasks.
pulsar.apps.tasks.models.Job
The Job class which is used in a distributed task queue.
name
The unique name which defines the Job and which can be used to retrieve it from the job registry. This attribute is set to the Job class name in lower case by default, unless a name class attribute is defined.
abstract
If set to True (default is False), the Job won’t be registered with the JobRegistry. Useful when creating a new base class for several other jobs.
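The default naming and the abstract flag can be reproduced in plain Python. This sketch uses __init_subclass__, which may differ from the mechanism pulsar uses, and a plain dict subclass as a stand-in for JobRegistry; it reads abstract from each class's own namespace so that subclasses of an abstract base are registered, which is an assumption.

```python
class JobRegistry(dict):
    """Maps job names to Job classes (illustrative stand-in)."""


registry = JobRegistry()


class Job:
    name = None       # defaults to the lower-cased class name
    abstract = False  # abstract classes are kept out of the registry

    def __init_subclass__(cls, **kwargs):
        super().__init_subclass__(**kwargs)
        # Only an `abstract = True` declared on this very class counts.
        if cls.__dict__.get('abstract', False):
            return
        if not cls.__dict__.get('name'):
            cls.name = cls.__name__.lower()
        registry[cls.name] = cls


class Addition(Job):
    def __call__(self, consumer, a=0, b=0):
        return a + b


class BaseCrawler(Job):
    abstract = True  # shared base class, not registered


class Crawler(BaseCrawler):
    name = 'web-crawler'  # explicit name overrides the default
```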
type
Type of Job, one of regular and periodic.
timeout
An instance of datetime.timedelta or None. If set, it represents the time lag after which a task which did not start expires.
Default: None.
can_overlap
Boolean indicating whether this job can generate overlapping tasks. It can also be a callable which accepts the same input parameters as the job callable function.
Default: True.
doc_syntax
The docstring syntax.
Default: markdown
logger
An instance of a logger, created at runtime.
queue_task(consumer, jobname, meta_params=None, **kwargs)
Queue a new task in the task queue. This utility method can be used from within the job callable method and allows tasks to act as task factories.
This method invokes the TaskBackend.queue_task() method with the additional from_task argument equal to the id of the task invoking the method.
pulsar.apps.tasks.models.PeriodicJob(run_every=None)
A periodic Job implementation.
anchor = None
If specified, it must be a datetime instance. It controls when the periodic Job is run.
is_due(last_run_at)
Returns a tuple of two items (is_due, next_time_to_run), where the next time to run is in seconds. For example:
(True, 20) means the job should be run now, and the next time to run is in 20 seconds.
(False, 12) means the job should be run in 12 seconds.
You can override this to decide the interval at runtime.
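The contract of is_due can be checked with a standalone function. It takes the interval and the current time as extra arguments (a hypothetical signature; the real method receives only last_run_at) and treats a job as due once run_every has elapsed since the last run.

```python
from datetime import datetime, timedelta


def is_due(last_run_at, run_every, now=None):
    """Return (is_due, next_time_to_run), the second item in seconds.

    Hypothetical standalone version: the job is due once `run_every` has
    elapsed since `last_run_at`; the next run is then a full interval away.
    """
    if now is None:
        now = datetime.now()
    remaining = (last_run_at + run_every - now).total_seconds()
    if remaining <= 0:
        return True, run_every.total_seconds()
    return False, remaining
```

With a 20-second interval, calling it 25 seconds after the last run gives (True, 20.0), and calling it 8 seconds after gives (False, 12.0), matching the two examples above.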
pulsar.apps.tasks.backend.Task(*args, **kwargs)
A data Model containing task execution data.
id = None
Task unique identifier.
time_ended = None
The timestamp indicating when this task finished.
expiry = None
The timestamp indicating when this task expires. If the task is not started before this value, it is REVOKED.
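Combining this with Job.timeout gives a simple expiry rule. The sketch assumes the timeout lag is measured from the moment the task is queued and uses epoch seconds for timestamps; compute_expiry and status_at_start are hypothetical helpers, and returning status names as strings is an illustrative simplification.

```python
import time
from datetime import timedelta  # Job.timeout values are timedelta instances


def compute_expiry(queued_at, timeout):
    """Expiry timestamp (epoch seconds) from the queue time and Job.timeout."""
    if timeout is None:
        return None  # no timeout: the task never expires
    return queued_at + timeout.total_seconds()


def status_at_start(expiry, now=None):
    """Status a worker would record when it picks the task up (sketch)."""
    if now is None:
        now = time.time()
    if expiry is not None and now > expiry:
        return 'REVOKED'  # not started before its expiry
    return 'STARTED'
```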
status = None
Flag indicating the task status.
kwargs = None
Key-valued parameters used by this task.
result = None
Result as a JSON object.
done()
Return True if the Task has finished. Its status is one of READY_STATES.
pulsar.apps.tasks.backend.TaskBackend(store, logger=None, task_paths=None, schedule_periodic=False, backlog=1, max_tasks=0, name=None, poll_timeout=None)
A backend class for running Task. A TaskBackend is responsible for creating tasks and putting them into the distributed queue. It also schedules the run of periodic tasks if enabled to do so.
task_paths
List of paths from which to load jobs, which are factories of tasks. Passed by the task-queue application task paths setting.
schedule_periodic
True if this TaskBackend can schedule periodic tasks. Passed by the task-queue application schedule-periodic setting.
backlog
The maximum number of concurrent tasks running on a task-queue for an Actor. A number in the order of 5 to 10 is normally used. Passed by the task-queue application concurrent tasks setting.
max_tasks
The maximum number of tasks a worker will process before restarting. Passed by the task-queue application max requests setting.
poll_timeout
The (asynchronous) timeout for polling tasks from the task queue. It is always a positive number and it can be specified via the backend connection string:
local://?poll_timeout=3
There shouldn’t be any reason to modify the default value.
Default: 2.
processed
The number of tasks processed (so far) by the worker running this backend. This value is important in connection with the max_tasks attribute.
num_concurrent_tasks
The number of tasks currently running concurrently. This number is never greater than the backlog attribute.
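How backlog, max_tasks, processed and num_concurrent_tasks interact can be sketched as a small counter object. WorkerCounters is hypothetical; it assumes max_tasks=0 means "no limit" (the constructor default above) and counts tasks still in flight towards max_tasks, so that a worker never starts more tasks than it may process before restarting.

```python
from dataclasses import dataclass


@dataclass
class WorkerCounters:
    """Counters a task worker might consult before pulling another task."""
    backlog: int = 5               # cap on concurrent tasks
    max_tasks: int = 0             # assumed: 0 means no limit
    processed: int = 0             # tasks finished so far
    num_concurrent_tasks: int = 0  # tasks currently running

    def can_pull(self):
        """True if the worker may start one more task right now."""
        if self.max_tasks and self.processed + self.num_concurrent_tasks >= self.max_tasks:
            return False  # enough tasks started; let them finish, then restart
        return self.num_concurrent_tasks < self.backlog

    def start(self):
        self.num_concurrent_tasks += 1

    def finish(self):
        self.num_concurrent_tasks -= 1
        self.processed += 1
```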
registry
The JobRegistry for this backend.
channel(name)
Given an event name, returns the corresponding channel name. The event name is one of task_queued, task_started or task_done.
queue_task(*args, **kwargs)
Try to queue a new Task. This method returns a Future which results in the task id created. If jobname is not a valid Job.name, a TaskNotAvailable exception occurs.
wait_for_task(task_id, timeout=None)
Asynchronously wait for a task with task_id to finish its execution.
get_task(task_id=None)
Asynchronously retrieve a Task from a task_id.
Parameters: task_id – the id of the task to retrieve.
Returns: a Task or None.
finish_task(task_id, lock_id)
Invoked at the end of task execution. The Task with task_id has been executed (either successfully or not) or has been revoked. This method performs backend-specific operations.
Must be implemented by subclasses.
close()
Close this TaskBackend. Invoked by the Actor when stopping.
generate_task_ids(job, kwargs)
An internal method to generate task unique identifiers.
Returns: a two-element tuple containing the unique id and an identifier for overlapping tasks if the ...
Called by the TaskBackend when creating a new task.
pulsar.apps.tasks.backend.TaskConsumer(backend, worker, task_id, job)
A context manager for consuming tasks. Instances of this consumer are created by the TaskBackend when a task is executed.
backend
The TaskBackend. This is useful when creating tasks from within a job callable.
pulsar.apps.tasks.backend.SchedulerEntry(name, run_every, anchor=None)
A class used as a schedule entry by the TaskBackend.
name
Task name.
run_every
Interval in seconds.
anchor
Datetime anchor.
last_run_at
Last run datetime.
total_run_count
Total number of times this periodic task has been executed by the TaskBackend.
scheduled_last_run_at
The scheduled last run datetime. This is different from last_run_at only when anchor is set.
next(now=None)
Increase the total_run_count attribute by one and set the value of last_run_at to now.
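The bookkeeping performed by next() is small enough to write out. This dataclass mirrors the attributes listed above but is only a sketch: it omits scheduled_last_run_at because how anchor shifts it is not specified here, and the datetime.now() fallback for a missing now is an assumption.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Optional


@dataclass
class SchedulerEntry:
    """Plain-Python sketch of a periodic schedule entry."""
    name: str
    run_every: float                    # interval in seconds
    anchor: Optional[datetime] = None
    last_run_at: Optional[datetime] = None
    total_run_count: int = 0

    def next(self, now=None):
        """Record a run: bump the counter and set last_run_at to now."""
        self.total_run_count += 1
        self.last_run_at = now if now is not None else datetime.now()
```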
The TaskQueue application does not expose an external API to run new tasks or retrieve task information. The TaskQueueRpcMixin class can be used to achieve just that. It is a JSONRPC handler which exposes six functions for executing tasks and retrieving task information.
The task-queue example shows how to use this class in the context of a WSGI server running alongside the task-queue application.
pulsar.apps.tasks.rpc.TaskQueueRpcMixin(taskqueue, **kwargs)
A JSONRPC mixin for communicating with a TaskQueue. To use it, you need to have an RPC application and a task queue application installed in the Arbiter.
Parameters: taskqueue – instance or name of the TaskQueue application which exposes the remote procedure calls.
rpc_job_list(*args, **kwargs)
Return the list of Jobs registered with the task queue, with meta information. If a list of jobnames is given, it returns only the jobs included in the list.
rpc_queue_task(*args, **kwargs)
Queue a new task for jobname in the task queue. The task can be of any type as long as it is registered in the task queue registry. To check the available tasks, call the rpc_job_list() function.
It returns the task id.
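A client calls these functions with ordinary JSON-RPC requests. The helper below builds a JSON-RPC 2.0 request body; the method name queue_task (rpc_queue_task without its prefix) and passing jobname and the task arguments as named params are assumptions, so check the task-queue example for the exact names your server exposes.

```python
import itertools
import json

_request_ids = itertools.count(1)  # simple source of unique request ids


def jsonrpc_request(method, **params):
    """Build a JSON-RPC 2.0 request body with named parameters."""
    return json.dumps({
        'jsonrpc': '2.0',
        'id': next(_request_ids),
        'method': method,
        'params': params,
    })
```

For instance, jsonrpc_request('queue_task', jobname='addition', a=2, b=3) produces a body that could be POSTed to the RPC endpoint to queue the Addition job.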
rpc_wait_for_task(*args, **kwargs)
Wait for a task to finish.
Returns: the JSON representation of the task once it has finished.
task_request_parameters(request)
Internal function which returns a dictionary of parameters to be passed to the Task class constructor.
This function can be overridden to add information about the type of request, who made the request and so forth. It must return a dictionary. By default it returns an empty dictionary.