View Source Rabbit.Topology behaviour (Rabbit v0.20.0)

A RabbitMQ topology process.

This is a blocking process that can be used to declare exchanges, queues, and bindings. Basically - performing any RabbitMQ setup required by your application. It should be added to your supervision tree before any producers or consumers.

Both Rabbit.Consumer and Rabbit.ConsumerSupervisor have the handle_setup/1 callback, which can be used to perform any queue, exchange or binding work as well. But if you have more complex requirements, this module can be used.

Example

# This is a connection
defmodule MyConnection do
  use Rabbit.Connection

  def start_link(opts \ []) do
    Rabbit.Connection.start_link(__MODULE__, opts, name: __MODULE__)
  end

  # Callbacks

  @impl Rabbit.Connection
  def init(:connection, opts) do
    # Perform any runtime configuration
    {:ok, opts}
  end
end

# This is a topology
defmodule MyTopology do
  use Rabbit.Topology

  def start_link(opts \ []) do
    Rabbit.Topology.start_link(__MODULE__, opts, name: __MODULE__)
  end

  # Callbacks

  @impl Rabbit.Topology
  def init(_type, opts) do
    # Perform any runtime configuration
    {:ok, opts}
  end
end

# Start the connection
MyConnection.start_link()

# Start the topology
MyTopology.start_link(
  connection: MyConnection,
  exchanges: [
    [name: "my_exchange_1"],
    [name: "my_exchange_2", type: :fanout],
  ],
  queues: [
    [name: "my_queue_1"],
    [name: "my_queue_2", durable: true],
  ],
  bindings: [
    [type: :queue, source: "my_exchange_1", destination: "my_queue_1"],
    [type: :exchange, source: "my_exchange_2", destination: "my_exchange_1"],
  ]
)

Summary

Callbacks

A callback executed when the topology is started.

Types

@type binding() :: [
  type: :queue | :exchange,
  source: binary(),
  destination: binary(),
  routing_key: binary(),
  nowait: boolean(),
  arguments: list()
]
@type exchange() :: [
  name: binary(),
  type: :direct | :fanout | :topic | :match | :headers,
  durable: boolean(),
  auto_delete: boolean(),
  internal: boolean(),
  passive: boolean(),
  nowait: boolean(),
  arguments: list()
]
@type option() ::
  {:connection, Rabbit.Connection.t()}
  | {:retry_backoff, non_neg_integer()}
  | {:retry_max, non_neg_integer()}
  | {:queues, [queue()]}
  | {:exchanges, [exchange()]}
  | {:bindings, [binding()]}
@type options() :: [option()]
@type queue() :: [
  name: binary(),
  durable: boolean(),
  auto_delete: boolean(),
  exclusive: boolean(),
  passive: boolean(),
  nowait: boolean(),
  arguments: list()
]
@type t() :: GenServer.name()

Callbacks

@callback init(:topology, options()) :: {:ok, options()} | :ignore

A callback executed when the topology is started.

Returning {:ok, opts} - where opts is a keyword list of option/0 will cause start_link/3 to return {:ok, pid} and the process to enter its loop.

Returning :ignore will cause start_link/3 to return :ignore and the process will exit normally without entering the loop

Functions

Link to this function

start_link(module, opts \\ [], server_opts \\ [])

View Source
@spec start_link(module(), list(), GenServer.options()) :: Supervisor.on_start()

Starts a toplogy process.

Options

  • :connection - A Rabbit.Connection process.
  • :exchanges - A list of exchanges to declare. Please see Exchanges.
  • :queues - A list of queues to declare. Please see Queues.
  • :bindings - A list of bindings to declare. Please see Bindings.
  • :retry_delay - The amount of time in milliseconds to delay between attempts to fetch a connection from the connection process - defaults to 100.
  • :retry_max - The max amount of connection retries that will be attempted before returning an error - defaults to 25.

Exchanges

Declaring exchanges is done by providing a list of keyword options. The options include:

  • :name - The name of the exchange.
  • :type - The type of the exchange - one of :direct, :fanout, :topic, :match or :headers - defaults to :direct. Custom types can also be provided.
  • :durable - Whether the exchange is durable across broker restarts - defaults to false.
  • :auto_delete - Deletes the exchange once all queues unbind from it - defaults to false.
  • :passive - Returns an error if the exchange does not already exist - defaults to false.
  • :internal - If set, the exchange may not be used directly by publishers, but only when bound to other exchanges. Internal exchanges are used to construct wiring that is not visible to applications - defaults to false.

Below is an example of exchange options:

[
  [name: "my_exchange_1"],
  [name: "my_exchange_2", type: :fanout, durable: true],
]

Queues

Declaring queues is done by providing a list of keyword options. The options include:

  • :name - The name of the queue.
  • :durable - Whether the queue is durable across broker restarts - defaults to false.
  • :auto_delete - Deletes the queue once all consumers disconnect - defaults to false.
  • :passive - Returns an error if the queue does not already exist - defaults to false.
  • :exclusive - If set, only one consumer can consume from the queue - defaults to false.

Below is an example of queue options:

[
  [name: "my_queue_1"],
  [name: "my_queue_2", durable: true],
]

Bindings

Declaring bindings is done by providing a list of keyword options. The options include:

  • :type - The type of the destination - one of :exchange or :queue.
  • :source - The source of the binding.
  • :destination - The destination of the binding.
  • :routing_key - The routing key of the binding.

Below is an example of binding options:

[
  [type: :queue, source: "my_exchange_1", destination: "my_queue_1"],
  [type: :exchange, source: "my_exchange_2", destination: "my_exchange_1"]
]

Server Options

You can also provide server options - which are simply the same ones available for GenServer.options/0.