AMQPHelpers.Reliability.Consumer (AMQP Helpers v1.4.1)
A consumer for dealing with reliability scenarios.
A AMQPHelpers.Reliability.Consumer
is process which manages a reliable
consume operation over AMQP, where messages are acknowledge to the broker
after processing them. Pair this process with a
AMQPHelpers.Reliability.Producer
to provide reliable message exchange.
Example
This Consumer
delivers messages to a message_handler/0
which processes
these messages. The Consumer
enforces the usage of AMQP.Application
, so
after defining our application connection, channels and our message handler
we can create an instance of this process using start_link/1
to start
consuming messages:
alias AMQPHelpers.Reliability.Consumer
my_message_handler = fn payload, meta ->
IO.inspect({payload, meta}, label: "Got a message!")
end
{:ok, consumer} = Consumer.start_link(
channel_name: :my_channel_name,
message_handler: :my_channel_name,
queue_name: "my_queue_name"
)
Link to this section Summary
Types
The function that handle messages.
Option values used by start_link/1
function.
Options used by start_link/1
function.
Functions
Returns a specification to start this module under a supervisor.
Starts consuming messages.
Starts a Consumer
process linked to the current process.
Link to this section Types
message_handler()
Specs
The function that handle messages.
A message handler is a function that will deal with the consumed messaged. It will receive the payload of the message as first argument, and the message metadata as second argument.
A module, function, arguments triplet can also be used. For example, if
{Foo, :bar, [1, 2]}
is used as message handler, the consumer will call
Foo.bar(message, meta, 1, 2)
to handle messages.
This function must return :ok
if the messages was handled successfully, so
the consumer will acknowledge the message. Any other return value will
non-acknowledge the message.
option()
Specs
option() :: GenServer.option() | {:adapter, module()} | {:channel, AMQP.Channel.t()} | {:channel_name, binary() | atom()} | {:consume_on_init, boolean()} | {:consume_options, keyword()} | {:message_handler, message_handler()} | {:prefetch_count, non_neg_integer()} | {:prefetch_size, non_neg_integer()} | {:queue_name, binary()} | {:requeue, boolean()} | {:retry_interval, non_neg_integer()} | {:shutdown_gracefully, boolean()} | {:task_supervisor, Supervisor.supervisor()}
Option values used by start_link/1
function.
options()
Specs
options() :: [option()]
Options used by start_link/1
function.
Link to this section Functions
child_spec(init_arg)
Returns a specification to start this module under a supervisor.
See Supervisor
.
consume(server)
Specs
consume(GenServer.server()) :: :ok
Starts consuming messages.
This function is used to start consuming messages when consume_on_init
option is set to false. Not required by default but useful for testing
purposes.
start_link(opts \\ [])
Specs
start_link(options()) :: GenServer.on_start()
Starts a Consumer
process linked to the current process.
Options
The following option can be given to Consumer
when starting it. Note that
message_handler
and queue_name
are required.
adapter
- Sets theAMQPHelpers.Adapter
. Defaults toAMQPHelpers.Adapters.AMQP
.channel
- The channel to use to consume messages. NOTE: do not use this for production environments because this Consumer does not supervise the given channel. Instead, usechannel_name
which makes use ofAMQP.Application
.channel_name
- The name of the configured channel to use. SeeAMQP.Application
for more information. Defaults to:default
.consume_on_init
- If the consumer should start consuming messages on init or not. Defaults totrue
.consume_options
- The options given toAMQPHelpers.Adapter.consume/4
.message_handler
- The function that will deal with messages. Required.prefetch_count
- The maximum number of unacknowledged messages in the channel. SeeAMQP.Basic.qos2
for more info.prefetch_size
- The maximum number of unacknowledged bytes in the channel. SeeAMQP.Basic.qos2
for more info.queue_name
- The name of the queue to consume. Required.requeue
- Whether to requeue messages or not after a consume error. Defaults totrue
.retry_interval
- The number of millisecond to wait if an error happens when trying to consume messages or when trying to open a channel.shutdown_gracefully
- If enabled, the consumer will cancel the subscription when terminating. Default tofalse
but enforced ifconsumer_options
hasexclusive
set totrue
.task_supervisor
- TheTask.Supervisor
which runs message handling tasks. If not provided, theConsumer
will handle messages synchronously.
GenServer.options/0
are also available. See GenServer.start_link/2
for
more information about these.