Queue
The Queue class is used to work with RabbitMQ queues on an open channel. The following example shows how you can create a queue using the Queue.declare method.
import rabbitpy

with rabbitpy.Connection() as connection:
    with connection.channel() as channel:
        queue = rabbitpy.Queue(channel, 'my-queue')
        queue.durable = True
        queue.declare()
To consume messages you can iterate over the Queue object itself if the defaults for the Queue.__iter__() method work for your needs:
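import rabbitpy

with rabbitpy.Connection() as conn:
    with conn.channel() as channel:
        # Iterating the queue consumes with the Queue.__iter__() defaults
        for message in rabbitpy.Queue(channel, 'example'):
            print('Message: %r' % message)
            message.ack()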
Or use the Queue.consume_messages() method if you would like to specify no_ack, prefetch, or priority:
import rabbitpy

with rabbitpy.Connection() as conn:
    with conn.channel() as channel:
        queue = rabbitpy.Queue(channel, 'example')
        for message in queue.consume_messages():
            print('Message: %r' % message)
            message.ack()
API Documentation
- class rabbitpy.Queue(channel, name='', durable=False, exclusive=False, auto_delete=False, max_length=None, message_ttl=None, expires=None, dead_letter_exchange=None, dead_letter_routing_key=None, arguments=None)
  Create and manage RabbitMQ queues.
  Parameters:
  - channel (rabbitpy.channel.Channel) – The channel object to communicate on
  - name (str) – The name of the queue
  - exclusive (bool) – Queue can only be used by this channel and will auto-delete once the channel is closed
  - durable (bool) – Indicates if the queue should survive a RabbitMQ restart
  - auto_delete (bool) – Automatically delete when all consumers disconnect
  - max_length (int) – Maximum queue length
  - message_ttl (int) – Time-to-live of a message in milliseconds
  - expires (int) – Milliseconds until a queue is removed after becoming idle
  - dead_letter_exchange (str) – Dead letter exchange for rejected messages
  - dead_letter_routing_key (str) – Routing key for dead lettered messages
  - arguments (dict) – Custom arguments for the queue
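Because message_ttl and expires are both expressed in milliseconds, it can help to convert from seconds in one place. The following is a minimal sketch, not part of rabbitpy: work_queue_options and declare_work_queue are hypothetical helpers, and 'dead-letters' is a placeholder exchange name that is assumed to exist already.

```python
def work_queue_options(ttl_seconds, idle_seconds, dead_letter_exchange):
    """Build keyword arguments for rabbitpy.Queue (hypothetical helper).

    message_ttl and expires are documented in milliseconds, so the
    second-based inputs are converted here.
    """
    return {
        'durable': True,
        'message_ttl': int(ttl_seconds * 1000),
        'expires': int(idle_seconds * 1000),
        'dead_letter_exchange': dead_letter_exchange,
    }


def declare_work_queue(channel, name):
    """Declare a durable queue with a 30 second message TTL on an open channel."""
    import rabbitpy  # imported lazily so work_queue_options has no dependencies

    # 'dead-letters' is a placeholder exchange name, not something rabbitpy creates
    options = work_queue_options(30, 3600, 'dead-letters')
    queue = rabbitpy.Queue(channel, name, **options)
    return queue.declare()  # (message count, consumer count)
```

Calling declare_work_queue(channel, 'work') inside a `with connection.channel() as channel:` block would declare the queue and return its current counts.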
- __init__(channel, name='', durable=False, exclusive=False, auto_delete=False, max_length=None, message_ttl=None, expires=None, dead_letter_exchange=None, dead_letter_routing_key=None, arguments=None)
  Create a new Queue object instance. Only the rabbitpy.Channel object is required.
- __iter__()
  Quick way to consume messages using defaults of no_ack=False, prefetch of 100, and no priority set.
  Yields: rabbitpy.message.Message
- __len__()
  Return the number of pending messages in the queue by doing a passive Queue declare.
  Return type: int
- __setattr__(name, value)
  Validate the data types for specific attributes when setting them; otherwise fall through to the parent __setattr__.
  Parameters:
  - name (str) – The attribute to set
  - value (mixed) – The value to set
  Raises: ValueError
- bind(source, routing_key=None, arguments=None)
  Bind the queue to the specified exchange or routing key.
  Returns: bool
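Since bind returns a bool, a caller can check each binding as it is made. The sketch below assumes that source may be an exchange name or exchange object, as documented for unbind; bind_routing_keys is a hypothetical helper, not a rabbitpy function.

```python
def bind_routing_keys(queue, source, routing_keys):
    """Bind `queue` to `source` once per routing key.

    Returns the routing keys for which bind() reported failure, so an
    empty list means every binding was accepted.
    """
    failed = []
    for key in routing_keys:
        if not queue.bind(source, routing_key=key):
            failed.append(key)
    return failed
```

With a declared queue, bind_routing_keys(queue, 'events', ['user.created', 'user.deleted']) would attempt two bindings against an exchange named 'events' (a placeholder name).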
- consume_messages(no_ack=False, prefetch=None, priority=None)
  Consume messages from the queue as a generator.
  You can use this method instead of the queue object as an iterator if you need to alter the prefetch count, set the consumer priority, or consume in no_ack mode.
  Return type: Iterator
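As a rough illustration of how no_ack and prefetch change the consuming loop, the sketch below acknowledges messages only when the consumer is not in no_ack mode. consume is a hypothetical wrapper, and handler is any callable supplied by the caller.

```python
def consume(queue, handler, no_ack=False, prefetch=None):
    """Pass each consumed message to `handler`, acknowledging when required.

    In no_ack mode no acknowledgement is expected by RabbitMQ, so ack()
    is skipped. Returns the number of messages handled once the
    generator ends.
    """
    handled = 0
    for message in queue.consume_messages(no_ack=no_ack, prefetch=prefetch):
        handler(message)
        if not no_ack:
            message.ack()
        handled += 1
    return handled
```

Against a live queue the generator keeps waiting for messages, so this function only returns after the consumer has been stopped.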
- consumer(*args, **kwds)
  Consumer message context manager; returns a consumer message generator.
  Return type: Consumer
- declare(passive=False)
  Declare the queue on the RabbitMQ channel passed into the constructor, returning the current message count for the queue and its consumer count as a tuple.
  Parameters:
  - passive (bool) – Passive declare to retrieve message count and consumer count information
  Returns: Message count, Consumer count
  Return type: tuple(int, int)
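A passive declare is a convenient way to inspect a queue without changing it. The following sketch unpacks the documented (message count, consumer count) tuple into a dictionary; queue_stats is a hypothetical helper name.

```python
def queue_stats(queue):
    """Return the pending message and consumer counts for `queue`.

    Uses a passive declare, which only reports on the queue and is
    expected to fail if the queue does not exist.
    """
    message_count, consumer_count = queue.declare(passive=True)
    return {'messages': message_count, 'consumers': consumer_count}
```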
- get(acknowledge=True)
  Request a single message from RabbitMQ using the Basic.Get AMQP command.
  Parameters:
  - acknowledge (bool) – Let RabbitMQ know if you will manually acknowledge or negatively acknowledge the message after each get
  Return type: rabbitpy.message.Message or None
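Because get returns None when no message is available, it lends itself to a simple polling loop. The sketch below drains up to a fixed number of messages; drain and its max_messages limit are hypothetical, chosen only to keep the loop bounded.

```python
def drain(queue, handler, max_messages=100):
    """Fetch messages one at a time with get() until the queue is empty.

    Stops early after `max_messages`. Each message is acknowledged
    manually, matching acknowledge=True. Returns the number handled.
    """
    handled = 0
    while handled < max_messages:
        message = queue.get(acknowledge=True)
        if message is None:
            break  # nothing left to fetch right now
        handler(message)
        message.ack()
        handled += 1
    return handled
```

For steady streams of messages, consuming is generally a better fit than polling with get, since each get is a separate round trip to the broker.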
- ha_declare(nodes=None)
  Declare the queue as highly available, passing in a list of nodes the queue should live on. If no nodes are passed, the queue will be declared across all nodes in the cluster.
  Parameters:
  - nodes (list) – A list of nodes to declare. If left empty, the queue will be declared on all cluster nodes.
  Returns: Message count, Consumer count
  Return type: tuple(int, int)
- stop_consuming()
  Stop consuming messages. This is usually invoked if you want to cancel your consumer from outside the context manager or generator.
  If you invoke this, there is a possibility that the generator method will return None instead of a rabbitpy.Message.
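Given that the generator may produce None once the consumer has been cancelled, a consuming loop should check for it before touching the message. The sketch below shows one way to guard the loop; consume_until_cancelled is a hypothetical helper, and treating None as the signal to exit is an assumption about how callers will want to react.

```python
def consume_until_cancelled(queue, handler):
    """Handle and acknowledge messages until the consumer is cancelled.

    A None value is treated as the end of consumption, since there is
    nothing to handle or acknowledge. Returns the number handled.
    """
    handled = 0
    for message in queue.consume_messages():
        if message is None:
            # The consumer was cancelled, for example by stop_consuming()
            break
        handler(message)
        message.ack()
        handled += 1
    return handled
```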
- unbind(source, routing_key=None)
  Unbind the queue from the specified exchange where it is bound with the routing key. If routing key is None, use the queue name.
  Parameters:
  - source (str or rabbitpy.exchange.Exchange) – The exchange to unbind from
  - routing_key (str) – The routing key that binds them