rabbit_mq v0.0.19 RabbitMQ.Consumer behaviour
This module can be used to start and maintain a pool of Consumer workers.
Example usage
rabbit_mq allows you to design consistent, SDK-like Consumers.
ℹ️ The following example assumes that the "customer/customer.updated"
queue already exists.
First, define your Consumer(s).
To consume off "customer/customer.created":
defmodule RabbitSample.CustomerCreatedConsumer do
  use RabbitMQ.Consumer, queue: "customer/customer.created", worker_count: 2, prefetch_count: 3

  require Logger

  def handle_message(payload, meta, channel) do
    Logger.info("Customer #{payload} created.")
    ack(channel, meta.delivery_tag)
  end
end
To consume off "customer/customer.updated":
defmodule RabbitSample.CustomerUpdatedConsumer do
  use RabbitMQ.Consumer, queue: "customer/customer.updated", worker_count: 2, prefetch_count: 6

  require Logger

  def handle_message(payload, meta, channel) do
    Logger.info("Customer updated. Data: #{payload}.")
    ack(channel, meta.delivery_tag)
  end
end
Then, start as normal under your existing supervision tree:
children = [
  RabbitSample.Topology,
  RabbitSample.CustomerProducer,
  RabbitSample.CustomerCreatedConsumer,
  RabbitSample.CustomerUpdatedConsumer
]
opts = [strategy: :one_for_one, name: RabbitSample.Supervisor]
Supervisor.start_link(children, opts)
As messages are published onto the "customer/customer.created" or "customer/customer.updated" queues, the corresponding handle_message/3 will be invoked.
⚠️ Please note that automatic message acknowledgement is disabled in rabbit_mq; it is therefore your responsibility to ensure messages are ack'd or nack'd.
ℹ️ Please consult the Consumer Acknowledgement Modes and Data Safety Considerations for more details.
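As a rough illustration of manual acknowledgement, the sketch below acks messages it can process and rejects the ones it cannot. The "customer/customer.deleted" queue, both module names, and the integer-id payload format are hypothetical. It also assumes that reject/3 takes the same options as AMQP.Basic.reject/3 in the amqp library, where requeue: false stops the broker from redelivering the message; this page does not confirm that.

```elixir
defmodule RabbitSample.CustomerId do
  @moduledoc "Hypothetical helper, not part of rabbit_mq."

  # Accepts payloads that are exactly an integer, e.g. "42".
  def parse(payload) do
    case Integer.parse(payload) do
      {id, ""} -> {:ok, id}
      _ -> :error
    end
  end
end

defmodule RabbitSample.CustomerDeletedConsumer do
  use RabbitMQ.Consumer, queue: "customer/customer.deleted", worker_count: 2

  require Logger

  def handle_message(payload, meta, channel) do
    case RabbitSample.CustomerId.parse(payload) do
      {:ok, id} ->
        Logger.info("Customer #{id} deleted.")
        ack(channel, meta.delivery_tag)

      :error ->
        # Assumption: options are passed through to AMQP.Basic.reject/3.
        Logger.error("Unparseable payload: #{inspect(payload)}")
        reject(channel, meta.delivery_tag, requeue: false)
    end
  end
end
```

Rejecting without requeueing keeps a malformed message from being redelivered in a loop. Whether the broker then drops or dead-letters it depends on how the queue was declared.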
Configuration
The following options can be used with RabbitMQ.Consumer:
- :prefetch_count: limits the number of unacknowledged messages on a channel. Please consult the Consumer Prefetch section for more details. Defaults to 10.
- :queue: the name of the queue from which the Consumer should start consuming. For exclusive queues, please see the Exclusive Queues section further below. Required.
- :worker_count: number of workers to be spawned. Cannot be greater than :max_channels_per_connection set in config. Defaults to 3.
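To get a feel for how :worker_count and :prefetch_count interact, the sketch below estimates an upper bound on unacknowledged messages held by one Consumer module. It assumes each worker consumes on its own channel, which the :max_channels_per_connection limit suggests but this page does not state outright. RabbitSample.ConsumerCapacity and max_in_flight/1 are hypothetical names, not part of rabbit_mq.

```elixir
defmodule RabbitSample.ConsumerCapacity do
  @moduledoc "Hypothetical helper, not part of rabbit_mq."

  # Defaults as listed in the options above.
  @defaults [worker_count: 3, prefetch_count: 10]

  # Upper bound on unacknowledged messages, assuming one channel per worker.
  def max_in_flight(opts) do
    opts = Keyword.merge(@defaults, Keyword.take(opts, [:worker_count, :prefetch_count]))
    opts[:worker_count] * opts[:prefetch_count]
  end
end

# 2 workers, each allowed 3 unacknowledged messages.
RabbitSample.ConsumerCapacity.max_in_flight(
  queue: "customer/customer.created",
  worker_count: 2,
  prefetch_count: 3
)
# => 6
```

With only :queue given, the documented defaults apply and the same estimate comes out at 3 * 10 = 30.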
When you use RabbitMQ.Consumer, a few things happen:
- The module turns into a GenServer.
- The server starts and supervises the desired number of workers.
- handle_message/3 is passed as a callback to each worker when start_link/1 is called.
- If an exclusive queue is requested, it will be declared and bound to the Consumer.
- Each worker starts consuming from the queue provided and calls handle_message/3 for each message consumed.
- ack/2, ack/3, nack/2, nack/3, reject/2, and reject/3 become automatically available.
handle_message/3 needs to be defined with the following signature:
@type payload :: String.t()
@type meta :: map()
@type channel :: AMQP.Channel.t()
@type result :: term()
@callback handle_message(payload(), meta(), channel()) :: result()
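Since meta is a plain map, the acknowledgement decision can be kept in a small pure function and tested without a broker. The sketch below is one way to do that, under two assumptions not confirmed on this page: that meta is the delivery metadata from the amqp library and so carries a :redelivered flag next to :delivery_tag, and that nack/3 accepts the same options as AMQP.Basic.nack/3. RabbitSample.Disposition is a hypothetical module.

```elixir
defmodule RabbitSample.Disposition do
  @moduledoc "Hypothetical helper: picks an acknowledgement action for a message."

  # Processing succeeded: acknowledge.
  def decide(:ok, _meta), do: :ack

  # First failure: ask the broker to requeue so the message is retried.
  def decide({:error, _reason}, %{redelivered: false}), do: {:nack, requeue: true}

  # Failed again after a redelivery: give up instead of retrying forever.
  def decide({:error, _reason}, %{redelivered: true}), do: {:nack, requeue: false}
end

# Possible use inside a Consumer, where process/1 is your own function
# returning :ok or {:error, reason}:
#
#   def handle_message(payload, meta, channel) do
#     case RabbitSample.Disposition.decide(process(payload), meta) do
#       :ack -> ack(channel, meta.delivery_tag)
#       {:nack, opts} -> nack(channel, meta.delivery_tag, opts)
#     end
#   end
```

Keeping the decision separate from the ack/nack calls means the retry policy can be unit-tested with plain maps.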
Exclusive queues
If you want to consume from an exclusive queue, use one of the following tuples as the :queue option:
{exchange, routing_key}
{exchange, routing_key, opts}
{exchange, routing_key, queue_name}
{exchange, routing_key, queue_name, opts}
ℹ️ At a minimum, the exchange and the routing_key are required.
defmodule CustomerConsumer do
  use RabbitMQ.Consumer, queue: {"customer", "customer.*"}, worker_count: 3

  # Define `handle_message/3` as normal.
end
This will ensure that the queue is declared and correctly bound before the Consumer workers start consuming messages off it.
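For comparison, here is a sketch of the three-element form. It assumes the third element is used as the name of the declared queue, as the {exchange, routing_key, queue_name} tuple suggests. The module name and the "customer-audit" queue name are made up for illustration.

```elixir
defmodule RabbitSample.CustomerAuditConsumer do
  # Exclusive queue bound to the "customer" exchange with an explicit name.
  use RabbitMQ.Consumer, queue: {"customer", "customer.*", "customer-audit"}, worker_count: 1

  require Logger

  def handle_message(payload, meta, channel) do
    Logger.info("Audit: #{payload}")
    ack(channel, meta.delivery_tag)
  end
end
```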
Summary
Functions
Returns a specification to start this module under a supervisor.
Starts this module as a process via GenServer.start_link/3.
Invoked when the server is about to exit. It should do any cleanup required. See https://hexdocs.pm/elixir/GenServer.html#c:terminate/2 for more details.
Types
Functions
child_spec(init_arg)
Returns a specification to start this module under a supervisor.
See Supervisor.
Starts this module as a process via GenServer.start_link/3.
Only used by the module's child_spec.
terminate(reason, state)
Invoked when the server is about to exit. It should do any cleanup required. See https://hexdocs.pm/elixir/GenServer.html#c:terminate/2 for more details.