View Source Rabbit.Broker behaviour (Rabbit v0.20.0)

A RabbitMQ broker process.

The broker is a Supervisor that encapsulates all the smaller pieces available in Rabbit into a single entity. It provides the following:

It's recommended that the documentation for each of these components is read when utilizing the broker process.

Be aware that the connection for each component of the broker is automatically configured to use the connection process created by the broker. You dont need to configure this yourself.

Example

defmodule MyBroker do
  use Rabbit.Broker

  def start_link(opts \ []) do
    Rabbit.Broker.start_link(__MODULE__, opts, name: __MODULE__)
  end

  # Callbacks

  @impl Rabbit.Broker
  def init(_type, opts) do
    # Perform any configuration
    # You can implement the callback for each component of the broker based
    # on the first arg.
    {:ok, opts}
  end

  @impl Rabbit.Broker
  def handle_message(message) do
    # Handle consumed messages
    {:ack, message}
  end

  @impl Rabbit.Broker
  def handle_error(message) do
    # Handle errors that occur within handle_message/1
    {:nack, message}
  end
end

# Start the broker
MyBroker.start_link(
  connection: [uri: "amqp://guest:guest@127.0.0.1:5672"],
  topology: [
    queues: [[name: "foo"], [name: "bar"]]
  ],
  producer: [pool_size: 10],
  consumers: [[queue: "foo"], [queue: "bar", prefetch_count: 10]]
)

Summary

Callbacks

A callback executed by each consumer to handle message exceptions.

A callback executed by each consumer to handle message consumption.

A callback executed by each component of the broker.

Functions

Stops a broker process.

Types

@type option() ::
  {:connection, Rabbit.Connection.options()}
  | {:topology, Rabbit.Topology.options()}
  | {:producer, Rabbit.Producer.options()}
  | {:consumers, Rabbit.ConsumerSupervisor.consumers()}
@type options() :: [option()]
@type t() :: module()

Callbacks

@callback handle_error(message :: Rabbit.Message.t()) ::
  Rabbit.Consumer.message_response()

A callback executed by each consumer to handle message exceptions.

Please see Rabbit.Consumer.handle_error/1 for more information.

@callback handle_message(message :: Rabbit.Message.t()) ::
  Rabbit.Consumer.message_response()

A callback executed by each consumer to handle message consumption.

Please see Rabbit.Consumer.handle_message/1 for more information.

@callback init(
  :connection_pool
  | :connection
  | :topology
  | :producer_pool
  | :producer
  | :consumer_supervisor
  | :consumer,
  keyword()
) :: {:ok, keyword()} | :ignore

A callback executed by each component of the broker.

Seven versions of the callback can be created. The callback is differentiated based on the first arg.

  • :connection_pool - Callback for the connection pool.

  • :connection - Callback for each connection in the pool.

  • :topology - Callback for the topology.

  • :producer_pool - Callback for the producer pool.

  • :producer - Callback for each producer in the pool.

  • :consumer_supervisor - Callback for the consumer supervisor.

  • :consumer - Callback for each consumer.

    # Initialize the connection pool
    def init(:connection_pool, opts) do
      {:ok, opts}
    end
    
    # Initialize a single connection
    def init(:connection, opts) do
      {:ok, opts}
    end
    
    # Initialize the topology
    def init(:topology, opts) do
      {:ok, opts}
    end
    
    # And so on....

Returning {:ok, opts} - where opts is a keyword list will cause start_link/3 to return {:ok, pid} and the broker to enter its loop.

Returning :ignore will cause start_link/3 to return :ignore and the process will exit normally without entering the loop.

Functions

Link to this function

publish(module, exchange, routing_key, payload, opts \\ [], timeout \\ 5000)

View Source

Publishes a message using the provided broker.

The broker MUST be a broker module, and not a broker PID.

Please see the Rabbit.Producer.publish/6 documentation for further details.

Link to this function

start_link(module, opts \\ [], server_opts \\ [])

View Source
@spec start_link(module(), options(), GenServer.options()) :: Supervisor.on_start()

Starts a broker process.

Options

Server Options

You can also provide server options - which are simply the same ones available for GenServer.options/0.

@spec stop(t()) :: :ok

Stops a broker process.