Rabbit.Topology behaviour (Rabbit v0.21.0)
A RabbitMQ topology process.
This is a blocking process that declares exchanges, queues, and bindings, that is, any RabbitMQ setup your application requires. Add it to your supervision tree before any producers or consumers.
Both Rabbit.Consumer and Rabbit.ConsumerSupervisor have the handle_setup/1
callback, which can be used to perform any queue, exchange or binding work as
well. But if you have more complex requirements, this module can be used.
Example
# This is a connection
defmodule MyConnection do
  use Rabbit.Connection

  def start_link(opts \\ []) do
    Rabbit.Connection.start_link(__MODULE__, opts, name: __MODULE__)
  end

  # Callbacks

  @impl Rabbit.Connection
  def init(:connection, opts) do
    # Perform any runtime configuration
    {:ok, opts}
  end
end
# This is a topology
defmodule MyTopology do
  use Rabbit.Topology

  def start_link(opts \\ []) do
    Rabbit.Topology.start_link(__MODULE__, opts, name: __MODULE__)
  end

  # Callbacks

  @impl Rabbit.Topology
  def init(_type, opts) do
    # Perform any runtime configuration
    {:ok, opts}
  end
end
# Start the connection
MyConnection.start_link()
# Start the topology
MyTopology.start_link(
  connection: MyConnection,
  exchanges: [
    [name: "my_exchange_1"],
    [name: "my_exchange_2", type: :fanout]
  ],
  queues: [
    [name: "my_queue_1"],
    [name: "my_queue_2", durable: true]
  ],
  bindings: [
    [type: :queue, source: "my_exchange_1", destination: "my_queue_1"],
    [type: :exchange, source: "my_exchange_2", destination: "my_exchange_1"]
  ]
)
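Because the topology should start before producers and consumers, its position in a supervisor's child list matters: a supervisor starts its children in list order and waits for each one to finish starting. The following is a minimal sketch, assuming the MyConnection and MyTopology modules from the example above. The MyApp.Application module and the choice of the :rest_for_one strategy are illustrative assumptions, not something the library prescribes. Explicit child specs are used so that the sketch relies only on the start_link/1 functions defined above.

```elixir
defmodule MyApp.Application do
  # Hypothetical application module; the name is illustrative.
  use Application

  # Options passed to MyTopology.start_link/1, as in the example above.
  def topology_opts do
    [
      connection: MyConnection,
      exchanges: [[name: "my_exchange_1"]],
      queues: [[name: "my_queue_1"]],
      bindings: [
        [type: :queue, source: "my_exchange_1", destination: "my_queue_1"]
      ]
    ]
  end

  # Children start in list order: connection first, then the topology.
  def children do
    [
      %{id: MyConnection, start: {MyConnection, :start_link, [[]]}},
      %{id: MyTopology, start: {MyTopology, :start_link, [topology_opts()]}}
      # Producers and consumers would be listed here, after the topology.
    ]
  end

  @impl Application
  def start(_type, _args) do
    # :rest_for_one restarts a crashed child and every child started after it.
    Supervisor.start_link(children(), strategy: :rest_for_one, name: MyApp.Supervisor)
  end
end
```

With :rest_for_one, a crash of the connection also restarts the topology and anything listed after it, so the topology is started again before producers and consumers resume.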
Summary
Callbacks
A callback executed when the topology is started.
Functions
Starts a topology process.
Types
@type option() :: {:connection, Rabbit.Connection.t()} | {:retry_backoff, non_neg_integer()} | {:retry_max, non_neg_integer()} | {:queues, [queue()]} | {:exchanges, [exchange()]} | {:bindings, [binding()]}
@type options() :: [option()]
@type t() :: GenServer.name()
Callbacks
A callback executed when the topology is started.
Returning {:ok, opts}, where opts is a keyword list of option/0 values, causes
start_link/3 to return {:ok, pid} and the process to enter its loop.
Returning :ignore causes start_link/3 to return :ignore; the process
exits normally without entering the loop.
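As an illustration of the two return values, the sketch below skips topology setup when an application environment flag is false. The :my_app application name and the :declare_topology key are hypothetical; the module otherwise follows the same shape as the MyTopology example.

```elixir
defmodule MyOptionalTopology do
  use Rabbit.Topology

  def start_link(opts \\ []) do
    Rabbit.Topology.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @impl Rabbit.Topology
  def init(_type, opts) do
    # :my_app and :declare_topology are hypothetical names for this sketch.
    if Application.get_env(:my_app, :declare_topology, true) do
      # Continue starting with the given options.
      {:ok, opts}
    else
      # start_link/3 returns :ignore and no process is kept running.
      :ignore
    end
  end
end
```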
Functions
@spec start_link(module(), list(), GenServer.options()) :: Supervisor.on_start()
Starts a topology process.
Options
:connection - A Rabbit.Connection process.
:exchanges - A list of exchanges to declare. Please see Exchanges.
:queues - A list of queues to declare. Please see Queues.
:bindings - A list of bindings to declare. Please see Bindings.
:retry_delay - The amount of time in milliseconds to delay between attempts to fetch a connection from the connection process - defaults to 100.
:retry_max - The max amount of connection retries that will be attempted before returning an error - defaults to 25.
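For instance, a topology that should give the connection more time to become available might be started as follows. This is only a sketch using the option names listed above, with arbitrary values; MyConnection and MyTopology are the modules from the module example. Note that the option() type names the delay setting :retry_backoff, so it is worth checking which key your version accepts.

```elixir
# Illustrative values only; option names are taken from the list above.
MyTopology.start_link(
  connection: MyConnection,
  # Wait 500 ms between attempts to fetch a connection (default: 100).
  retry_delay: 500,
  # Give up after 10 attempts (default: 25).
  retry_max: 10,
  queues: [[name: "my_queue_1", durable: true]]
)
```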
Exchanges
Declaring exchanges is done by providing a list of keyword options. The options include:
:name - The name of the exchange.
:type - The type of the exchange - one of :direct, :fanout, :topic, :match or :headers - defaults to :direct. Custom types can also be provided.
:durable - Whether the exchange is durable across broker restarts - defaults to false.
:auto_delete - Deletes the exchange once all queues unbind from it - defaults to false.
:passive - Returns an error if the exchange does not already exist - defaults to false.
:internal - If set, the exchange may not be used directly by publishers, but only when bound to other exchanges. Internal exchanges are used to construct wiring that is not visible to applications - defaults to false.
Below is an example of exchange options:
[
  [name: "my_exchange_1"],
  [name: "my_exchange_2", type: :fanout, durable: true]
]
Queues
Declaring queues is done by providing a list of keyword options. The options include:
:name - The name of the queue.
:durable - Whether the queue is durable across broker restarts - defaults to false.
:auto_delete - Deletes the queue once all consumers disconnect - defaults to false.
:passive - Returns an error if the queue does not already exist - defaults to false.
:exclusive - If set, only one consumer can consume from the queue - defaults to false.
Below is an example of queue options:
[
  [name: "my_queue_1"],
  [name: "my_queue_2", durable: true]
]
Bindings
Declaring bindings is done by providing a list of keyword options. The options include:
:type - The type of the destination - one of :exchange or :queue.
:source - The source of the binding.
:destination - The destination of the binding.
:routing_key - The routing key of the binding.
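A routing key matters most with :direct and :topic exchanges. The sketch below, using hypothetical exchange and queue names, shows the three option lists for a topic exchange whose binding forwards only messages whose routing key matches orders.*. In RabbitMQ topic patterns, * matches exactly one dot-separated word.

```elixir
# Hypothetical names; pass these as the :exchanges, :queues and :bindings options.
exchanges = [[name: "orders", type: :topic, durable: true]]
queues = [[name: "order_events", durable: true]]

bindings = [
  [type: :queue, source: "orders", destination: "order_events", routing_key: "orders.*"]
]
```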
Below is an example of binding options:
[
  [type: :queue, source: "my_exchange_1", destination: "my_queue_1"],
  [type: :exchange, source: "my_exchange_2", destination: "my_exchange_1"]
]
Server Options
You can also provide server options, which are the same as those described by
GenServer.options/0.
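For example, because the third argument to Rabbit.Topology.start_link/3 is typed as GenServer.options(), standard entries such as :name and :timeout can be passed there. A minimal sketch, with a hypothetical module name and an arbitrary timeout value:

```elixir
defmodule MyNamedTopology do
  use Rabbit.Topology

  def start_link(opts \\ []) do
    # Third argument: GenServer options (name registration, start timeout in ms).
    server_opts = [name: __MODULE__, timeout: 10_000]
    Rabbit.Topology.start_link(__MODULE__, opts, server_opts)
  end

  @impl Rabbit.Topology
  def init(_type, opts), do: {:ok, opts}
end
```

Since the topology blocks while it starts, a :timeout value is a judgment call that depends on how long connection retries and declarations may take in your environment.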