RMQ v0.2.0 RMQ.Consumer behaviour View Source
RabbitMQ Consumer.
Configuration
:connection- the connection module which implementsRMQ.Connectionbehaviour. Defaults toRMQ.Connection;:queue- the name of the queue to consume. Will be created if does not exist;:exchange- the name of the exchange to whichqueueshould be bound. Also accepts two-element tuple{type, name}. Defaults to"";:routing_key- queue binding key. Defaults toqueue; Will be created if does not exist. Defaults to"";:dead_letter- defines if the consumer should setup deadletter exchange and queue. Defaults totrue;:dead_letter_queue- the name of dead letter queue. Defaults to"#{queue}_error";:dead_letter_exchange- the name of the exchange to whichdead_letter_queueshould be bound. Also accepts two-element tuple{type, name}. Defaults to"#{exchange}.dead-letter";:dead_letter_routing_key- routing key for dead letter messages. Defaults toqueue;:concurrency- defines ifconsume/3callback should be called in a separate process. Defaults totrue;:prefetch_count- sets the message prefetch count. Defaults to10;:consumer_tag- consumer tag. Defaults to a current module name;:reconnect_interval- a reconnect interval in milliseconds. It can be also a function that accepts the current connection attempt as a number and returns a new interval. Defaults to5000;
Examples
defmodule MyApp.Consumer do
use RMQ.Consumer,
queue: "my-app-consumer-queue",
exchange: {:direct, "my-exchange"}
@impl RMQ.Consumer
def consume(chan, payload, meta) do
# do something with the payload
ack(chan, meta.delivery_tag)
end
end
defmodule MyApp.Consumer2 do
use RMQ.Consumer
@impl RMQ.Consumer
def config do
[
queue: System.fetch_env!("QUEUE_NAME"),
reconnect_interval: fn attempt -> attempt * 1000 end,
]
end
@impl RMQ.Consumer
def consume(chan, payload, meta) do
# do something with the payload
ack(chan, meta.delivery_tag)
end
end Link to this section Summary
Functions
The default implementation for setup_queue/2 callback.
Callbacks
A callback for dynamic configuration.
A callback for consuming a message.
Does all the job on preparing the queue.
Link to this section Functions
Specs
setup_queue(chan :: AMQP.Channel.t(), config :: keyword()) :: :ok
The default implementation for setup_queue/2 callback.
Link to this section Callbacks
Specs
config() :: keyword()
A callback for dynamic configuration.
Specs
consume(chan :: AMQP.Channel.t(), payload :: any(), meta :: map()) :: any()
A callback for consuming a message.
Keep in mind that the consumed message needs to be explicitly acknowledged via AMQP.Basic.ack/3
or rejected via AMQP.Basic.reject/3. For convenience, these functions
are imported and are available directly.
AMQP.Basic.publish/5 is imported as well which is convenient for the case
when the consumer implements RPC.
When :concurrency is true this function will be executed in the spawned process
using Kernel.spawn/1.
Specs
setup_queue(chan :: AMQP.Channel.t(), config :: keyword()) :: :ok
Does all the job on preparing the queue.
Whenever you need full control over configuring the queue you can implement this callback and
use AMQP library directly.
See setup_queue/2 for the default implementation.