Rabbit.Connection behaviour (Rabbit v0.20.0)
A RabbitMQ connection pool process.
Connections form the basis of any application that works with RabbitMQ. A
connection module is needed by all the other modules included with Rabbit.
Connection modules wrap the standard AMQP.Connection and provide the following
benefits:
- Durability during connection failures through use of exponential backoff.
- Increased throughput via connection pooling.
- Subscriptions that assist connection status monitoring.
- Easy runtime setup through an init/2 callback.
Example
# Connection module
defmodule MyConnection do
  use Rabbit.Connection

  def start_link(opts \\ []) do
    Rabbit.Connection.start_link(__MODULE__, opts, name: __MODULE__)
  end

  # Callbacks
  @impl Rabbit.Connection
  def init(_type, opts) do
    # Perform any runtime configuration
    {:ok, opts}
  end
end

# Start the connection
MyConnection.start_link()

# Subscribe to the connection
Rabbit.Connection.subscribe(MyConnection)

receive do
  {:connected, connection} -> "hello"
end

# Stop the connection
Rabbit.Connection.stop(MyConnection)

receive do
  {:disconnected, reason} -> "bye"
end
Types
@type option() ::
        {:uri, String.t()}
        | {:pool_size, non_neg_integer()}
        | {:max_overflow, non_neg_integer()}
        | {:strategy, :lifo | :fifo}
        | {:name, String.t()}
        | {:username, String.t()}
        | {:password, String.t()}
        | {:virtual_host, String.t()}
        | {:host, String.t()}
        | {:port, integer()}
        | {:channel_max, integer()}
        | {:frame_max, integer()}
        | {:heartbeat, integer()}
        | {:connection_timeout, integer()}
        | {:ssl_options, atom() | Keyword.t()}
        | {:socket_options, Keyword.t()}
        | {:retry_backoff, non_neg_integer()}
        | {:retry_max_delay, non_neg_integer()}
@type options() :: [option()]
@type t() :: GenServer.name()
Callbacks
A callback executed by each component of the connection pool.
Two versions of the callback must be defined: one for the pool and one for the connections. The first argument differentiates the two.
# Initialize the pool
def init(:connection_pool, opts) do
  {:ok, opts}
end

# Initialize a single connection
def init(:connection, opts) do
  {:ok, opts}
end
Returning {:ok, opts}, where opts is a keyword list of option/0 values, will cause start_link/3 to return {:ok, pid} and the process to enter its loop.
Returning :ignore will cause start_link/3 to return :ignore and the process will exit normally without entering the loop.
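As a minimal sketch of runtime configuration through the two init/2 clauses, the module below reads the broker URI from the environment and sets a fallback pool size. The RABBITMQ_URI variable name and the pool size of 2 are assumptions made for this illustration and are not defined by Rabbit. The module needs the rabbit dependency to compile.

```elixir
defmodule MyConnection do
  use Rabbit.Connection

  def start_link(opts \\ []) do
    Rabbit.Connection.start_link(__MODULE__, opts, name: __MODULE__)
  end

  # Pool-level setup.
  @impl Rabbit.Connection
  def init(:connection_pool, opts) do
    # Assumption for this sketch: use a pool size of 2 unless the caller set one.
    {:ok, Keyword.put_new(opts, :pool_size, 2)}
  end

  # Connection-level setup.
  def init(:connection, opts) do
    # RABBITMQ_URI is a hypothetical variable name chosen for this example.
    case System.get_env("RABBITMQ_URI") do
      nil -> {:ok, opts}
      uri -> {:ok, Keyword.put(opts, :uri, uri)}
    end
  end
end
```

Because :uri takes priority over the other connection attributes, setting it here overrides any :host or :port passed to start_link/3.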
Functions
Checks whether a connection is alive within the pool.
@spec fetch(t(), timeout()) :: {:ok, AMQP.Connection.t()} | {:error, :not_connected}
Fetches a raw AMQP.Connection struct from the pool.
@spec start_link(module(), options(), GenServer.options()) :: GenServer.on_start()
Starts a connection pool process.
Options
- :uri - The connection URI. This takes priority over other connection attributes.
- :pool_size - The number of processes to create for connections - defaults to 1. Each process consumes a RabbitMQ connection.
- :max_overflow - Maximum number of temporary workers created when the pool is empty - defaults to 0.
- :strategy - Determines whether checked in workers should be placed first or last in the line of available workers - defaults to :fifo.
- :name - A name that will be displayed in the management UI.
- :username - The name of a user registered with the broker - defaults to "guest".
- :password - The password of the user - defaults to "guest".
- :virtual_host - The name of a virtual host in the broker - defaults to "/".
- :host - The hostname of the broker - defaults to "localhost".
- :port - The port the broker is listening on - defaults to 5672.
- :channel_max - The channel_max handshake parameter - defaults to 0.
- :frame_max - The frame_max handshake parameter - defaults to 0.
- :heartbeat - The heartbeat interval in seconds - defaults to 10.
- :connection_timeout - The connection timeout in milliseconds - defaults to 50000.
- :retry_backoff - The amount of time in milliseconds to add between connection retry attempts - defaults to 1_000.
- :retry_max_delay - The max amount of time in milliseconds to be used between connection attempts - defaults to 5_000.
- :ssl_options - Enable SSL by setting the location to cert files - defaults to :none.
- :client_properties - A list of extra client properties to be sent to the server - defaults to [].
- :socket_options - Extra socket options. These are appended to the default options. See http://www.erlang.org/doc/man/inet.html#setopts-2 and http://www.erlang.org/doc/man/gen_tcp.html#connect-4 for descriptions of the available options.
Server Options
You can also provide server options, which are the same ones available
for GenServer.options/0.
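To illustrate how the two option lists are passed, a pool might be started as sketched below. MyConnection is the module from the Example section; the host name, credentials and numeric values are placeholders chosen for this sketch, not recommended settings.

```elixir
# Connection and pool options go in the second argument;
# GenServer options (such as :name) go in the third.
Rabbit.Connection.start_link(
  MyConnection,
  [
    # Placeholder broker details, assumed for illustration only.
    host: "rabbit.example.internal",
    port: 5672,
    username: "guest",
    password: "guest",
    virtual_host: "/",
    # Two pooled connections, no temporary overflow workers.
    pool_size: 2,
    max_overflow: 0,
    # Retry timing in milliseconds.
    retry_backoff: 1_000,
    retry_max_delay: 5_000
  ],
  name: MyConnection
)
```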
@spec stop(t()) :: :ok
Stops the connection pool.
Subscribes a process to a connection in the pool.
A subscribed process can receive the following messages:
- {:connected, connection} - where connection is an AMQP.Connection struct. If the connection is alive when the process subscribes, this message is sent immediately. It is sent again whenever the connection goes down and manages to reconnect.
- {:disconnected, reason} - where reason can be any value. If the connection goes down, all subscribed processes are sent this message. The connection process then goes through an exponential backoff period until the connection is re-established.
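The two messages can be handled in a long-lived process rather than a one-off receive. The following is a minimal sketch of a GenServer that tracks the current connection. ConnectionWatcher is a hypothetical module name, not part of Rabbit, and it assumes that subscribe/1 subscribes the calling process, as in the Example section.

```elixir
defmodule ConnectionWatcher do
  # Hypothetical module for illustration; not part of Rabbit.
  use GenServer
  require Logger

  def start_link(connection) do
    GenServer.start_link(__MODULE__, connection)
  end

  @impl GenServer
  def init(connection) do
    # Subscribe this GenServer process to the connection.
    Rabbit.Connection.subscribe(connection)
    {:ok, %{connection: nil}}
  end

  @impl GenServer
  def handle_info({:connected, connection}, state) do
    Logger.info("RabbitMQ connection established")
    {:noreply, %{state | connection: connection}}
  end

  def handle_info({:disconnected, reason}, state) do
    Logger.warning("RabbitMQ connection lost: #{inspect(reason)}")
    # Drop the stale struct; a new one arrives with the next :connected message.
    {:noreply, %{state | connection: nil}}
  end
end
```

Started with ConnectionWatcher.start_link(MyConnection), the process holds nil while the connection is down, so callers can tell when no usable AMQP.Connection struct is available.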
Runs the given function inside a transaction.
The function must accept a connection pid.