Rabbit.Connection behaviour (Rabbit v0.20.0)

A RabbitMQ connection pool process.

Connections form the basis of any application that works with RabbitMQ. Every other module included with Rabbit needs a connection module. Connection modules wrap the standard AMQP.Connection and provide the following benefits:

  • Durability during connection failures through use of exponential backoff.
  • Increased throughput via connection pooling.
  • Subscriptions that assist connection status monitoring.
  • Easy runtime setup through an init/2 callback.

Example

# Connection module
defmodule MyConnection do
  use Rabbit.Connection

  def start_link(opts \\ []) do
    Rabbit.Connection.start_link(__MODULE__, opts, name: __MODULE__)
  end

  # Callbacks

  @impl Rabbit.Connection
  def init(_type, opts) do
    # Perform any runtime configuration
    {:ok, opts}
  end
end

# Start the connection
MyConnection.start_link()

# Subscribe to the connection
Rabbit.Connection.subscribe(MyConnection)

receive do
  {:connected, connection} -> "hello"
end

# Stop the connection
Rabbit.Connection.stop(MyConnection)

receive do
  {:disconnected, reason} -> "bye"
end

Types

@type option() ::
  {:uri, String.t()}
  | {:pool_size, non_neg_integer()}
  | {:max_overflow, non_neg_integer()}
  | {:strategy, :lifo | :fifo}
  | {:name, String.t()}
  | {:username, String.t()}
  | {:password, String.t()}
  | {:virtual_host, String.t()}
  | {:host, String.t()}
  | {:port, integer()}
  | {:channel_max, integer()}
  | {:frame_max, integer()}
  | {:heartbeat, integer()}
  | {:connection_timeout, integer()}
  | {:ssl_options, atom() | Keyword.t()}
  | {:socket_options, Keyword.t()}
  | {:retry_backoff, non_neg_integer()}
  | {:retry_max_delay, non_neg_integer()}
@type options() :: [option()]
@type t() :: GenServer.name()

Callbacks

@callback init(:connection_pool | :connection, options()) :: {:ok, options()} | :ignore

A callback executed by each component of the connection pool.

The callback must handle two cases: one for the pool and one for each connection. The first argument distinguishes them.

  # Initialize the pool
  def init(:connection_pool, opts) do
    {:ok, opts}
  end

  # Initialize a single connection
  def init(:connection, opts) do
    {:ok, opts}
  end

Returning {:ok, opts}, where opts is a keyword list of option/0, will cause start_link/3 to return {:ok, pid} and the process to enter its loop.

Returning :ignore will cause start_link/3 to return :ignore and the process will exit normally without entering the loop.
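
As a sketch of the runtime configuration this callback allows, the module below fills in the :uri option from an environment variable when the caller did not supply one. The module name MyApp.Connection, the RABBITMQ_URI variable, and the fallback URI are assumptions made for illustration; only use Rabbit.Connection, init/2, and the :uri option come from this page.

```elixir
defmodule MyApp.Connection do
  use Rabbit.Connection

  def start_link(opts \\ []) do
    Rabbit.Connection.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @impl Rabbit.Connection
  def init(:connection_pool, opts) do
    # Pool-level initialization: options pass through unchanged.
    {:ok, opts}
  end

  def init(:connection, opts) do
    # RABBITMQ_URI is a hypothetical variable name; fall back to a local broker.
    uri = System.get_env("RABBITMQ_URI") || "amqp://guest:guest@localhost:5672"

    # Keep an explicitly passed :uri, otherwise use the one resolved above.
    {:ok, Keyword.put_new(opts, :uri, uri)}
  end
end
```

Using Keyword.put_new/3 keeps any :uri given to start_link/1 intact, so the environment variable acts only as a default.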

Functions

alive?(connection_pool, timeout \\ 5000)

@spec alive?(t(), timeout()) :: boolean()

Checks whether a connection is alive within the pool.

fetch(connection_pool, timeout \\ 5000)

@spec fetch(t(), timeout()) :: {:ok, AMQP.Connection.t()} | {:error, :not_connected}

Fetches a raw AMQP.Connection struct from the pool.
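
A minimal sketch of using the fetched struct, assuming a connection module named MyConnection (as in the example at the top of this page) is already running. It opens a channel with AMQP.Channel.open/1 from the amqp library. The helper module MyApp.Channels is hypothetical.

```elixir
defmodule MyApp.Channels do
  # Opens an AMQP channel on a connection taken from the pool.
  # Returns {:ok, channel} or {:error, reason}.
  def open(connection_pool \\ MyConnection) do
    case Rabbit.Connection.fetch(connection_pool) do
      {:ok, %AMQP.Connection{} = connection} ->
        AMQP.Channel.open(connection)

      {:error, :not_connected} = error ->
        # No live connection is available right now; let the caller decide
        # whether to retry.
        error
    end
  end
end
```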

start_link(module, opts \\ [], server_opts \\ [])

@spec start_link(module(), options(), GenServer.options()) :: GenServer.on_start()

Starts a connection pool process.

Options

  • :uri - The connection URI. This takes priority over other connection attributes.
  • :pool_size - The number of processes to create for connections - defaults to 1. Each process consumes a RabbitMQ connection.
  • :max_overflow - Maximum number of temporary workers created when the pool is empty - defaults to 0.
  • :strategy - Determines whether checked in workers should be placed first or last in the line of available workers - defaults to :fifo.
  • :name - A name that will be displayed in the management UI.
  • :username - The name of a user registered with the broker - defaults to "guest".
  • :password - The password of the user - defaults to "guest".
  • :virtual_host - The name of a virtual host in the broker - defaults to "/".
  • :host - The hostname of the broker - defaults to "localhost".
  • :port - The port the broker is listening on - defaults to 5672.
  • :channel_max - The channel_max handshake parameter - defaults to 0.
  • :frame_max - The frame_max handshake parameter - defaults to 0.
  • :heartbeat - The heartbeat interval in seconds - defaults to 10.
  • :connection_timeout - The connection timeout in milliseconds - defaults to 50000.
  • :retry_backoff - The amount of time in milliseconds to add between connection retry attempts - defaults to 1_000.
  • :retry_max_delay - The max amount of time in milliseconds to be used between connection attempts - defaults to 5_000.
  • :ssl_options - Enable SSL by setting the location to cert files - defaults to :none.
  • :client_properties - A list of extra client properties to be sent to the server - defaults to [].
  • :socket_options - Extra socket options. These are appended to the default options. See http://www.erlang.org/doc/man/inet.html#setopts-2 and http://www.erlang.org/doc/man/gen_tcp.html#connect-4 for descriptions of the available options.

Server Options

You can also provide server options, which are the same as those accepted by GenServer.options/0.
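
The options above might be combined in a supervision tree as sketched here. The sketch assumes the MyConnection module from the example at the top of this page, whose start_link/1 forwards its argument as the connection options and sets the :name server option itself. The option values are illustrative, and the child spec is written out as a map so that nothing beyond start_link/1 is assumed about the module.

```elixir
children = [
  %{
    id: MyConnection,
    # Calls MyConnection.start_link(opts) with the keyword list below.
    start:
      {MyConnection, :start_link,
       [
         [
           host: "localhost",
           port: 5672,
           pool_size: 2,
           retry_backoff: 1_000,
           retry_max_delay: 5_000
         ]
       ]}
  }
]

{:ok, _supervisor} = Supervisor.start_link(children, strategy: :one_for_one)
```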

stop(connection_pool)

@spec stop(t()) :: :ok

Stops the connection pool.

subscribe(connection_pool, subscriber \\ nil, timeout \\ 5000)

@spec subscribe(t(), pid() | nil, timeout()) :: :ok

Subscribes a process to a connection in the pool.

A subscribed process can receive the following messages:

{:connected, connection} - where connection is an AMQP.Connection struct.

If the connection is alive when the process subscribes, this message is sent immediately. It is sent again each time the connection goes down and then manages to reconnect.

{:disconnected, reason} - where reason can be any value.

If the connection goes down, all subscribed processes are sent this message. The connection process then goes through an exponential backoff period until the connection is re-established.
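
One way a long-running process might consume these messages is sketched below: a GenServer subscribes itself during init/1 and tracks the current connection in its state. The module name MyApp.ConnectionMonitor and the shape of its state are assumptions for illustration; the message tuples and subscribe/3 are as documented above.

```elixir
defmodule MyApp.ConnectionMonitor do
  use GenServer
  require Logger

  def start_link(connection_pool) do
    GenServer.start_link(__MODULE__, connection_pool)
  end

  @impl GenServer
  def init(connection_pool) do
    # Subscribe this process explicitly by passing self().
    :ok = Rabbit.Connection.subscribe(connection_pool, self())
    {:ok, %{connection: nil}}
  end

  @impl GenServer
  def handle_info({:connected, connection}, state) do
    Logger.info("RabbitMQ connection established")
    {:noreply, %{state | connection: connection}}
  end

  def handle_info({:disconnected, reason}, state) do
    Logger.error("RabbitMQ connection lost: #{inspect(reason)}")
    # Drop the stale struct until the next {:connected, connection} arrives.
    {:noreply, %{state | connection: nil}}
  end
end
```

Starting it with MyApp.ConnectionMonitor.start_link(MyConnection) assumes the MyConnection process from the example at the top of this page is already running.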

transaction(connection_pool, fun)

@spec transaction(t(), (t() -> any())) :: any()

Runs the given function inside a transaction.

The function must accept a connection pid.