View Source Rabbit.ConsumerSupervisor behaviour (Rabbit v0.20.0)

A RabbitMQ consumer supervisor process.

This allows starting and supervising multiple Rabbit.Consumer processes with ease. Rather than creating a module for each consumer and implementing repetitive logic - the same callbacks are used across all child consumers.

Example

# This is a connection
defmodule MyConnection do
  use Rabbit.Connection

  def start_link(opts \\ []) do
    Rabbit.Connection.start_link(__MODULE__, opts, name: __MODULE__)
  end

  # Callbacks

  @impl Rabbit.Connection
  def init(:connection, opts) do
    # Perform any runtime configuration
    {:ok, opts}
  end
end

# This is a consumer supervisor
defmodule MyConsumers do
  use Rabbit.ConsumerSupervisor

  def start_link(consumers \\ []) do
    Rabbit.ConsumerSupervisor.start_link(__MODULE__, consumers, name: __MODULE__)
  end

  # Callbacks

  @impl Rabbit.ConsumerSupervisor
  def init(:consumer_supervisor, _consumers) do
    # Perform any runtime configuration for the supervisor
    consumers = [
      [connection: MyConnection, queue: "my_queue1", prefetch_count: 5],
      [connection: MyConnection, queue: "my_queue2", prefetch_count: 10]
    ]

    {:ok, consumers}
  end

  def init(:consumer, opts) do
    # Perform any runtime configuration per consumer
    {:ok, opts}
  end

  @impl Rabbit.ConsumerSupervisor
  def handle_setup(channel, queue) do
    # Perform exchange or queue setup per consumer
    AMQP.Queue.declare(channel, queue)

    :ok
  end

  @impl Rabbit.ConsumerSupervisor
  def handle_message(message) do
    # Handle messages per consumer
    {:ack, message}
  end

  @impl Rabbit.ConsumerSupervisor
  def handle_error(message) do
    # Handle errors that occur per consumer
    {:nack, message}
  end
end

# Start the connection
MyConnection.start_link()

# Start the consumers
MyConsumers.start_link()

Summary

Callbacks

A callback executed by each consumer to handle message exceptions.

A callback executed by each consumer to handle message consumption.

A callback executed by each consumer after the channel is open, but before consumption.

A callback executed by each component of the consumer supervisor.

Types

Callbacks

@callback handle_error(message :: Rabbit.Message.t()) ::
  Rabbit.Consumer.message_response()

A callback executed by each consumer to handle message exceptions.

Please see Rabbit.Consumer.handle_error/1 for more information.

@callback handle_message(message :: Rabbit.Message.t()) ::
  Rabbit.Consumer.message_response()

A callback executed by each consumer to handle message consumption.

Please see Rabbit.Consumer.handle_message/1 for more information.

Link to this callback

handle_setup(state)

View Source (optional)
@callback handle_setup(state :: map()) :: :ok | {:ok, new_state :: map()} | :error

A callback executed by each consumer after the channel is open, but before consumption.

Please see Rabbit.Consumer.handle_setup/1 for more information.

@callback init(
  type :: :consumer_supervisor | :consumer,
  opts :: consumers() | Rabbit.Consumer.options()
) :: {:ok, consumers() | Rabbit.Consumer.options()} | :ignore

A callback executed by each component of the consumer supervisor.

Two versions of the callback must be created. One for the supervisor, and one for the consumers. The first argument differentiates the callback.

  # Initialize the supervisor
  def init(:consumer_supervisor, consumers) do
    {:ok, consumers}
  end

  # Initialize a single consumer
  def init(:consumer, opts) do
    {:ok, opts}
  end

Returning {:ok, consumers} - where consumers is a list of Rabbit.Consumer.options/0 will, cause start_link/3 to return {:ok, pid} and the supervisor to enter its loop.

Returning {:ok, opts} - where opts is a keyword list of Rabbit.Consumer.option/0 will, cause start_link/3 to return {:ok, pid} and the consumer to enter its loop.

Returning :ignore will cause start_link/3 to return :ignore and the process will exit normally without entering the loop.

Functions

Link to this function

start_link(module, consumers \\ [], server_opts \\ [])

View Source

Starts a consumer supervisor process.

The list of consumers should represent a list of Rabbit.Consumer.options/0.

Server Options

You can also provide server options - which are simply the same ones available for GenServer.options/0.