Documentation for pulsar 0.9.2.
For an overview of the pulsar Actor, check out the design documentation.
pulsar.async.actor.spawn(**kwargs)
Spawn a new Actor and return an ActorProxyFuture.
Parameter kwargs
These optional parameters are:
aid
the actor id
name
the actor name
start, stopping and periodic_task
Returns: an ActorProxyFuture.
A typical usage:
>>> def do_something(actor):
...
>>> a = spawn(start=do_something, ...)
>>> a.aid
'ba42b02b'
>>> a.called
True
>>> p = a.result()
>>> p.address
('127.0.0.1', 46691)
pulsar.async.actor.send(target, action, *args, **params)
Send a message to target.
The message is to perform a given action. The actor sending the message is obtained via the get_actor() function.
Parameters: |
|
---|---|
Returns: | an |
Typical example:
>>> r = send(p, 'ping')
>>> r.result()
'pong'
At the core of the library is the Actor class, which defines the primitive of the pulsar concurrent framework. Actor instances communicate with each other via messages in a share-nothing architecture.
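As a rough illustration of message passing in a share-nothing design, the sketch below uses only the standard library asyncio module. MiniActor and send_to are hypothetical names, not pulsar APIs, and unlike a pulsar actor this toy actor runs in the same process and event loop as its caller.

```python
import asyncio


class MiniActor:
    """Toy actor: private state plus a mailbox (not pulsar's Actor)."""

    def __init__(self, name):
        self.name = name
        self.mailbox = asyncio.Queue()   # messages are the only way in
        self.handled = 0                 # private state, never shared

    async def run(self):
        # Process one message at a time until a 'stop' action arrives.
        while True:
            action, args, reply = await self.mailbox.get()
            self.handled += 1
            if action == 'stop':
                reply.set_result('stopped')
                return
            elif action == 'ping':
                reply.set_result('pong')
            elif action == 'echo':
                reply.set_result(args[0])
            else:
                reply.set_exception(ValueError('unknown action %r' % action))


def send_to(target, action, *args):
    """Queue a message for target and return a future for the reply."""
    reply = asyncio.get_running_loop().create_future()
    target.mailbox.put_nowait((action, args, reply))
    return reply


async def main():
    actor = MiniActor('demo')
    runner = asyncio.ensure_future(actor.run())
    pong = await send_to(actor, 'ping')
    echoed = await send_to(actor, 'echo', 'hello there!')
    await send_to(actor, 'stop')
    await runner
    return pong, echoed, actor.handled
```

Running asyncio.run(main()) drives the toy actor through three messages. The caller never touches actor state directly while the actor is running; it only queues messages and awaits replies.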
pulsar.async.actor.Actor(impl)
The base class for parallel execution in pulsar.
In computer science, the Actor model is a mathematical model of concurrent computation that treats actors as the universal primitives of computation. In response to a message that it receives, an actor can make local decisions, create more actors, send more messages, and determine how to respond to the next message received.
The current implementation allows for actors to perform specific tasks such as listening to a socket, acting as an HTTP server, consuming a task queue and so forth.
To spawn a new actor:
>>> from pulsar import spawn
>>> a = spawn()
>>> a.is_alive()
True
Here a is actually a reference to the remote actor: it is an ActorProxy.
ATTRIBUTES
impl
The Concurrency implementation for this Actor.
_loop
An event loop which listens for input/output events on sockets or socket-like objects. It is the driver of the Actor. If the _loop stops, the Actor stops running and goes out of scope.
mailbox
Used to send and receive actor messages.
address
The socket address for this Actor.mailbox.
proxy
Instance of an ActorProxy holding a reference to this Actor. The proxy is a lightweight representation of the actor which can be shared across different processes (i.e. it is picklable).
state
The actor numeric state.
extra
A dictionary which can be populated with extra parameters useful for other actors. This dictionary is included in the dictionary returned by the info() method. Check the info command for how to obtain information about an actor.
info_state
Current state description string. One of initial, running, stopping, closed and terminated.
next_periodic_task
The asyncio.Handle for the next actor periodic task.
monitors
Dictionary of monitors or None
managed_actors
Dictionary of managed actors or None
terminated_actors
Dictionary of terminated actors or None
registered
Dictionary of registered actors or None
start()
Called after forking to start the actor’s life.
This is where logging is configured, the mailbox is registered and the _loop is initialised and started. Calling this method more than once does nothing.
send(*args, **kwargs)
Send a message to target to perform action with given positional args and key-valued kwargs.
Always returns a Future.
stop(exc=None, exit_code=None)
Gracefully stop the Actor.
Implemented by the Concurrency.stop() method of the impl attribute.
actorparams()
Returns a dictionary of parameters for spawning actors.
The dictionary is passed to the spawn method when creating new actors. Fires the on_params actor hook.
is_running()
True if the actor is running, that is when the state is equal to ACTOR_STATES.RUN and the loop is running.
started()
True if the actor has started.
It does not necessarily mean it is running. Its state is greater than or equal to ACTOR_STATES.RUN.
closed()
True if the actor has exited in a clean fashion.
Its state is ACTOR_STATES.CLOSE.
stopped()
True if the actor has exited.
Its state is greater than or equal to ACTOR_STATES.CLOSE.
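The four predicates above are plain comparisons on the numeric state. The following is a minimal sketch of that logic, using the values listed for ACTOR_STATES on this page; StateView is a hypothetical helper written for illustration and is not part of pulsar.

```python
# Numeric values copied from the ACTOR_STATES constant documented on this page.
INITIAL, INACTIVE, STARTING, RUN, STOPPING, CLOSE, TERMINATE = range(7)
DESCRIPTION = {0: 'initial', 1: 'inactive', 2: 'starting', 3: 'running',
               4: 'stopping', 5: 'closed', 6: 'terminated'}


class StateView:
    """Hypothetical helper mirroring the documented state predicates."""

    def __init__(self, state, loop_running=False):
        self.state = state
        self.loop_running = loop_running

    def is_running(self):
        # Running requires both the RUN state and a running loop.
        return self.state == RUN and self.loop_running

    def started(self):
        return self.state >= RUN

    def closed(self):
        return self.state == CLOSE

    def stopped(self):
        return self.state >= CLOSE

    @property
    def info_state(self):
        # Human-readable description of the numeric state.
        return DESCRIPTION[self.state]
```

Under these definitions an actor in the stopping state counts as started but neither running nor stopped, and a terminated actor is stopped without being closed.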
info()
Return a nested dictionary of information related to the actor status and performance. The dictionary contains the following entries:
actor
a dictionary containing information regarding the type of actor and its status.
events
a dictionary of information about the event loop running the actor.
extra
the extra attribute (you can use it to add stuff).
system
system info.
This method is invoked when you run the info command from another actor.
pulsar.async.proxy.ActorProxy(impl)
A proxy for a remote Actor.
This is a lightweight class which delegates function calls to the underlying remote object.
It is picklable and can therefore be sent from actor to actor using actor message passing.
For example, let's say we have a proxy a. To send a message to it:
from pulsar import send
send(a, 'echo', 'hello there!')
will send the command echo to actor a with parameter "hello there!".
address
The socket address of the underlying Actor.mailbox.
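To make the "picklable" property concrete, here is a small sketch of a proxy-like object that carries only plain data (an id and a mailbox address) and therefore survives a pickle round trip. MiniProxy and ship are hypothetical names; pulsar's real ActorProxy holds more than this.

```python
import pickle


class MiniProxy:
    """Hypothetical stand-in for a proxy: plain data only, so it pickles."""

    def __init__(self, aid, address):
        self.aid = aid            # actor id, e.g. 'ba42b02b'
        self.address = address    # (host, port) of the actor mailbox

    def __eq__(self, other):
        return (isinstance(other, MiniProxy)
                and (self.aid, self.address) == (other.aid, other.address))

    def __hash__(self):
        return hash((self.aid, self.address))


def ship(proxy):
    """Serialise and restore, as crossing a process boundary would."""
    return pickle.loads(pickle.dumps(proxy))
```

The restored object is a distinct copy that compares equal to the original, which is what allows a reference to an actor to be handed from one process to another.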
pulsar.async.proxy.ActorProxyFuture(loop=None)
A Future for an ActorProxy.
The future is called back with an ActorProxy once the remote Actor is fully functional.
aid
The remote Actor id.
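The idea of a future that already knows the actor id while the proxy is still pending can be sketched with a small asyncio.Future subclass. ProxyFuture is a hypothetical name, and the address tuple stands in for the proxy that pulsar would deliver.

```python
import asyncio


class ProxyFuture(asyncio.Future):
    """Hypothetical future that exposes the actor id before it resolves."""

    def __init__(self, aid, loop=None):
        super().__init__(loop=loop)
        self.aid = aid


async def main():
    loop = asyncio.get_running_loop()
    fut = ProxyFuture('ba42b02b', loop=loop)
    # Pretend the remote actor finishes its start-up a moment later.
    loop.call_soon(fut.set_result, ('127.0.0.1', 46691))
    aid_before = fut.aid          # available immediately
    address = await fut           # available once the actor is ready
    return aid_before, address
```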
pulsar.async.proxy.ActorProxyMonitor(impl)
A specialised ActorProxy class.
It contains additional information about the remote underlying Actor. Instances of this class serialise into ActorProxy.
The ActorProxyMonitor is special since it lives in the Arbiter domain and it is used by the Arbiter (or a Monitor) to monitor the state of the spawned actor.
impl
The Concurrency instance for the remote Actor. This dictionary is constantly updated by the remote actor by sending the info message.
mailbox
This is the connection with the remote actor. It is available once the actor handshake between the actor and the monitor has completed. The mailbox is a server-side MailboxProtocol instance and it is used by the send() function to send messages to the remote actor.
notified
Last time this ActorProxyMonitor was notified by the remote actor.
proxy
The ActorProxy for this monitor.
Actors communicate with each other via the send() function, which uses the Actor.mailbox attribute of the actor in the current context. When an actor communicates with another remote actor it does so by sending an action to it with positional and/or key-valued arguments. For example:
send(target, 'ping')
will send the ping action to target from the actor in the current context of execution. The above is equivalent to:
get_actor().send(target, 'ping')
Each action is implemented via the command() decorator implemented in the pulsar.async.commands module.
A list of standard commands is available in the design documentation.
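One common way to map action names to handlers is a registry filled by a decorator. The sketch below shows that general technique only: register_action, ACTIONS and dispatch are hypothetical names, and the signature of pulsar's own command() decorator is not reproduced here.

```python
ACTIONS = {}   # hypothetical registry: action name -> handler


def register_action(func):
    """Register func as the handler for the action named after it."""
    ACTIONS[func.__name__] = func
    return func


@register_action
def ping(request):
    return 'pong'


@register_action
def echo(request, message):
    return message


def dispatch(action, *args, **kwargs):
    """Look up the handler for action and call it with the arguments."""
    try:
        handler = ACTIONS[action]
    except KeyError:
        raise ValueError('unknown action: %s' % action) from None
    request = {'action': action}   # placeholder for per-message context
    return handler(request, *args, **kwargs)
```

With this in place, dispatch('ping') plays the role of the receiving side of send(target, 'ping'): the action string selects the function, and the remaining arguments are forwarded to it.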
The Concurrency class implements the behaviour of an Actor and therefore allows for decoupling between the Actor abstraction and its implementation (bridge pattern).
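The bridge pattern mentioned here can be illustrated with a few lines: the abstraction holds an impl object and forwards to it, so implementations can vary without changing the abstraction. All class names below are hypothetical and the bodies are deliberately trivial; they are not pulsar's Concurrency classes.

```python
class ThreadImpl:
    """Hypothetical concurrency implementation."""
    kind = 'thread'

    def stop(self, actor, exc=None, exit_code=None):
        # A real implementation would shut down a thread or process here.
        actor.state = 'closed'
        return exit_code or 0


class ProcessImpl(ThreadImpl):
    """Second hypothetical implementation, interchangeable with the first."""
    kind = 'process'


class BridgedActor:
    """Abstraction side of the bridge: knows nothing about threads."""

    def __init__(self, impl):
        self.impl = impl
        self.state = 'running'

    def stop(self, exc=None, exit_code=None):
        # Forward to whichever implementation this actor was given.
        return self.impl.stop(self, exc=exc, exit_code=exit_code)
```

This mirrors the description of Actor.stop() above, which is documented as being implemented by the stop() method of the impl attribute.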
pulsar.async.concurrency.Concurrency
Actor Concurrency.
Responsible for the actual spawning of actors according to a concurrency implementation. Instances are picklable and are shared between the Actor and its ActorProxyMonitor.
This is an abstract class; derived classes must implement the start method.
Parameters: |
|
---|
actor_class
alias of Actor
selector()
Return a selector instance.
By default it returns nothing so that the best handler for the system is chosen.
hand_shake(actor)
Perform the hand shake for actor.
The hand shake occurs when the actor is in starting state. It performs the following actions:
actor as the actor of the current thread
start event
start event
If the hand shake is successful, the actor will eventually reach a running state.
periodic_task(actor, **kw)
Implement the actor periodic task.
This is an internal method called periodically by the Actor._loop to ping the actor monitor.
If successful, return a Future called back with the acknowledgement from the monitor.
pulsar.async.concurrency.MonitorConcurrency
Concurrency class for a Monitor.
Monitors live in the main thread of the master process and therefore do not need to be spawned.
periodic_task(monitor, **kw)
Override the Concurrency.periodic_task() to implement the Monitor periodic task.
pulsar.async.concurrency.ArbiterConcurrency
Concurrency implementation for the arbiter.
add_monitor(actor, monitor_name, **params)
Add a new monitor.
Parameters: |
|
---|---|
Returns: | the |
create_mailbox(actor, loop)
Override Concurrency.create_mailbox() to create the mailbox server.
periodic_task(actor, **kw)
Override the Concurrency.periodic_task() to implement the Arbiter periodic task.
Constants used throughout pulsar.
pulsar.async.consts.ACTOR_STATES = {'RUN': 3, 'DESCRIPTION': {0: 'initial', 1: 'inactive', 2: 'starting', 3: 'running', 4: 'stopping', 5: 'closed', 6: 'terminated'}, 'INITIAL': 0, 'TERMINATE': 6, 'INACTIVE': 1, 'STOPPING': 4, 'CLOSE': 5, 'STARTING': 2}
Actor state constants are accessed via:
from pulsar import ACTOR_STATES
They are:
ACTOR_STATES.INITIAL = 0
when an actor is just created, before the pulsar.Actor.start method is called.
ACTOR_STATES.STARTING = 2
when the pulsar.Actor.start method is called.
ACTOR_STATES.RUN = 3
when pulsar.Actor._loop is up and running.
ACTOR_STATES.STOPPING = 4
when pulsar.Actor.stop has been called for the first time and the actor is running.
pulsar.async.consts.ACTOR_ACTION_TIMEOUT = 5
Important constant used by pulsar.Monitor to kill actors which don’t respond to the stop command.
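The stop-then-kill behaviour can be pictured as waiting for a reply with a deadline. The sketch below uses asyncio.wait_for for that purpose; it is an illustration of the general technique and not pulsar's monitor code. stop_or_kill is a hypothetical helper, the unit of the constant is assumed to be seconds, and a much shorter value is used so the example runs quickly.

```python
import asyncio

ACTION_TIMEOUT = 0.05   # stand-in for ACTOR_ACTION_TIMEOUT (assumed seconds)


async def stop_or_kill(stop_reply, kill, timeout=ACTION_TIMEOUT):
    """Wait for the reply to 'stop'; call kill() if it does not arrive."""
    try:
        await asyncio.wait_for(stop_reply, timeout)
        return 'stopped'
    except asyncio.TimeoutError:
        kill()
        return 'killed'


async def main():
    loop = asyncio.get_running_loop()
    killed = []

    polite = loop.create_future()
    loop.call_soon(polite.set_result, None)      # replies promptly
    first = await stop_or_kill(polite, lambda: killed.append('polite'))

    stuck = loop.create_future()                 # never replies
    second = await stop_or_kill(stuck, lambda: killed.append('stuck'))
    return first, second, killed
```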
pulsar.async.consts.MONITOR_TASK_PERIOD = 1
Interval for the pulsar.Monitor and pulsar.Arbiter periodic task.
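A periodic task on an event loop is typically a callback that reschedules itself, keeping the handle returned by loop.call_later so that the next run can be cancelled. The sketch below shows that pattern with the standard library only. Periodic is a hypothetical helper, and the period is shortened from the documented value of 1 so the example finishes quickly.

```python
import asyncio


class Periodic:
    """Hypothetical helper that reruns `task` every `period` seconds."""

    def __init__(self, loop, period, task):
        self.loop = loop
        self.period = period
        self.task = task
        self.next_handle = None   # handle for the next scheduled run

    def start(self):
        self.next_handle = self.loop.call_later(self.period, self._run)

    def _run(self):
        self.task()
        self.start()              # schedule the following run

    def cancel(self):
        if self.next_handle is not None:
            self.next_handle.cancel()
            self.next_handle = None


async def main():
    ticks = []
    periodic = Periodic(asyncio.get_running_loop(), 0.01,
                        lambda: ticks.append(len(ticks)))
    periodic.start()
    while len(ticks) < 3:
        await asyncio.sleep(0.005)
    periodic.cancel()
    seen = len(ticks)
    await asyncio.sleep(0.03)     # no further runs after cancel()
    return seen, len(ticks)
```

The stored handle plays the same role as the next_periodic_task attribute described earlier, which is documented as the asyncio.Handle for the next actor periodic task.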