This document describes cell 0.0. For development docs, go here.
cell.actors¶
cell.actors
-
class
cell.actors.
Actor
(connection=None, id=None, name=None, exchange=None, logger=None, agent=None, outbox_exchange=None, group_exchange=None, **kwargs)¶ -
class
AsyncResult
(ticket, actor)¶ -
Error
¶ alias of
CellError
-
exception
NoReplyError
¶ No reply received within time constraint
-
Actor.AsyncResult.
gather
(propagate=True, **kwargs)¶
-
Actor.AsyncResult.
get
(**kwargs)¶ What kind of arguments should be pass here
-
Actor.AsyncResult.
result
(**kwargs)¶
-
Actor.AsyncResult.
to_python
(reply, propagate=True)¶ Extracts the value out of the reply message.
Parameters: reply – In the case of a successful call the reply message will be:
{'ok': return_value, **default_fields}
Therefore the method returns: return_value, **default_fields
If the method raises an exception the reply message will be:
{'nok': [repr exc, str traceback], **default_fields}
- :keyword propagate - Propagate exceptions raised instead of returning
- a result representation of the error.
-
-
Actor.
Consumer
(channel, **kwargs)¶ Returns a
kombu.Consumer
instance for this Actor
-
Actor.
Error
¶ alias of
CellError
-
exception
Actor.
Next
¶ Used in a gather scenario to signify that no reply should be sent, to give another agent the chance to reply.
-
exception
Actor.
NoReplyError
¶ No reply received within time constraint
-
exception
Actor.
NoRouteError
¶ Presence: No known route for wanted item.
-
exception
Actor.
NotBoundError
¶ Object is not bound to a connection.
-
Actor.
add_binding
(source, routing_key='', inbox_type='direct')¶
-
Actor.
bind
(connection, agent=None)¶
-
Actor.
call
(method, args={}, retry=False, retry_policy=None, ticket=None, **props)¶ Send message to the same actor and return
AsyncResult
.
-
Actor.
call_or_cast
(method, args={}, nowait=False, **kwargs)¶ Apply remote method asynchronously or synchronously depending on the value of nowait.
Parameters: - method – The name of the remote method to perform.
- args – Dictionary of arguments for the method.
- nowait – If false the call will block until the result is available and return it (default), if true the call will be non-blocking and no result will be returned.
- retry – If set to true then message sending will be retried
in the event of connection failures. Default is decided by the
retry
attributed. - retry_policy – Override retry policies.
See
retry_policy
. This must be a dictionary, and keys will be merged with the default retry policy. - timeout – Timeout to wait for replies in seconds as a float (only relevant in blocking mode).
- limit – Limit number of replies to wait for (only relevant in blocking mode).
- callback – If provided, this callback will be called for every reply received (only relevant in blocking mode).
- **props – Additional message properties.
See
kombu.Producer.publish()
.
-
Actor.
cast
(method, args={}, declare=None, retry=None, retry_policy=None, type=None, exchange=None, **props)¶ Send message to actor. Discarding replies.
-
Actor.
construct
()¶ Actor specific initialization.
-
Actor.
construct_state
()¶ Instantiates the state class of this actor.
-
Actor.
consumer
= None¶
-
Actor.
contribute_to_object
(obj, map)¶
-
Actor.
contribute_to_state
(state)¶
-
Actor.
default_fields
= {}¶ Additional fields added to reply messages by default.
-
Actor.
default_receive
(msg_body)¶ Override in the derived classes.
-
Actor.
default_routing_key
= None¶ Default routing key used if no
to
argument passed.
-
Actor.
default_timeout
= 5.0¶ Default timeout in seconds as a float which after we give up waiting for replies.
-
Actor.
delivery_mode
= 'persistent'¶ Delivery mode: persistent or transient. Default is persistent.
-
Actor.
emit
(method, args={}, retry=None)¶
-
Actor.
exchange
= None¶ Default exchange(direct) used for messages to this actor.
-
Actor.
get_binder
(type)¶
-
Actor.
get_direct_exchange
()¶ Returns a :class:’kombu.Exchange’ with type direct
-
Actor.
get_direct_queue
()¶ Returns a :class: kombu.Queue instance to be used to listen for messages send to this specific Actor instance
-
Actor.
get_queues
()¶
-
Actor.
get_reply_queue
(ticket)¶
-
Actor.
get_rr_exchange
()¶ Returns a :class:’kombu.Exchange’ instance with type set to fanout. The exchange is used for sending in a round-robin style
-
Actor.
get_rr_queue
()¶ Returns a :class: kombu.Queue instance for receiving round-robin commands for this actor type.
-
Actor.
get_scatter_exchange
()¶ Returns a :class:’kombu.Exchange’ for type fanout
-
Actor.
get_scatter_queue
()¶ Returns a :class: kombu.Queue instance for receiving broadcast commands for this actor type.
-
Actor.
get_unbinder
(type)¶
-
Actor.
handle_call
(body, message)¶ Handle call message.
-
Actor.
handle_cast
(body, message)¶ Handle cast message.
-
Actor.
idle
= 40¶
-
Actor.
inbox_direct
¶
-
Actor.
inbox_rr
¶
-
Actor.
inbox_scatter
¶
-
Actor.
is_bound
()¶
-
Actor.
lookup_action
(name)¶
-
Actor.
meta
= {}¶
-
Actor.
name
= None¶ Actor name. Defaults to the defined class name.
-
Actor.
no_ack
= False¶ Set to True to disable acks.
-
Actor.
on_agent_ready
()¶
-
Actor.
on_message
(body, message)¶
-
Actor.
outbox
¶
-
Actor.
outbox_exchange
= None¶ Exchange used for forwarding/binding with other actors.
-
Actor.
remove_binding
(source, routing_key='', inbox_type='direct')¶
-
Actor.
reply
(req, body, **props)¶
-
Actor.
reply_exchange
= <unbound Exchange cl.reply(direct)>¶ Exchange used for replies.
-
Actor.
reply_expires
= 100.0¶ Time in seconds as a float which after replies expires.
-
Actor.
retry
= None¶ Should we retry publishing messages by default? Default: NO
-
Actor.
retry_policy
= {'interval_start': 0, 'interval_max': 1, 'max_retries': 100, 'interval_step': 0.2}¶ Default policy used when retrying publishing messages. see
kombu.BrokerConnection.ensure()
for a list of supported keys.
-
Actor.
routing_key
¶
-
Actor.
scatter
(method, args={}, nowait=False, timeout=None, **kwargs)¶ Broadcast method to all agents.
if nowait is False, returns generator to iterate over the results.
Parameters: - limit – Limit number of reads from the queue. Unlimited by default.
- timeout –
- the timeout (in float seconds) waiting for replies.
- Default is
default_timeout
.
Examples
scatter
is a generator (if nowait is False)::>>> res = scatter() >>> res.next() # one event consumed, or timed out.
>>> res = scatter(limit=2): >>> for i in res: # two events consumed or timeout >>> pass
See
call_or_cast()
for a full list of supported arguments.
-
Actor.
send
(method, args={}, to=None, nowait=False, **kwargs)¶ Call method on agent listening to
routing_key
.See
call_or_cast()
for a full list of supported arguments.If the keyword argument nowait is false (default) it will block and return the reply.
j
-
Actor.
serializer
= 'json'¶ Default serializer used to send messages and reply messages.
-
class
Actor.
state
¶ Placeholder class for actor’s supported methods.
-
Actor.
throw
(method, args={}, nowait=False, **kwargs)¶ Call method on one of the agents in round robin.
See
call_or_cast()
for a full list of supported arguments.If the keyword argument nowait is false (default) it will block and return the reply.
-
Actor.
ticket_count
= count(1)¶ returns the next anonymous ticket number used fo+r identifying related logs.
-
Actor.
ttl
= 20¶ time-to-live for the actor before becoming Idle
-
Actor.
type_to_rkey
= {'round-robin': '__rr__', 'scatter': '__scatter__', 'rr': '__rr__'}¶ Map of calling types and their special routing keys.
-
Actor.
types
= ('direct', 'scatter', 'round-robin')¶ List of calling types this actor should handle. Valid types are:
- direct
Send the message directly to an agent by exact routing key.
- round-robin
Send the message to an agent by round-robin.
- scatter
Send the message to all of the agents (broadcast).
-
class