Queue

The Queue class is used to work with RabbitMQ queues on an open channel. The following example shows how you can create a queue using the Queue.declare method.

import rabbitpy

with rabbitpy.Connection() as connection:
    with connection.channel() as channel:
        queue = rabbitpy.Queue(channel, 'my-queue')
        queue.durable = True
        queue.declare()

To consume messages you can iterate over the Queue object itself if the defaults for the Queue.__iter__() method work for your needs:

with conn.channel() as channel:
    for message in rabbitpy.Queue(channel, 'example'):
        print 'Message: %r' % message
        message.ack()

or by the Queue.consume_messages() method if you would like to specify no_ack, prefetch_count, or priority:

with conn.channel() as channel:
    queue = rabbitpy.Queue(channel, 'example')
    for message in queue.consume_messages():
        print 'Message: %r' % message
        message.ack()

API Documentation

class rabbitpy.Queue(channel, name='', durable=False, exclusive=False, auto_delete=False, max_length=None, message_ttl=None, expires=None, dead_letter_exchange=None, dead_letter_routing_key=None, arguments=None)[source]

Create and manage RabbitMQ queues.

Parameters:
  • channel (rabbitpy.channel.Channel) – The channel object to communicate on
  • name (str) – The name of the queue
  • exclusive (bool) – Queue can only be used by this channel and will auto-delete once the channel is closed.
  • durable (bool) – Indicates if the queue should survive a RabbitMQ is restart
  • auto_delete (bool) – Automatically delete when all consumers disconnect
  • max_length (int) – Maximum queue length
  • message_ttl (int) – Time-to-live of a message in milliseconds
  • expires (int) – Milliseconds until a queue is removed after becoming idle
  • dead_letter_exchange (str) – Dead letter exchange for rejected messages
  • dead_letter_routing_key (str) – Routing key for dead lettered messages
  • arguments (dict) – Custom arguments for the queue
__init__(channel, name='', durable=False, exclusive=False, auto_delete=False, max_length=None, message_ttl=None, expires=None, dead_letter_exchange=None, dead_letter_routing_key=None, arguments=None)[source]

Create a new Queue object instance. Only the rabbitpy.Channel object is required.

__iter__()[source]

Quick way to consume messages using defaults of no_ack=False, prefetch of 100, and no priority set.

Yields:rabbitpy.message.Message
__len__()[source]

Return the pending number of messages in the queue by doing a passive Queue declare.

Return type:int
__setattr__(name, value)[source]

Validate the data types for specific attributes when setting them, otherwise fall throw to the parent __setattr__

Parameters:
  • name (str) – The attribute to set
  • value (mixed) – The value to set
Raises:

ValueError

bind(source, routing_key=None, arguments=None)[source]

Bind the queue to the specified exchange or routing key.

Parameters:
  • source (str or rabbitpy.exchange.Exchange exchange) – The exchange to bind to
  • routing_key (str) – The routing key to use
  • arguments (dict) – Optional arguments for for RabbitMQ
Returns:

bool

consume_messages(no_ack=False, prefetch=None, priority=None)[source]

Consume messages from the queue as a generator:

```
for message in queue.consume_messages():
message.ack()

```

You can use this message instead of the queue object as an iterator if you need to alter the prefect count, set the consumer priority or consume in no_ack mode.

Parameters:
  • no_ack (bool) – Do not require acknowledgements
  • prefetch (int) – Set a prefetch count for the channel
  • priority (int) – Consumer priority
Return type:

Iterator

consumer(*args, **kwds)[source]

Consumer message context manager, returns a consumer message generator.

Parameters:
  • no_ack (bool) – Do not require acknowledgements
  • prefetch (int) – Set a prefetch count for the channel
  • priority (int) – Consumer priority
Return type:

Consumer

declare(passive=False)[source]

Declare the queue on the RabbitMQ channel passed into the constructor, returning the current message count for the queue and its consumer count as a tuple.

Parameters:passive (bool) – Passive declare to retrieve message count and consumer count information
Returns:Message count, Consumer count
Return type:tuple(int, int)
delete(if_unused=False, if_empty=False)[source]

Delete the queue

Parameters:
  • if_unused (bool) – Delete only if unused
  • if_empty (bool) – Delete only if empty
get(acknowledge=True)[source]

Request a single message from RabbitMQ using the Basic.Get AMQP command.

Parameters:acknowledge (bool) – Let RabbitMQ know if you will manually acknowledge or negatively acknowledge the message after each get.
Return type:rabbitpy.message.Message or None
ha_declare(nodes=None)[source]

Declare a the queue as highly available, passing in a list of nodes the queue should live on. If no nodes are passed, the queue will be declared across all nodes in the cluster.

Parameters:nodes (list) – A list of nodes to declare. If left empty, queue will be declared on all cluster nodes.
Returns:Message count, Consumer count
Return type:tuple(int, int)
purge()[source]

Purge the queue of all of its messages.

stop_consuming()[source]

Stop consuming messages. This is usually invoked if you want to cancel your consumer from outside the context manager or generator.

If you invoke this, there is a possibility that the generator method will return None instead of a rabbitpy.Message.

unbind(source, routing_key=None)[source]

Unbind queue from the specified exchange where it is bound the routing key. If routing key is None, use the queue name.

Parameters:
  • source (str or rabbitpy.exchange.Exchange exchange) – The exchange to unbind from
  • routing_key (str) – The routing key that binds them