rabbit_mq v0.0.19 RabbitMQ.Consumer behaviour

This module can be used to start and maintain a pool of Consumer workers.

Example usage

rabbit_mq allows you to design consistent, SDK-like Consumers.

ℹ️ The following example assumes that the "customer/customer.created" and "customer/customer.updated" queues already exist.

First, define your Consumer(s).

To consume off "customer/customer.created":

defmodule RabbitSample.CustomerCreatedConsumer do
  use RabbitMQ.Consumer, queue: "customer/customer.created", worker_count: 2, prefetch_count: 3

  require Logger

  def handle_message(payload, meta, channel) do
    Logger.info("Customer #{payload} created.")
    ack(channel, meta.delivery_tag)
  end
end

To consume off "customer/customer.updated":

defmodule RabbitSample.CustomerUpdatedConsumer do
  use RabbitMQ.Consumer, queue: "customer/customer.updated", worker_count: 2, prefetch_count: 6

  require Logger

  def handle_message(payload, meta, channel) do
    Logger.info("Customer updated. Data: #{payload}.")
    ack(channel, meta.delivery_tag)
  end
end

Then, start as normal under your existing supervision tree:

children = [
  RabbitSample.Topology,
  RabbitSample.CustomerProducer,
  RabbitSample.CustomerCreatedConsumer,
  RabbitSample.CustomerUpdatedConsumer
]

opts = [strategy: :one_for_one, name: RabbitSample.Supervisor]
Supervisor.start_link(children, opts)

As messages are published onto the "customer/customer.created" or "customer/customer.updated" queues, the corresponding handle_message/3 is invoked.

⚠️ Please note that automatic message acknowledgement is disabled in rabbit_mq, so it is your responsibility to ensure that every message is ack'd, nack'd, or rejected.

ℹ️ Please consult the Consumer Acknowledgement Modes and Data Safety Considerations for more details.
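
As a minimal sketch of explicit acknowledgement, the Consumer below acks a message once it has been processed and nacks it otherwise. RabbitSample.CustomerCreated and its process/1 function are hypothetical and stand in for your own business logic. The sketch also assumes that nack/3 accepts the same options as AMQP.Basic.nack/3 (here requeue: false), which this page does not confirm.

```elixir
defmodule RabbitSample.CustomerCreated do
  @moduledoc "Hypothetical business logic; not part of rabbit_mq."

  # Treats any non-empty binary as a valid customer id.
  def process(payload) when is_binary(payload) and payload != "", do: :ok
  def process(_payload), do: {:error, :invalid_payload}
end

defmodule RabbitSample.CarefulCustomerCreatedConsumer do
  use RabbitMQ.Consumer, queue: "customer/customer.created", worker_count: 2, prefetch_count: 3

  require Logger

  def handle_message(payload, meta, channel) do
    case RabbitSample.CustomerCreated.process(payload) do
      :ok ->
        ack(channel, meta.delivery_tag)

      {:error, reason} ->
        Logger.error("Could not process #{inspect(payload)}: #{inspect(reason)}")
        # Assumption: options are passed through as in AMQP.Basic.nack/3.
        # requeue: false avoids redelivering a message that can never succeed.
        nack(channel, meta.delivery_tag, requeue: false)
    end
  end
end
```

Whichever branch runs, the message is settled exactly once, so it does not stay unacknowledged and count against :prefetch_count.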

Configuration

The following options can be used with RabbitMQ.Consumer:

  • :prefetch_count; limits the number of unacknowledged messages on a channel. Please consult the Consumer Prefetch section for more details. Defaults to 10.
  • :queue; the name of the queue from which the Consumer should start consuming. For exclusive queues, please see the Exclusive Queues section further below. Required.
  • :worker_count; number of workers to be spawned. Cannot be greater than :max_channels_per_connection set in config. Defaults to 3.
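
For illustration, the limit on :worker_count might be raised with a config fragment like the one below. Only the key name :max_channels_per_connection comes from this page. That it is set under the :rabbit_mq application, and the value 16, are assumptions that should be checked against the library's README.

```elixir
# config/config.exs
import Config

# Assumed location of the setting; :worker_count must not exceed this value.
config :rabbit_mq, max_channels_per_connection: 16
```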

When you use RabbitMQ.Consumer, a few things happen:

  1. The module turns into a GenServer.
  2. The server starts and supervises the desired number of workers.
  3. handle_message/3 is passed as a callback to each worker when start_link/1 is called.
  4. If an exclusive queue is requested, it will be declared and bound to the Consumer.
  5. Each worker starts consuming from the queue provided and calls handle_message/3 for each message consumed.
  6. ack/2, ack/3, nack/2, nack/3, reject/2, and reject/3 become automatically available.

handle_message/3 must be defined with the following signature:

@type payload :: String.t()
@type meta :: map()
@type channel :: AMQP.Channel.t()
@type result :: term()

@callback handle_message(payload(), meta(), channel()) :: result()
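
A sketch of one way to implement the callback, matching on meta in the function head. It assumes that meta carries the same fields the amqp library delivers with each message, in particular :redelivered; only :delivery_tag is confirmed by the examples on this page. RabbitSample.Redelivery is a hypothetical helper, and the sketch assumes that reject/3 takes the options of AMQP.Basic.reject/3. Dropping a message on its second delivery is just one possible policy.

```elixir
defmodule RabbitSample.Redelivery do
  @moduledoc "Hypothetical helper; not part of rabbit_mq."

  # True when the broker marked the message as redelivered.
  # Falls back to false if the (assumed) :redelivered key is absent.
  def redelivered?(meta) when is_map(meta), do: Map.get(meta, :redelivered, false) == true
end

defmodule RabbitSample.OneAttemptConsumer do
  use RabbitMQ.Consumer, queue: "customer/customer.updated", worker_count: 2, prefetch_count: 6

  require Logger

  def handle_message(payload, %{delivery_tag: tag} = meta, channel) do
    if RabbitSample.Redelivery.redelivered?(meta) do
      # The message was delivered before but never settled, for example
      # because a worker crashed. Give up rather than loop forever.
      reject(channel, tag, requeue: false)
    else
      Logger.info("Customer updated. Data: #{payload}.")
      ack(channel, tag)
    end
  end
end
```

The return value of handle_message/3 is typed as term(), so the sketch simply returns whatever ack/2 or reject/3 returns.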

Exclusive queues

If you want to consume from an exclusive queue, pass one of the following tuples as the :queue option:

  • {exchange, routing_key}
  • {exchange, routing_key, opts}
  • {exchange, routing_key, queue_name}
  • {exchange, routing_key, queue_name, opts}

ℹ️ At a minimum, the exchange and the routing_key are required.

defmodule CustomerConsumer do
  use RabbitMQ.Consumer, queue: {"customer", "customer.*"}, worker_count: 3

  # Define `handle_message/3` as normal.
end

This will ensure that the queue is declared and correctly bound before the Consumer workers start consuming messages off it.
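
The three-element form with a queue name can be sketched the same way. The queue name "customer_events" and the RabbitSample.CustomerEvents helper are hypothetical, and reading meta.routing_key assumes that meta exposes the routing key as the amqp library's delivery metadata does.

```elixir
defmodule RabbitSample.CustomerEvents do
  @moduledoc "Hypothetical helper; not part of rabbit_mq."

  # Builds a log line from the routing key the message was published with.
  def describe("customer.created", payload), do: "Customer #{payload} created."
  def describe("customer.updated", payload), do: "Customer updated. Data: #{payload}."
  def describe(routing_key, _payload), do: "Unhandled routing key: #{routing_key}"
end

defmodule RabbitSample.CustomerEventsConsumer do
  # Tuple form used here: {exchange, routing_key, queue_name}
  use RabbitMQ.Consumer, queue: {"customer", "customer.*", "customer_events"}, worker_count: 3

  require Logger

  def handle_message(payload, meta, channel) do
    # Assumption: meta includes :routing_key alongside :delivery_tag.
    Logger.info(RabbitSample.CustomerEvents.describe(meta.routing_key, payload))
    ack(channel, meta.delivery_tag)
  end
end
```

Because the binding key "customer.*" matches several routing keys, dispatching on the routing key lets a single Consumer handle both event types.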

Summary

Functions

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

start_link(config, opts)

Starts this module as a process via GenServer.start_link/3.

terminate(reason, state)

Invoked when the server is about to exit. It should do any cleanup required. See https://hexdocs.pm/elixir/GenServer.html#c:terminate/2 for more details.

Types

channel()

channel() :: AMQP.Channel.t()

meta()

meta() :: map()

payload()

payload() :: String.t()

result()

result() :: term()

Functions

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

start_link(config, opts)

start_link(map(), keyword()) :: GenServer.on_start()

Starts this module as a process via GenServer.start_link/3.

Only used by the module's child_spec.

terminate(reason, state)

Invoked when the server is about to exit. It should do any cleanup required. See https://hexdocs.pm/elixir/GenServer.html#c:terminate/2 for more details.

Callbacks

handle_message(payload, meta, channel)

handle_message(payload(), meta(), channel()) :: result()