View Source Rabbit.Broker behaviour (Rabbit v0.20.0)
A RabbitMQ broker process.
The broker is a Supervisor
that encapsulates all the smaller pieces available
in Rabbit into a single entity. It provides the following:
- Durable connection pooling through
Rabbit.Connection
. - Automatic broker configuration through
Rabbit.Topology
. - Simple message publishing through
Rabbit.Producer
. - Simple message consumption through
Rabbit.ConsumerSupervisor
.
It's recommended that the documentation for each of these components is read when utilizing the broker process.
Be aware that the connection for each component of the broker is automatically configured to use the connection process created by the broker. You dont need to configure this yourself.
Example
defmodule MyBroker do
use Rabbit.Broker
def start_link(opts \ []) do
Rabbit.Broker.start_link(__MODULE__, opts, name: __MODULE__)
end
# Callbacks
@impl Rabbit.Broker
def init(_type, opts) do
# Perform any configuration
# You can implement the callback for each component of the broker based
# on the first arg.
{:ok, opts}
end
@impl Rabbit.Broker
def handle_message(message) do
# Handle consumed messages
{:ack, message}
end
@impl Rabbit.Broker
def handle_error(message) do
# Handle errors that occur within handle_message/1
{:nack, message}
end
end
# Start the broker
MyBroker.start_link(
connection: [uri: "amqp://guest:guest@127.0.0.1:5672"],
topology: [
queues: [[name: "foo"], [name: "bar"]]
],
producer: [pool_size: 10],
consumers: [[queue: "foo"], [queue: "bar", prefetch_count: 10]]
)
Summary
Callbacks
A callback executed by each consumer to handle message exceptions.
A callback executed by each consumer to handle message consumption.
A callback executed by each component of the broker.
Functions
Publishes a message using the provided broker.
Starts a broker process.
Stops a broker process.
Types
@type option() :: {:connection, Rabbit.Connection.options()} | {:topology, Rabbit.Topology.options()} | {:producer, Rabbit.Producer.options()} | {:consumers, Rabbit.ConsumerSupervisor.consumers()}
@type options() :: [option()]
@type t() :: module()
Callbacks
@callback handle_error(message :: Rabbit.Message.t()) :: Rabbit.Consumer.message_response()
A callback executed by each consumer to handle message exceptions.
Please see Rabbit.Consumer.handle_error/1
for more information.
@callback handle_message(message :: Rabbit.Message.t()) :: Rabbit.Consumer.message_response()
A callback executed by each consumer to handle message consumption.
Please see Rabbit.Consumer.handle_message/1
for more information.
@callback init( :connection_pool | :connection | :topology | :producer_pool | :producer | :consumer_supervisor | :consumer, keyword() ) :: {:ok, keyword()} | :ignore
A callback executed by each component of the broker.
Seven versions of the callback can be created. The callback is differentiated based on the first arg.
:connection_pool
- Callback for the connection pool.:connection
- Callback for each connection in the pool.:topology
- Callback for the topology.:producer_pool
- Callback for the producer pool.:producer
- Callback for each producer in the pool.:consumer_supervisor
- Callback for the consumer supervisor.:consumer
- Callback for each consumer.# Initialize the connection pool def init(:connection_pool, opts) do {:ok, opts} end # Initialize a single connection def init(:connection, opts) do {:ok, opts} end # Initialize the topology def init(:topology, opts) do {:ok, opts} end # And so on....
Returning {:ok, opts}
- where opts
is a keyword list will cause start_link/3
to return {:ok, pid}
and the broker to enter its loop.
Returning :ignore
will cause start_link/3
to return :ignore
and the process
will exit normally without entering the loop.
Functions
publish(module, exchange, routing_key, payload, opts \\ [], timeout \\ 5000)
View Source@spec publish( t(), Rabbit.Producer.exchange(), Rabbit.Producer.routing_key(), Rabbit.Producer.message(), Rabbit.Producer.publish_options(), timeout() ) :: :ok | {:error, any()}
Publishes a message using the provided broker.
The broker MUST be a broker module, and not a broker PID.
Please see the Rabbit.Producer.publish/6
documentation for further details.
@spec start_link(module(), options(), GenServer.options()) :: Supervisor.on_start()
Starts a broker process.
Options
:connection
- A keyword list ofRabbit.Connection.option/0
.:topology
- A keyword list ofRabbit.Topology.option/0
.:producer
- A keyword list ofRabbit.Producer.option/0
.:consumers
- A list ofRabbit.ConsumerSupervisor.consumers/0
.
Server Options
You can also provide server options - which are simply the same ones available
for GenServer.options/0
.
@spec stop(t()) :: :ok
Stops a broker process.