This document describes cell 0.0. For development docs, go here.

cell.actors

class cell.actors.Actor(connection=None, id=None, name=None, exchange=None, logger=None, agent=None, outbox_exchange=None, group_exchange=None, **kwargs)
class AsyncResult(ticket, actor)
Error

alias of CellError

exception NoReplyError

No reply received within time constraint

Actor.AsyncResult.gather(propagate=True, **kwargs)
Actor.AsyncResult.get(**kwargs)

What kind of arguments should be passed here?

Actor.AsyncResult.result(**kwargs)
Actor.AsyncResult.to_python(reply, propagate=True)

Extracts the value out of the reply message.

Parameters:
  • reply – The reply message. In the case of a successful call it will be:

    {'ok': return_value, **default_fields}

    Therefore the method returns: return_value, **default_fields

    If the method raises an exception it will be:

    {'nok': [repr exc, str traceback], **default_fields}
  • propagate – Propagate exceptions raised instead of returning a result representation of the error.
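The reply handling described above can be sketched in a few lines. This is a standalone illustration, not the library's code: `ReplyError` and `make_reply` are hypothetical names introduced here, and the sketch assumes that only the value stored under 'ok' is returned.

```python
import traceback


class ReplyError(Exception):
    """Hypothetical stand-in for the actor's Error class (CellError)."""


def make_reply(func, default_fields=None, **args):
    """Build a reply dict in the documented 'ok' / 'nok' shape."""
    fields = dict(default_fields or {})
    try:
        return dict(fields, ok=func(**args))
    except Exception as exc:
        # 'nok' carries the repr of the exception and the traceback text.
        return dict(fields, nok=[repr(exc), traceback.format_exc()])


def to_python(reply, propagate=True):
    """Return the value of a successful reply, or raise/return the error."""
    if 'ok' in reply:
        return reply['ok']
    error = ReplyError(*reply.get('nok') or ())
    if propagate:
        raise error
    return error


good = make_reply(lambda a, b: a + b, a=1, b=2)
bad = make_reply(lambda: 1 / 0)
```

With `propagate=False`, the caller gets the error object back and can inspect `error.args` (the repr and the traceback text) without a try/except block.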
Actor.Consumer(channel, **kwargs)

Returns a kombu.Consumer instance for this Actor

Actor.Error

alias of CellError

exception Actor.Next

Used in a gather scenario to signify that no reply should be sent, to give another agent the chance to reply.

exception Actor.NoReplyError

No reply received within time constraint

exception Actor.NoRouteError

Presence: No known route for wanted item.

exception Actor.NotBoundError

Object is not bound to a connection.

Actor.add_binding(source, routing_key='', inbox_type='direct')
Actor.bind(connection, agent=None)
Actor.call(method, args={}, retry=False, retry_policy=None, ticket=None, **props)

Send message to the same actor and return AsyncResult.

Actor.call_or_cast(method, args={}, nowait=False, **kwargs)

Apply remote method asynchronously or synchronously depending on the value of nowait.

Parameters:
  • method – The name of the remote method to perform.
  • args – Dictionary of arguments for the method.
  • nowait – If false (default), the call blocks until the result is available and returns it. If true, the call is non-blocking and no result is returned.
  • retry – If set to true, message sending will be retried in the event of connection failures. The default is decided by the retry attribute.
  • retry_policy – Override retry policies. See retry_policy. This must be a dictionary; its keys will be merged with the default retry policy.
  • timeout – Timeout to wait for replies in seconds as a float (only relevant in blocking mode).
  • limit – Limit number of replies to wait for (only relevant in blocking mode).
  • callback – If provided, this callback will be called for every reply received (only relevant in blocking mode).
  • **props – Additional message properties. See kombu.Producer.publish().
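As an illustration of the retry_policy parameter, the merge with the default policy can be pictured as a plain dictionary update. The helper name `merged_retry_policy` is hypothetical; the default values are the ones listed for Actor.retry_policy, and the exact merge the library performs is assumed, not confirmed.

```python
# Default values as documented for Actor.retry_policy.
DEFAULT_RETRY_POLICY = {
    'interval_start': 0,
    'interval_max': 1,
    'max_retries': 100,
    'interval_step': 0.2,
}


def merged_retry_policy(override=None, default=DEFAULT_RETRY_POLICY):
    """Return a new dict: the default policy with override keys on top."""
    return dict(default, **(override or {}))


# Only max_retries is overridden; the other keys keep their defaults.
policy = merged_retry_policy({'max_retries': 3})
```

Building a new dictionary leaves the class-level default untouched, so one call's override does not leak into later calls.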
Actor.cast(method, args={}, declare=None, retry=None, retry_policy=None, type=None, exchange=None, **props)

Send message to actor, discarding replies.

Actor.construct()

Actor specific initialization.

Actor.construct_state()

Instantiates the state class of this actor.

Actor.consumer = None
Actor.contribute_to_object(obj, map)
Actor.contribute_to_state(state)
Actor.default_fields = {}

Additional fields added to reply messages by default.

Actor.default_receive(msg_body)

Override in the derived classes.

Actor.default_routing_key = None

Default routing key used if no 'to' argument is passed.

Actor.default_timeout = 5.0

Default timeout, in seconds as a float, after which we give up waiting for replies.

Actor.delivery_mode = 'persistent'

Delivery mode: persistent or transient. Default is persistent.

Actor.emit(method, args={}, retry=None)
Actor.exchange = None

Default exchange (direct) used for messages to this actor.

Actor.get_binder(type)
Actor.get_direct_exchange()

Returns a kombu.Exchange with type direct.

Actor.get_direct_queue()

Returns a kombu.Queue instance used to listen for messages sent to this specific Actor instance.

Actor.get_queues()
Actor.get_reply_queue(ticket)
Actor.get_rr_exchange()

Returns a kombu.Exchange instance with type set to fanout. The exchange is used for sending in a round-robin style.

Actor.get_rr_queue()

Returns a kombu.Queue instance for receiving round-robin commands for this actor type.

Actor.get_scatter_exchange()

Returns a kombu.Exchange with type fanout.

Actor.get_scatter_queue()

Returns a kombu.Queue instance for receiving broadcast commands for this actor type.

Actor.get_unbinder(type)
Actor.handle_call(body, message)

Handle call message.

Actor.handle_cast(body, message)

Handle cast message.

Actor.idle = 40
Actor.inbox_direct
Actor.inbox_rr
Actor.inbox_scatter
Actor.is_bound()
Actor.lookup_action(name)
Actor.meta = {}
Actor.name = None

Actor name. Defaults to the defined class name.

Actor.no_ack = False

Set to True to disable acks.

Actor.on_agent_ready()
Actor.on_message(body, message)
Actor.outbox
Actor.outbox_exchange = None

Exchange used for forwarding/binding with other actors.

Actor.remove_binding(source, routing_key='', inbox_type='direct')
Actor.reply(req, body, **props)
Actor.reply_exchange = <unbound Exchange cl.reply(direct)>

Exchange used for replies.

Actor.reply_expires = 100.0

Time, in seconds as a float, after which replies expire.

Actor.retry = None

Should we retry publishing messages by default? Default: NO

Actor.retry_policy = {'interval_start': 0, 'interval_max': 1, 'max_retries': 100, 'interval_step': 0.2}

Default policy used when retrying publishing messages. See kombu.BrokerConnection.ensure() for a list of supported keys.
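To give a feel for what the default values imply, the sketch below computes the waits between retries under one assumed interpretation: the delay starts at interval_start, grows by interval_step on each attempt, and is capped at interval_max. The function `retry_intervals` is hypothetical, and the schedule is an assumption about how these keys are used rather than a statement of the broker library's exact behaviour.

```python
from itertools import islice


def retry_intervals(interval_start=0, interval_step=0.2,
                    interval_max=1, max_retries=100):
    """Yield the assumed wait (in seconds) before each retry attempt."""
    for attempt in range(max_retries):
        # Linear growth, never exceeding interval_max.
        yield min(interval_start + attempt * interval_step, interval_max)


# First seven waits under the default policy, rounded for readability.
first_waits = [round(wait, 1) for wait in islice(retry_intervals(), 7)]
```

Under this reading, the default policy retries quickly at first and settles at one attempt per second once the cap is reached.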

Actor.routing_key
Actor.scatter(method, args={}, nowait=False, timeout=None, **kwargs)

Broadcast method to all agents.

If nowait is False, returns a generator to iterate over the results.

Parameters:
  • limit – Limit number of reads from the queue. Unlimited by default.
  • timeout – The timeout (in float seconds) to wait for replies. Default is default_timeout.

Examples

scatter is a generator (if nowait is False):

>>> res = scatter()
>>> next(res)  # one event consumed, or timed out.
>>> res = scatter(limit=2)
>>> for i in res:  # two events consumed or timeout
...     pass

See call_or_cast() for a full list of supported arguments.

Actor.send(method, args={}, to=None, nowait=False, **kwargs)

Call method on agent listening to routing_key.

See call_or_cast() for a full list of supported arguments.

If the keyword argument nowait is false (default) it will block and return the reply.

Actor.serializer = 'json'

Default serializer used to send messages and reply messages.

class Actor.state

Placeholder class for actor’s supported methods.

Actor.throw(method, args={}, nowait=False, **kwargs)

Call method on one of the agents in round robin.

See call_or_cast() for a full list of supported arguments.

If the keyword argument nowait is false (default) it will block and return the reply.

Actor.ticket_count = count(1)

Returns the next anonymous ticket number used for identifying related logs.

Actor.ttl = 20

Time-to-live for the actor before it becomes idle.

Actor.type_to_rkey = {'round-robin': '__rr__', 'scatter': '__scatter__', 'rr': '__rr__'}

Map of calling types and their special routing keys.

Actor.types = ('direct', 'scatter', 'round-robin')

List of calling types this actor should handle. Valid types are:

  • direct

    Send the message directly to an agent by exact routing key.

  • round-robin

    Send the message to an agent by round-robin.

  • scatter

    Send the message to all of the agents (broadcast).
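The relationship between calling types and routing keys can be sketched as a small lookup. This is an illustration under assumptions: the mapping is copied from type_to_rkey above, while `routing_key_for` and its `direct_key` parameter are hypothetical names, and the library's own selection logic may differ.

```python
# Special routing keys, as listed in Actor.type_to_rkey.
TYPE_TO_RKEY = {
    'round-robin': '__rr__',
    'scatter': '__scatter__',
    'rr': '__rr__',
}


def routing_key_for(call_type, direct_key):
    """Pick the routing key for a message of the given calling type."""
    if call_type == 'direct':
        # Direct messages go to one agent by its exact routing key.
        return direct_key
    try:
        return TYPE_TO_RKEY[call_type]
    except KeyError:
        raise ValueError('unsupported calling type: %r' % (call_type,))


keys = [routing_key_for(t, 'agent-1')
        for t in ('direct', 'scatter', 'round-robin')]
```

Note that 'rr' and 'round-robin' resolve to the same special key, so either spelling selects round-robin delivery in this sketch.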