Amqpx.Queue (amqpx v6.0.2)

Functions to operate on Queues.

Summary

Functions

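bind(channel, queue, exchange, options \\ [])

Binds a Queue to an Exchange.

consumer_count(channel, queue)

Returns the number of active consumers on the queue.

declare(channel, queue \\ "", options \\ [])

Declares a queue. The optional queue parameter is used to set the name. If set to an empty string (default), the server will assign a name.

delete(channel, queue, options \\ [])

Deletes a Queue by name.

empty?(channel, queue)

Returns true if the queue is empty (has no messages ready), false otherwise.

message_count(channel, queue)

Returns the number of messages that are ready for delivery (i.e. not pending acknowledgement) in the queue.

purge(channel, queue)

Discards all messages in the Queue.

status(chan, queue)

Returns the message count and consumer count for the given queue. Uses declare/3 with the :passive option set.

subscribe(channel, queue, fun)

Convenience to consume messages from a Queue.

unbind(channel, queue, exchange, options \\ [])

Unbinds a Queue from an Exchange.

unsubscribe(channel, consumer_tag)

Stops the consumer identified by consumer_tag from consuming.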

Functions

bind(channel, queue, exchange, options \\ [])

Binds a Queue to an Exchange.

Options

  • :routing_key - The routing key used to bind the queue to the exchange. Defaults to "".
  • :no_wait - If true, the binding is not synchronous. Defaults to false.
  • :arguments - A list of arguments to pass when binding (of type Amqpx.arguments/0). See the README for more information. Defaults to [].
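
As a rough sketch of how these options are passed, the snippet below binds a queue using a routing key. It assumes `channel` is an already-open Amqpx.Channel and that both the queue and the exchange have been declared. The module name, queue name, exchange name, and routing key are all illustrative. No @spec is listed for bind/4 here, so the return value is passed through unchanged.

```elixir
defmodule BindSketch do
  # Hypothetical helper, not part of Amqpx.
  # Assumes `channel` is an open Amqpx.Channel and that the queue "orders"
  # and the exchange "orders_exchange" already exist on the broker.
  def bind_orders(channel) do
    # Messages published to the exchange with this routing key are routed
    # to the queue (for exchange types that use routing keys).
    Amqpx.Queue.bind(channel, "orders", "orders_exchange", routing_key: "order.created")
  end
end
```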

consumer_count(channel, queue)

@spec consumer_count(Amqpx.Channel.t(), Amqpx.Basic.queue()) ::
  integer() | no_return()

Returns the number of active consumers on the queue.

declare(channel, queue \\ "", options \\ [])

@spec declare(Amqpx.Channel.t(), Amqpx.Basic.queue(), keyword()) ::
  {:ok, map()} | Amqpx.Basic.error()

Declares a queue. The optional queue parameter is used to set the name. If set to an empty string (default), the server will assign a name.

Besides the queue name, the following options can be used:

Options

  • :durable - If set, keeps the Queue between restarts of the broker. Defaults to false.
  • :auto_delete - If set, deletes the Queue once all subscribers disconnect. Defaults to false.
  • :exclusive - If set, only one subscriber can consume from the Queue. Defaults to false.
  • :passive - If set, raises an error unless the queue already exists. Defaults to false.
  • :no_wait - If set, the declare operation is asynchronous. Defaults to false.
  • :arguments - A list of arguments to pass when declaring (of type Amqpx.arguments/0). See the README for more information. Defaults to [].
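
The two most common combinations of these options might look like the following sketch. It assumes `channel` is an already-open Amqpx.Channel. The module and function names are hypothetical, and the return values are passed through as documented in the @spec above.

```elixir
defmodule DeclareSketch do
  # Hypothetical helpers, not part of Amqpx.
  # Both assume `channel` is an open Amqpx.Channel.

  # A named queue that is kept across broker restarts.
  def declare_durable(channel, name) do
    Amqpx.Queue.declare(channel, name, durable: true)
  end

  # A server-named queue: the empty name asks the broker to assign one.
  # :exclusive and :auto_delete make it a temporary, single-consumer queue.
  def declare_temporary(channel) do
    Amqpx.Queue.declare(channel, "", exclusive: true, auto_delete: true)
  end
end
```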

delete(channel, queue, options \\ [])

@spec delete(Amqpx.Channel.t(), Amqpx.Basic.queue(), keyword()) ::
  {:ok, map()} | Amqpx.Basic.error()

Deletes a Queue by name.

Options

  • :if_unused - If set, the server will only delete the queue if it has no consumers. If the queue has consumers, it's not deleted and an error is returned.
  • :if_empty - If set, the server will only delete the queue if it has no messages.
  • :no_wait - If set, the delete operation is asynchronous.
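
A guarded delete that combines :if_unused and :if_empty could be sketched as below. It assumes `channel` is an already-open Amqpx.Channel. The module name and the `:deleted` / `{:not_deleted, reason}` return shapes are illustrative choices, not part of Amqpx.

```elixir
defmodule DeleteSketch do
  # Hypothetical helper, not part of Amqpx.
  # Assumes `channel` is an open Amqpx.Channel.
  # Asks the broker to delete the queue only when it has no consumers
  # and no messages; any non-:ok result is reported as not deleted.
  def delete_if_idle(channel, queue) do
    case Amqpx.Queue.delete(channel, queue, if_unused: true, if_empty: true) do
      {:ok, _info} -> :deleted
      error -> {:not_deleted, error}
    end
  end
end
```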

empty?(channel, queue)

@spec empty?(Amqpx.Channel.t(), Amqpx.Basic.queue()) :: boolean() | no_return()

Returns true if the queue is empty (has no messages ready), false otherwise.

message_count(channel, queue)

@spec message_count(Amqpx.Channel.t(), Amqpx.Basic.queue()) :: integer() | no_return()

Returns the number of messages that are ready for delivery (i.e. not pending acknowledgement) in the queue.

purge(channel, queue)

@spec purge(Amqpx.Channel.t(), Amqpx.Basic.queue()) ::
  {:ok, map()} | Amqpx.Basic.error()

Discards all messages in the Queue.

status(chan, queue)

@spec status(Amqpx.Channel.t(), Amqpx.Basic.queue()) ::
  {:ok, map()} | Amqpx.Basic.error()

Returns the message count and consumer count for the given queue. Uses declare/3 with the :passive option set.
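
One way to consume the result is sketched below. The :message_count and :consumer_count map keys are assumptions about the shape of the returned map, since the @spec only promises `map()`. The module name and the summary shape are hypothetical. `channel` is assumed to be an already-open Amqpx.Channel.

```elixir
defmodule StatusSketch do
  # Hypothetical helper, not part of Amqpx.
  # ASSUMPTION: the status map carries :message_count and :consumer_count keys.
  def summarize({:ok, info}) when is_map(info) do
    messages = Map.get(info, :message_count)
    consumers = Map.get(info, :consumer_count)

    # A queue is treated as idle when it has no ready messages and no consumers.
    {:ok, %{messages: messages, consumers: consumers, idle?: messages == 0 and consumers == 0}}
  end

  # Anything that is not {:ok, map} is passed through untouched.
  def summarize(error), do: error

  # Assumes `channel` is an open Amqpx.Channel and `queue` already exists.
  def queue_summary(channel, queue) do
    channel |> Amqpx.Queue.status(queue) |> summarize()
  end
end
```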

subscribe(channel, queue, fun)

@spec subscribe(Amqpx.Channel.t(), Amqpx.Basic.queue(), (String.t(), map() -> any())) ::
  {:ok, String.t()} | Amqpx.Basic.error()

Convenience to consume messages from a Queue.

The handler function must have arity 2 and will receive as arguments a binary with the message payload and a Map with the message properties.

The consumed message will be acknowledged after executing the handler function. If an exception is raised by the handler function, the message is rejected.

This convenience function will spawn a process and register it using Amqpx.Basic.consume.
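
A minimal sketch of a handler and its lifecycle follows. It assumes `channel` is an already-open Amqpx.Channel and that `queue` exists. The module name and the integer-parsing handler are purely illustrative.

```elixir
defmodule SubscribeSketch do
  # Hypothetical handler with the required arity of 2: it receives the
  # payload binary and the message properties map.
  # String.to_integer/1 raises ArgumentError on non-numeric payloads; per the
  # description above, a raising handler causes the message to be rejected.
  def handle(payload, _meta) do
    String.to_integer(payload)
  end

  # Starts consuming. On success the result is {:ok, consumer_tag}.
  def start(channel, queue) do
    Amqpx.Queue.subscribe(channel, queue, &handle/2)
  end

  # Stops the consumer started above, using the tag returned by start/2.
  def stop(channel, consumer_tag) do
    Amqpx.Queue.unsubscribe(channel, consumer_tag)
  end
end
```

Because acknowledgement happens only after the handler returns, any work that must complete before the message is considered processed belongs inside the handler itself.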

unbind(channel, queue, exchange, options \\ [])

Unbinds a Queue from an Exchange.

Options

  • :routing_key - The routing key of the binding to remove. Defaults to "".
  • :arguments - A list of arguments to pass when unbinding (of type Amqpx.arguments/0). See the README for more information. Defaults to [].

unsubscribe(channel, consumer_tag)

@spec unsubscribe(Amqpx.Channel.t(), Amqpx.Basic.consumer_tag()) ::
  {:ok, String.t()} | Amqpx.Basic.error()

Stops the consumer identified by consumer_tag from consuming.

Internally just calls Amqpx.Basic.cancel/2.