retort v2.7.0 Retort.Connection View Source

Caches the AMQP.Connection while it remains alive, so that multiple TCP connections aren’t made to the AMQP broker, following best practices.

Multiple channels should be opened on the same connection instead of multiple connections

iex> {:ok, connection} = Retort.Connection.await
iex> first_channel = AMQP.Channel.open(connection)
iex> second_channel = AMQP.Channel.open(connection)
iex> first_channel != second_channel
true

Link to this section Summary

Types

t()

connection - the active connection to RabbitMQ. nil when wait for initial connection or reconnect waiter_from_by_monitor_reference - tracks the list of from for calls to await by the Process.monitor ref used to monitor the waiter pid in from. This is empty if there is an active connection when await is called; otherwise, it tracks all the processes that need to be notified (with notify_waiters/1) when a connection is finally established. Entries are removed if the waiter dies, which is why its keyed by monitor reference

Functions

Waits for a connection, unlike get/0, which returns immediately if there is no connection. Unlike get, which has default timeout of twice the get_connection_timeout, await’s default timeout is :infinity

Gets cached connection if it is still alive. Otherwise, returns {:error, :disconnected}. If you want to wait for a connection, use await/1

The current backoff time (in milliseconds) for when connect/2 fails to open an connection

The current connection timeout for AMQP.Connection.open/1

The current URL to connect to RabbitMQ

Removes %AMQP.Connection{} from state when its AMPQ.Connection.pid goes down and tell Connection behaviour to reconnect

Initializes state to the given conenction

Updates the backoff for the next time one connect/2 fails

Updates the connection timeout for AMQP.Connection.open/1

Updates the URL to connect to RabbitMQ

Starts the connection cache

Link to this section Types

Link to this type t() View Source
t() :: %Retort.Connection{connection: %AMQP.Connection{pid: term()} | nil, waiter_from_by_monitor_reference: %{optional(reference()) => GenServer.from()}}

connection - the active connection to RabbitMQ. nil when wait for initial connection or reconnect waiter_from_by_monitor_reference - tracks the list of from for calls to await by the Process.monitor ref used to monitor the waiter pid in from. This is empty if there is an active connection when await is called; otherwise, it tracks all the processes that need to be notified (with notify_waiters/1) when a connection is finally established. Entries are removed if the waiter dies, which is why its keyed by monitor reference.

Link to this section Functions

Link to this function await(timeout \\ :infinity) View Source
await(timeout()) :: {:ok, %AMQP.Connection{pid: term()}} | no_return()

Waits for a connection, unlike get/0, which returns immediately if there is no connection. Unlike get, which has default timeout of twice the get_connection_timeout, await’s default timeout is :infinity.

Link to this function connect(info, state) View Source
connect(:init | :reconnect, %Retort.Connection{connection: nil, waiter_from_by_monitor_reference: term()}) ::
  {:ok, %Retort.Connection{connection: %AMQP.Connection{pid: term()}, waiter_from_by_monitor_reference: %{}}} |
  {:backoff, pos_integer(), %Retort.Connection{connection: nil, waiter_from_by_monitor_reference: map()}}
Link to this function get() View Source
get() ::
  {:ok, %AMQP.Connection{pid: term()}} |
  {:error, :disconnected} |
  no_return()

Gets cached connection if it is still alive. Otherwise, returns {:error, :disconnected}. If you want to wait for a connection, use await/1

Gets the same %AMQP.Connection{} while it is alive

iex> {:ok, original_connection} = Retort.Connection.get()
iex> {:ok, original_connection} == Retort.Connection.get()
true

Returns

  • {:ok, %AMQP.Connection{}} - connection is already established (or was established during the get/1 timeout before the GenServer saw the get call)
  • {:error, :disconnected} - connection is not established. Use await/0 to wait until the a connection is available.
Link to this function get(timeout) View Source
get(timeout()) ::
  {:ok, %AMQP.Connection{pid: term()}} |
  {:error, :disconnected} |
  no_return()
Link to this function get_backoff() View Source
get_backoff() :: pos_integer()

The current backoff time (in milliseconds) for when connect/2 fails to open an connection

Link to this function get_connection_timeout() View Source

The current connection timeout for AMQP.Connection.open/1

The current URL to connect to RabbitMQ

Link to this function handle_info(info, state) View Source
handle_info({:DOWN, reference(), :process, pid(), term()} | term(), t()) ::
  {:connect, :reconnect, t()} |
  {:noreply, t()}

Removes %AMQP.Connection{} from state when its AMPQ.Connection.pid goes down and tell Connection behaviour to reconnect.

Link to this function init(args) View Source
init(nil) :: {:connect, :init, %Retort.Connection{connection: nil, waiter_from_by_monitor_reference: term()}}
init(%AMQP.Connection{pid: term()}) :: {:ok, %Retort.Connection{connection: %AMQP.Connection{pid: term()}, waiter_from_by_monitor_reference: term()}}

Initializes state to the given conenction.

Link to this function put_backoff(backoff) View Source
put_backoff(pos_integer()) :: :ok

Updates the backoff for the next time one connect/2 fails.

Link to this function put_connection_timeout(timeout) View Source
put_connection_timeout(timeout()) :: :ok

Updates the connection timeout for AMQP.Connection.open/1

Link to this function put_url(url) View Source
put_url(String.t()) :: :ok

Updates the URL to connect to RabbitMQ

Link to this function start_link(connection, gen_server_opts \\ []) View Source
start_link(%AMQP.Connection{pid: term()} | nil, GenServer.options()) :: GenServer.on_start()

Starts the connection cache.