AMQP.Application (amqp v4.0.0)
Provides access to configured connections and channels.
Functions
@spec get_channel(binary() | atom()) :: {:ok, AMQP.Channel.t()} | {:error, any()}
Provides an easy way to access an AMQP channel.
AMQP.Application provides a wrapper on top of AMQP.Channel.
The channel will be monitored by AMQP's GenServer, which automatically tries to reopen it when the channel is gone.
Usage
When you want to have a single channel in your app:
config :amqp,
  connection: [url: "amqp://guest:guest@localhost:5672"],
  channel: []
Then the channel will be opened at the start of the application and you can access it via this function.
iex> {:ok, chan} = AMQP.Application.get_channel()
You can also set up multiple channels with the :channels key:
config :amqp,
  connections: [
    business_report: [url: "amqp://host1"],
    analytics: [url: "amqp://host2"]
  ],
  channels: [
    business_report: [connection: :business_report],
    analytics: [connection: :analytics]
  ]
Then you can access each channel with its name.
iex> {:ok, chan1} = AMQP.Application.get_channel(:business_report)
iex> {:ok, chan2} = AMQP.Application.get_channel(:analytics)
You can also have multiple channels for a single connection.
config :amqp,
  connection: [],
  channels: [
    consumer: [],
    producer: []
  ]
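As an illustration of how a named channel might be used once it is configured, here is a minimal sketch of a publishing helper. The module name MyApp.Publisher and the choice of the default exchange are assumptions made for this example; only AMQP.Application.get_channel/1 and AMQP.Basic.publish/4 come from the library.

```elixir
defmodule MyApp.Publisher do
  # Hypothetical helper module. :producer matches the channel name
  # used in the multi-channel configuration.
  def publish(routing_key, payload) do
    with {:ok, chan} <- AMQP.Application.get_channel(:producer) do
      # Publish through the default exchange (""), which routes the
      # message to the queue whose name equals the routing key.
      AMQP.Basic.publish(chan, "", routing_key, payload)
    end
  end
end
```

If the channel is not available, the `with` expression returns the `{:error, reason}` tuple from get_channel/1 unchanged, so callers can decide whether to retry.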
Configuration options
:connection - The connection name configured with connection or connections (default: :default)
:retry_interval - The retry interval in milliseconds when the channel fails to open (default: 5000)
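The two channel options can be combined in one entry. The following is a sketch only; the 10-second interval is an arbitrary value chosen for illustration, and the connection name reuses the analytics example from the usage section.

```elixir
import Config

config :amqp,
  connections: [
    analytics: [url: "amqp://host2"]
  ],
  channels: [
    # Open the :analytics channel on the :analytics connection and
    # wait 10 seconds between attempts to reopen it.
    analytics: [connection: :analytics, retry_interval: 10_000]
  ]
```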
Caveat
Although AMQP will reopen the named channel automatically when it is closed for some reason, your application still needs to monitor the channel for a consumer process.
Be aware that a reopened channel does not automatically recover your consumer's subscription.
Here is a sample GenServer module that monitors the channel and re-subscribes when it goes down.
defmodule AppConsumer do
  use GenServer

  @channel :default
  @queue "myqueue"

  # ....

  def handle_info(:subscribe, state) do
    case subscribe() do
      {:ok, chan} ->
        {:noreply, Map.put(state, :channel, chan)}

      _ ->
        # subscribe/0 has already scheduled a retry.
        {:noreply, state}
    end
  end

  # The monitored channel process went down: clear it and subscribe again.
  def handle_info({:DOWN, _, :process, pid, _reason}, %{channel: %{pid: pid}} = state) do
    send(self(), :subscribe)
    {:noreply, Map.put(state, :channel, nil)}
  end

  defp subscribe() do
    case AMQP.Application.get_channel(@channel) do
      {:ok, chan} ->
        # Monitor the channel process so that we are notified when it dies.
        Process.monitor(chan.pid)
        {:ok, _consumer_tag} = AMQP.Basic.consume(chan, @queue)
        {:ok, chan}

      _error ->
        # The channel is not available yet: retry in one second.
        Process.send_after(self(), :subscribe, 1000)
        {:error, :retrying}
    end
  end
end
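The sample elides the start-up code and the handling of delivered messages. One possible shape for a complete consumer is sketched below. The module name MyApp.QueueConsumer, the start_link/init wiring, and the decision to print and acknowledge every message are all assumptions made for this example, not part of the original sample. The `{:basic_consume_ok, _}` and `{:basic_deliver, payload, meta}` messages are what AMQP.Basic.consume sends to the consumer process.

```elixir
defmodule MyApp.QueueConsumer do
  # Hypothetical module; :default and "myqueue" mirror the sample above.
  use GenServer

  @channel :default
  @queue "myqueue"

  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, :ok, opts)
  end

  @impl true
  def init(:ok) do
    # Subscribe asynchronously so a missing channel does not block startup.
    send(self(), :subscribe)
    {:ok, %{channel: nil}}
  end

  @impl true
  def handle_info(:subscribe, state) do
    case AMQP.Application.get_channel(@channel) do
      {:ok, chan} ->
        Process.monitor(chan.pid)
        {:ok, _consumer_tag} = AMQP.Basic.consume(chan, @queue)
        {:noreply, %{state | channel: chan}}

      _error ->
        # Channel not available yet: try again in one second.
        Process.send_after(self(), :subscribe, 1000)
        {:noreply, state}
    end
  end

  # Broker confirmation that the consumer is registered.
  def handle_info({:basic_consume_ok, _meta}, state), do: {:noreply, state}

  # A delivered message: process the payload, then acknowledge it.
  def handle_info({:basic_deliver, payload, %{delivery_tag: tag}}, %{channel: chan} = state) do
    IO.puts("received: #{payload}")
    :ok = AMQP.Basic.ack(chan, tag)
    {:noreply, state}
  end

  # The monitored channel process died: drop it and subscribe again.
  def handle_info({:DOWN, _ref, :process, pid, _reason}, %{channel: %{pid: pid}} = state) do
    send(self(), :subscribe)
    {:noreply, %{state | channel: nil}}
  end

  # Ignore anything else (for example cancel notifications).
  def handle_info(_other, state), do: {:noreply, state}
end
```

Such a process could be started with `MyApp.QueueConsumer.start_link()` or placed in a supervision tree. Messages that were delivered but not acknowledged before the channel went down are typically redelivered by the broker, so the handler should tolerate duplicates.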
@spec get_connection(binary() | atom()) :: {:ok, AMQP.Connection.t()} | {:error, any()}
Provides an easy way to access an AMQP connection.
The connection will be monitored by AMQP's GenServer and it will automatically try to reconnect when the connection is gone.
Usage
When you want to have a single connection in your app:
config :amqp, connection: [
  url: "amqp://guest:guest@localhost:5672"
]
You can also use any options available on AMQP.Connection.open/2:
config :amqp, connection: [
  host: "localhost",
  port: 5672,
  username: "guest",
  password: "guest"
]
Then the connection will be opened at the start of the application and you can access it via this function.
iex> {:ok, conn} = AMQP.Application.get_connection()
By default, it tries to connect to your local RabbitMQ, so you can simply pass an empty keyword list:
config :amqp, connection: [] # == [url: "amqp://0.0.0.0"]
You can set up multiple connections with the :connections key:
config :amqp, connections: [
  business_report: [
    url: "amqp://host1"
  ],
  analytics: [
    url: "amqp://host2"
  ]
]
Then you can access each connection with its name.
iex> {:ok, conn1} = AMQP.Application.get_connection(:business_report)
iex> {:ok, conn2} = AMQP.Application.get_connection(:analytics)
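A named connection can also be used to open a short-lived channel by hand. The sketch below assumes a hypothetical helper module, MyApp.AdHocChannel, built on AMQP.Channel.open/1 and AMQP.Channel.close/1. Note that a channel opened this way is presumably not managed by AMQP.Application, so it would not be reopened automatically the way a configured channel is.

```elixir
defmodule MyApp.AdHocChannel do
  # Hypothetical helper: runs `fun` with a temporary channel opened on
  # the named connection and closes the channel afterwards.
  def with_channel(conn_name \\ :default, fun) when is_function(fun, 1) do
    with {:ok, conn} <- AMQP.Application.get_connection(conn_name),
         {:ok, chan} <- AMQP.Channel.open(conn) do
      try do
        fun.(chan)
      after
        # Always release the channel, even if `fun` raises.
        AMQP.Channel.close(chan)
      end
    end
  end
end
```

For example, `MyApp.AdHocChannel.with_channel(:analytics, fn chan -> AMQP.Basic.publish(chan, "", "myqueue", "ping") end)` would publish one message and then close the channel. If the connection or channel cannot be obtained, the error tuple is returned as is.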
The default name is :default, so these two configurations are equivalent:
config :amqp, connection: []
config :amqp, connections: [default: []]
Configuration options
:retry_interval - The retry interval in milliseconds when the connection fails to open (default: 5000)
:url - AMQP URI for the connection
See also AMQP.Connection.open/2 for all available options.
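As a sketch of how these options might be set at runtime, for instance in config/runtime.exs, the fragment below reads the URI from an environment variable. The variable name AMQP_URL, the fallback URI, and the 10-second interval are assumptions made for illustration.

```elixir
import Config

# AMQP_URL is a hypothetical environment variable name; the second
# argument is the fallback used when the variable is not set.
config :amqp,
  connection: [
    url: System.get_env("AMQP_URL", "amqp://guest:guest@localhost:5672"),
    retry_interval: 10_000
  ]
```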