AMQPHelpers.Reliability.Consumer (AMQP Helpers v1.6.0)
A consumer for dealing with reliability scenarios.
An AMQPHelpers.Reliability.Consumer is a process which manages a reliable
consume operation over AMQP, where messages are acknowledged to the broker
only after they have been processed. Pair this process with an
AMQPHelpers.Reliability.Producer to provide reliable message exchange.
Example
This Consumer delivers each message to a message_handler/0 function, which
processes it. The Consumer enforces the use of AMQP.Application, so once the
application connection, channels and message handler are defined, we can
create an instance of this process using start_link/1 to start consuming
messages:
alias AMQPHelpers.Reliability.Consumer

# The handler must return :ok so that the consumer acknowledges the message.
my_message_handler = fn payload, meta ->
  IO.inspect({payload, meta}, label: "Got a message!")
  :ok
end

{:ok, consumer} =
  Consumer.start_link(
    channel_name: :my_channel_name,
    message_handler: my_message_handler,
    queue_name: "my_queue_name"
  )
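The example above relies on a connection and a channel named :my_channel_name already being configured through AMQP.Application. A minimal configuration sketch, assuming the application config format of the amqp library; the connection name and broker URL are placeholders, not values taken from this documentation:

```elixir
# config/config.exs
import Config

config :amqp,
  connections: [
    # Hypothetical connection name and broker URL.
    my_connection_name: [url: "amqp://guest:guest@localhost:5672"]
  ],
  channels: [
    # The channel name must match the channel_name option given to the Consumer.
    my_channel_name: [connection: :my_connection_name]
  ]
```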
Types
The function that handles messages.
A message handler is a function that deals with the consumed messages. It receives the payload of the message as its first argument and the message metadata as its second argument.
A module, function, arguments triplet can also be used. For example, if
{Foo, :bar, [1, 2]} is used as message handler, the consumer will call
Foo.bar(message, meta, 1, 2) to handle messages.
This function must return :ok if the message was handled successfully, so
that the consumer acknowledges the message. Any other return value causes
the consumer to negatively acknowledge (nack) the message.
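The handler contract described above can be sketched without a broker. MyApp.AuditHandler and its prefix argument are hypothetical names used only for illustration, and the apply/3 calls merely simulate the call that the triplet is described as producing:

```elixir
defmodule MyApp.AuditHandler do
  # Hypothetical handler module. The payload and metadata come first,
  # followed by the extra arguments taken from the triplet.
  def handle(payload, _meta, prefix) when is_binary(payload) do
    IO.puts("#{prefix}: #{payload}")
    # Returning :ok asks the consumer to acknowledge the message.
    :ok
  end

  # Any other return value results in a negative acknowledgement.
  def handle(_payload, _meta, _prefix), do: {:error, :unsupported_payload}
end

# Given to the consumer as:
#   message_handler: {MyApp.AuditHandler, :handle, ["audit"]}
# The resulting call shape can be simulated directly:
:ok = apply(MyApp.AuditHandler, :handle, ["hello", %{}, "audit"])
{:error, :unsupported_payload} = apply(MyApp.AuditHandler, :handle, [nil, %{}, "audit"])
```

A named module function is often easier to test and to reference from a supervision tree than an anonymous function.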
@type option() ::
        GenServer.option()
        | {:adapter, module()}
        | {:channel, AMQP.Channel.t()}
        | {:channel_name, binary() | atom()}
        | {:consume_on_init, boolean()}
        | {:consume_options, keyword()}
        | {:message_handler, message_handler()}
        | {:prefetch_count, non_neg_integer()}
        | {:prefetch_size, non_neg_integer()}
        | {:queue_name, binary()}
        | {:requeue, boolean()}
        | {:retry_interval, non_neg_integer()}
        | {:shutdown_gracefully, boolean()}
        | {:task_supervisor, Supervisor.supervisor()}
Option values used by the start_link/1 function.
@type options() :: [option()]
Options used by the start_link/1 function.
Functions
Returns a specification to start this module under a supervisor.
See Supervisor.
@spec consume(GenServer.server()) :: :ok
Starts consuming messages.
This function is used to start consuming messages when the consume_on_init
option is set to false. It is not required by default, but it is useful for
testing purposes.
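A minimal sketch of this deferred start, assuming a reachable broker and a channel named :my_channel_name configured through AMQP.Application; the queue name and the trivial handler are placeholders:

```elixir
alias AMQPHelpers.Reliability.Consumer

# Start the consumer without subscribing to the queue yet.
{:ok, consumer} =
  Consumer.start_link(
    channel_name: :my_channel_name,
    consume_on_init: false,
    message_handler: fn _payload, _meta -> :ok end,
    queue_name: "my_queue_name"
  )

# ... prepare the test here, for example by publishing fixture messages ...

# Begin consuming explicitly once everything is in place.
:ok = Consumer.consume(consumer)
```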
@spec start_link(options()) :: GenServer.on_start()
Starts a Consumer process linked to the current process.
Options
The following options can be given to the Consumer when starting it. Note
that message_handler and queue_name are required.
adapter - Sets the AMQPHelpers.Adapter. Defaults to AMQPHelpers.Adapters.AMQP.
channel - The channel to use to consume messages. NOTE: do not use this in production environments, because this Consumer does not supervise the given channel. Instead, use channel_name, which makes use of AMQP.Application.
channel_name - The name of the configured channel to use. See AMQP.Application for more information. Defaults to :default.
consume_on_init - Whether the consumer should start consuming messages on init. Defaults to true.
consume_options - The options given to AMQPHelpers.Adapter.consume/4.
message_handler - The function that will deal with messages. Required.
prefetch_count - The maximum number of unacknowledged messages in the channel. See AMQP.Basic.qos/2 for more info.
prefetch_size - The maximum number of unacknowledged bytes in the channel. See AMQP.Basic.qos/2 for more info.
queue_name - The name of the queue to consume. Required.
requeue - Whether to requeue messages after a consume error. Defaults to true.
retry_interval - The number of milliseconds to wait before retrying if an error happens when trying to consume messages or when trying to open a channel.
shutdown_gracefully - If enabled, the consumer will cancel the subscription when terminating. Defaults to false, but is enforced if consume_options has exclusive set to true.
task_supervisor - The Task.Supervisor which runs message handling tasks. If not provided, the Consumer will handle messages synchronously.
GenServer.options/0 are also available. See GenServer.start_link/2 for
more information about these.
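As an illustration of how several of these options combine, the following sketch starts a Consumer under a supervisor next to a Task.Supervisor. It assumes that the child specification passes the option list through to start_link/1; MyApp.Application, MyApp.ConsumerTasks, MyApp.OrdersConsumer, MyApp.OrderHandler, the queue name and the numeric values are all hypothetical:

```elixir
defmodule MyApp.Application do
  use Application

  alias AMQPHelpers.Reliability.Consumer

  @impl true
  def start(_type, _args) do
    children = [
      # Runs the message handling tasks; the name is hypothetical.
      {Task.Supervisor, name: MyApp.ConsumerTasks},
      {Consumer,
       [
         # GenServer option: registers the process under a name.
         name: MyApp.OrdersConsumer,
         channel_name: :my_channel_name,
         # Hypothetical handler exposing handle(payload, meta).
         message_handler: {MyApp.OrderHandler, :handle, []},
         queue_name: "orders",
         prefetch_count: 10,
         requeue: true,
         retry_interval: 5_000,
         task_supervisor: MyApp.ConsumerTasks
       ]}
    ]

    # Restart the consumer too if the task supervisor goes down.
    Supervisor.start_link(children, strategy: :rest_for_one)
  end
end
```

Using channel_name rather than channel keeps channel supervision with AMQP.Application, in line with the note on the channel option.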