View Source Rabbit.ConsumerSupervisor behaviour (Rabbit v0.20.0)
A RabbitMQ consumer supervisor process.
This allows starting and supervising multiple Rabbit.Consumer
processes with
ease. Rather than creating a module for each consumer and implementing repetitive
logic - the same callbacks are used across all child consumers.
Example
# This is a connection
defmodule MyConnection do
use Rabbit.Connection
def start_link(opts \\ []) do
Rabbit.Connection.start_link(__MODULE__, opts, name: __MODULE__)
end
# Callbacks
@impl Rabbit.Connection
def init(:connection, opts) do
# Perform any runtime configuration
{:ok, opts}
end
end
# This is a consumer supervisor
defmodule MyConsumers do
use Rabbit.ConsumerSupervisor
def start_link(consumers \\ []) do
Rabbit.ConsumerSupervisor.start_link(__MODULE__, consumers, name: __MODULE__)
end
# Callbacks
@impl Rabbit.ConsumerSupervisor
def init(:consumer_supervisor, _consumers) do
# Perform any runtime configuration for the supervisor
consumers = [
[connection: MyConnection, queue: "my_queue1", prefetch_count: 5],
[connection: MyConnection, queue: "my_queue2", prefetch_count: 10]
]
{:ok, consumers}
end
def init(:consumer, opts) do
# Perform any runtime configuration per consumer
{:ok, opts}
end
@impl Rabbit.ConsumerSupervisor
def handle_setup(channel, queue) do
# Perform exchange or queue setup per consumer
AMQP.Queue.declare(channel, queue)
:ok
end
@impl Rabbit.ConsumerSupervisor
def handle_message(message) do
# Handle messages per consumer
{:ack, message}
end
@impl Rabbit.ConsumerSupervisor
def handle_error(message) do
# Handle errors that occur per consumer
{:nack, message}
end
end
# Start the connection
MyConnection.start_link()
# Start the consumers
MyConsumers.start_link()
Summary
Callbacks
A callback executed by each consumer to handle message exceptions.
A callback executed by each consumer to handle message consumption.
A callback executed by each consumer after the channel is open, but before consumption.
A callback executed by each component of the consumer supervisor.
Functions
Starts a consumer supervisor process.
Types
@type consumers() :: [Rabbit.Consumer.options()]
@type t() :: Supervisor.name()
Callbacks
@callback handle_error(message :: Rabbit.Message.t()) :: Rabbit.Consumer.message_response()
A callback executed by each consumer to handle message exceptions.
Please see Rabbit.Consumer.handle_error/1
for more information.
@callback handle_message(message :: Rabbit.Message.t()) :: Rabbit.Consumer.message_response()
A callback executed by each consumer to handle message consumption.
Please see Rabbit.Consumer.handle_message/1
for more information.
A callback executed by each consumer after the channel is open, but before consumption.
Please see Rabbit.Consumer.handle_setup/1
for more information.
@callback init( type :: :consumer_supervisor | :consumer, opts :: consumers() | Rabbit.Consumer.options() ) :: {:ok, consumers() | Rabbit.Consumer.options()} | :ignore
A callback executed by each component of the consumer supervisor.
Two versions of the callback must be created. One for the supervisor, and one for the consumers. The first argument differentiates the callback.
# Initialize the supervisor
def init(:consumer_supervisor, consumers) do
{:ok, consumers}
end
# Initialize a single consumer
def init(:consumer, opts) do
{:ok, opts}
end
Returning {:ok, consumers}
- where consumers
is a list of Rabbit.Consumer.options/0
will, cause start_link/3
to return {:ok, pid}
and the supervisor to enter its loop.
Returning {:ok, opts}
- where opts
is a keyword list of Rabbit.Consumer.option/0
will,
cause start_link/3
to return {:ok, pid}
and the consumer to enter its loop.
Returning :ignore
will cause start_link/3
to return :ignore
and the process
will exit normally without entering the loop.
Functions
Starts a consumer supervisor process.
The list of consumers should represent a list of Rabbit.Consumer.options/0
.
Server Options
You can also provide server options - which are simply the same ones available
for GenServer.options/0
.