View Source Rabbit.Consumer behaviour (Rabbit v0.20.0)
A RabbitMQ consumer process.
Consumers are the "workers" of your application. They wrap around the standard
AMQP.Channel
and provide the following benefits:
- Durability during connection and channel failures through use of exponential backoff.
- Easy runtime setup through the
init/2
andhandle_setup/1
callbacks. - Automatic acknowledgements based on the return value of the
handle_message/1
callback. - Ability to handle exceptions through the
handle_error/1
callback. - Each message is executed within its own supervised task.
- Automatic payload decoding based on available serializers and message content type.
Example
# This is a connection
defmodule MyConnection do
use Rabbit.Connection
def start_link(opts \\ []) do
Rabbit.Connection.start_link(__MODULE__, opts, name: __MODULE__)
end
# Callbacks
@impl Rabbit.Connection
def init(:connection, opts) do
# Perform any runtime configuration
{:ok, opts}
end
end
# This is a consumer
defmodule MyConsumer do
use Rabbit.Consumer
def start_link(opts \\ []) do
Rabbit.Consumer.start_link(__MODULE__, opts, name: __MODULE__)
end
# Callbacks
@impl Rabbit.Consumer
def init(_type, opts) do
# Perform any runtime configuration
{:ok, opts}
end
@impl Rabbit.Consumer
def handle_setup(state) do
# Optional callback to perform exchange or queue setup
AMQP.Queue.declare(state.channel, state.queue)
:ok
end
@impl Rabbit.Consumer
def handle_message(message) do
# Handle consumed messages
{:ack, message}
end
@impl Rabbit.Consumer
def handle_error(message) do
# Handle errors that occur within handle_message/1
{:nack, message}
end
end
# Start the connection
MyConnection.start_link()
# Start the consumer
MyConsumer.start_link(connection: MyConnection, queue: "my_queue", prefetch_count: 20)
Serializers
When a message is consumed, its content type is compared to the list of available serializers. If a serializer matches the content type, the message will be automatically decoded.
You can find out more about serializers at Rabbit.Serializer
.
Summary
Callbacks
A callback executed to handle message exceptions.
A callback executed to handle message consumption.
An optional callback executed after the channel is open, but before consumption.
A callback executed when the consumer is started.
Types
@type delivery_tag() :: non_neg_integer()
@type message_response() :: {:ack, Rabbit.Message.t()} | {:ack, Rabbit.Message.t(), action_options()} | {:nack, Rabbit.Message.t()} | {:nack, Rabbit.Message.t(), action_options()} | {:reject, Rabbit.Message.t()} | {:reject, Rabbit.Message.t(), action_options()} | any()
@type option() :: {:connection, Rabbit.Connection.t()} | {:queue, String.t()} | {:prefetch_count, non_neg_integer()} | {:prefetch_size, non_neg_integer()} | {:consumer_tag, String.t()} | {:no_local, boolean()} | {:no_ack, boolean()} | {:exclusive, boolean()} | {:nowait, boolean()} | {:arguments, Keyword.t()} | {:custom_meta, map()} | {:setup_opts, setup_options()} | {:workers, non_neg_integer()} | {:timeout, timeout()}
@type options() :: [option()]
@type setup_options() :: keyword()
@type t() :: GenServer.name()
Callbacks
@callback handle_error(message :: Rabbit.Message.t()) :: message_response()
A callback executed to handle message exceptions.
If the original handle_message/1
callback raises an error, this callback
will be called with the message - but with the :error_reason
and :error_stack
fields filled.
You may choose to return the same values as handle_message/1
.
@callback handle_message(message :: Rabbit.Message.t()) :: message_response()
A callback executed to handle message consumption.
The callback is provided a Rabbit.Message
struct. You may find more information
about the message structure within its own documentation. The message may be
automatically decoded based on the content type and available serializers.
You may choose to ack, nack, or reject the message based on the return value.
{:ack, message}
- will acknowledge the message.{:ack, message, options}
- will acknowledge the message with options.{:nack, message}
- will negative acknowledge the message.{:nack, message, options}
- will negative acknowledge the message with options.{:reject, message}
- will reject the message.{:reject, message, options}
- will reject the message with options.
If you don't return one of these values - nothing will be done. This means you
will need to manually ack, nack or reject the message if required. Please
see the Rabbit.Message
module for more information.
An optional callback executed after the channel is open, but before consumption.
The callback is called with the current state, containing the open channel and queue name if given. At the most basic, you may want to declare the queue to ensure it's available. This will be entirely application dependent though.
def handle_setup(state) do
AMQP.Queue.declare(state.channel, state.queue)
:ok
end
Important keys from the state include:
:connection
- theRabbit.Connection
module in use.:channel
- theAMQP.Channel
open for this consumer.:queue
- the queue name.:setup_opts
- as provided tostart_link/3
.
Return either :ok
or {:ok, new_state}
for success, the latter will update the state.
If another value is returned it will be marked as failed, and the consumer will attempt to go through the connection setup process again.
Alternatively, you could use a Rabbit.Topology
process to perform this
setup work. Please see its docs for more information.
A callback executed when the consumer is started.
Returning {:ok, opts}
- where opts
is a keyword list of t:option()
will,
cause start_link/3
to return {:ok, pid}
and the process to enter its loop.
Returning :ignore
will cause start_link/3
to return :ignore
and the process
will exit normally without entering the loop.
Functions
@spec start_link(module(), options(), GenServer.options()) :: Supervisor.on_start()
Starts a consumer process.
Options
:connection
- ARabbit.Connection
process.:queue
- The queue to consume messages from.:prefetch_count
- The basic unit of concurrency for a given consumer - defaults to0
.:prefetch_size
- The prefetch window size in octets - defaults to0
, meaning no specific limit.:consumer_tag
- The identifier of the consumer. If empty, one will be generated by the server.:no_local
- A boolean representing whether messages should not be sent to the same connection that published them - defaults tofalse
.:no_ack
- A boolean representing whether acknowledgements are not required for messages - defaults tofalse
.:exclusive
- A boolean representing whether only this consumer can access the queue - defaults tofalse
.:nowait
- A boolean representing whether the server should not respond to methods - defaults tofalse
.:arguments
- A set of arguments for the consumer.:custom_meta
- A map of custom data that will be included in eachRabbit.Message
handled by the consumer.:setup_opts
- A keyword list of custom options for use inhandle_setup/1
.:workers
- The number of workers available to process messages - defaults toSystem.schedulers_online()
.:timeout
- The timeout in milliseconds for each message processed - defaults to60_000
.
Server Options
You can also provide server options - which are simply the same ones available
for GenServer.options/0
.
@spec stop(t()) :: :ok
Stops a consumer process.