AMQPHelpers.Reliability.Consumer (AMQP Helpers v1.4.1)

A consumer for dealing with reliability scenarios.

An AMQPHelpers.Reliability.Consumer is a process that manages a reliable consume operation over AMQP, in which messages are acknowledged to the broker after they have been processed. Pair this process with an AMQPHelpers.Reliability.Producer to provide reliable message exchange.

Example

This Consumer delivers messages to a message_handler/0 which processes these messages. The Consumer enforces the usage of AMQP.Application, so after defining our application connection, channels and our message handler we can create an instance of this process using start_link/1 to start consuming messages:

alias AMQPHelpers.Reliability.Consumer

# The handler must return :ok so that the consumer acknowledges the message.
my_message_handler = fn payload, meta ->
  IO.inspect({payload, meta}, label: "Got a message!")
  :ok
end

{:ok, consumer} = Consumer.start_link(
  channel_name: :my_channel_name,
  message_handler: my_message_handler,
  queue_name: "my_queue_name"
)
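
The example above relies on a channel named :my_channel_name being configured through AMQP.Application. A minimal configuration sketch follows; the connection name :my_connection and the broker URL are assumptions for illustration and should be adapted to your environment:

```elixir
# config/config.exs
import Config

config :amqp,
  # Hypothetical connection name and local broker URL.
  connections: [
    my_connection: [url: "amqp://guest:guest@localhost:5672"]
  ],
  # The channel name must match the :channel_name option given to the Consumer.
  channels: [
    my_channel_name: [connection: :my_connection]
  ]
```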

Summary

Types

message_handler()

The function that handles messages.

option()

Option values used by the start_link/1 function.

options()

Options used by the start_link/1 function.

Functions

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

consume(server)

Starts consuming messages.

start_link(opts \\ [])

Starts a Consumer process linked to the current process.

Types

message_handler()

Specs

message_handler() :: function() | {module(), atom(), list()}

The function that handles messages.

A message handler is a function that deals with the consumed messages. It receives the payload of the message as its first argument and the message metadata as its second argument.

A module, function, arguments triplet can also be used. For example, if {Foo, :bar, [1, 2]} is used as message handler, the consumer will call Foo.bar(message, meta, 1, 2) to handle messages.

This function must return :ok if the message was handled successfully, so that the consumer acknowledges the message. Any other return value causes the consumer to negatively acknowledge (nack) the message.
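
Both handler forms described above can be sketched as follows. The module MyApp.Handlers, its function names and the "order:" prefix are hypothetical and exist only for illustration; the only contract taken from this documentation is the argument order and the :ok return value:

```elixir
defmodule MyApp.Handlers do
  # Hypothetical handler module; names are illustrative only.

  # Two-argument form: receives the payload and the message metadata.
  def handle(payload, _meta) when is_binary(payload) do
    case String.trim(payload) do
      # Any value other than :ok makes the consumer nack the message.
      "" -> {:error, :empty_payload}
      # :ok makes the consumer acknowledge the message.
      _text -> :ok
    end
  end

  # MFA form: the extra arguments follow the payload and the metadata.
  def handle_with_prefix(payload, _meta, prefix) do
    if String.starts_with?(payload, prefix), do: :ok, else: :ignored
  end
end

# Function form of the handler.
fun_handler = &MyApp.Handlers.handle/2

# MFA form: the consumer would call
# MyApp.Handlers.handle_with_prefix(payload, meta, "order:").
mfa_handler = {MyApp.Handlers, :handle_with_prefix, ["order:"]}
```

Either value can then be passed as the message_handler option of start_link/1.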

option()

Specs

option() ::
  GenServer.option()
  | {:adapter, module()}
  | {:channel, AMQP.Channel.t()}
  | {:channel_name, binary() | atom()}
  | {:consume_on_init, boolean()}
  | {:consume_options, keyword()}
  | {:message_handler, message_handler()}
  | {:prefetch_count, non_neg_integer()}
  | {:prefetch_size, non_neg_integer()}
  | {:queue_name, binary()}
  | {:requeue, boolean()}
  | {:retry_interval, non_neg_integer()}
  | {:shutdown_gracefully, boolean()}
  | {:task_supervisor, Supervisor.supervisor()}

Option values used by the start_link/1 function.

options()

Specs

options() :: [option()]

Options used by the start_link/1 function.

Functions

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.
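
Because the module provides child_spec/1, a consumer can be listed as a `{module, options}` child in a supervision tree. The sketch below assumes a hypothetical supervisor MyApp.ConsumerSupervisor and a hypothetical handler module MyApp.Handlers exposing handle/2; the channel and queue names are taken from the example at the top of this page:

```elixir
defmodule MyApp.ConsumerSupervisor do
  # Hypothetical supervisor; module and handler names are illustrative only.
  use Supervisor

  def start_link(init_arg) do
    Supervisor.start_link(__MODULE__, init_arg, name: __MODULE__)
  end

  # The child list lives in its own function so it can be inspected.
  def children do
    [
      {AMQPHelpers.Reliability.Consumer,
       channel_name: :my_channel_name,
       # With an empty argument list the consumer would call
       # MyApp.Handlers.handle(payload, meta).
       message_handler: {MyApp.Handlers, :handle, []},
       queue_name: "my_queue_name"}
    ]
  end

  @impl true
  def init(_init_arg) do
    Supervisor.init(children(), strategy: :one_for_one)
  end
end
```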

consume(server)

Specs

consume(GenServer.server()) :: :ok

Starts consuming messages.

This function starts consuming messages when the consume_on_init option is set to false. It is not required by default, but it is useful for testing purposes.
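
A minimal sketch of deferred consumption, assuming a hypothetical wrapper module DeferredConsumer and the channel and queue names used earlier on this page. Running it requires the library and a reachable broker:

```elixir
defmodule DeferredConsumer do
  # Hypothetical helper; only start_link/1 and consume/1 come from this page.
  alias AMQPHelpers.Reliability.Consumer

  # Starts the consumer without subscribing to the queue yet.
  def start_paused(handler) do
    Consumer.start_link(
      channel_name: :my_channel_name,
      consume_on_init: false,
      message_handler: handler,
      queue_name: "my_queue_name"
    )
  end

  # Begins consuming explicitly once setup is complete.
  def resume(consumer), do: Consumer.consume(consumer)
end
```

In a test, this allows queues to be declared or messages to be published before the consumer subscribes.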

start_link(opts \\ [])

Specs

start_link(options()) :: GenServer.on_start()

Starts a Consumer process linked to the current process.

Options

The following options can be given to the Consumer when starting it. Note that message_handler and queue_name are required.

  • adapter - Sets the AMQPHelpers.Adapter. Defaults to AMQPHelpers.Adapters.AMQP.
  • channel - The channel to use to consume messages. NOTE: do not use this for production environments because this Consumer does not supervise the given channel. Instead, use channel_name which makes use of AMQP.Application.
  • channel_name - The name of the configured channel to use. See AMQP.Application for more information. Defaults to :default.
  • consume_on_init - If the consumer should start consuming messages on init or not. Defaults to true.
  • consume_options - The options given to AMQPHelpers.Adapter.consume/4.
  • message_handler - The function that will deal with messages. Required.
  • prefetch_count - The maximum number of unacknowledged messages in the channel. See AMQP.Basic.qos/2 for more info.
  • prefetch_size - The maximum number of unacknowledged bytes in the channel. See AMQP.Basic.qos/2 for more info.
  • queue_name - The name of the queue to consume. Required.
  • requeue - Whether to requeue messages or not after a consume error. Defaults to true.
  • retry_interval - The number of milliseconds to wait if an error happens when trying to consume messages or when trying to open a channel.
  • shutdown_gracefully - If enabled, the consumer will cancel the subscription when terminating. Defaults to false, but is enforced if consume_options has exclusive set to true.
  • task_supervisor - The Task.Supervisor which runs message handling tasks. If not provided, the Consumer will handle messages synchronously.

GenServer.options/0 are also available. See GenServer.start_link/2 for more information about these.
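
As an illustration of how several of these options combine, the sketch below builds an option list that runs handlers under a Task.Supervisor. The supervisor name MyApp.HandlerTasks and all numeric values are assumptions chosen for the example, not recommended settings:

```elixir
# Start a Task.Supervisor under a hypothetical name.
{:ok, _pid} = Task.Supervisor.start_link(name: MyApp.HandlerTasks)

consumer_opts = [
  channel_name: :my_channel_name,
  # Required: returns :ok so that messages are acknowledged.
  message_handler: fn _payload, _meta -> :ok end,
  # Required: the queue to consume from.
  queue_name: "my_queue_name",
  # Illustrative values only.
  prefetch_count: 10,
  requeue: false,
  retry_interval: 5_000,
  # Message handling tasks run under this supervisor.
  task_supervisor: MyApp.HandlerTasks
]

# With the library loaded and a broker available:
# {:ok, consumer} = AMQPHelpers.Reliability.Consumer.start_link(consumer_opts)
```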