RMQ - RabbitMQ tools.
A set of handy tools for working with RabbitMQ in Elixir projects.
Based on the AMQP library.
It includes:
- RMQ.Connection
- RMQ.Consumer
- RMQ.RPC
Installation
The package can be installed by adding rmq to your list of dependencies in mix.exs:
def deps do
  [
    {:rmq, "~> 0.3.0"}
  ]
end
RMQ.Connection
A GenServer which provides a robust connection to the RabbitMQ server.
Usage
defmodule MyApp.RabbitConnection do
  use RMQ.Connection,
    otp_app: :my_app,
    uri: "amqp://localhost",
    name: to_string(__MODULE__)
end
It is meant to be started under the application's supervision tree as follows:
defmodule MyApp.Application do
  use Application

  def start(_type, _args) do
    children = [
      MyApp.RabbitConnection
      # ...
    ]

    opts = [strategy: :one_for_one, name: MyApp.Supervisor]
    Supervisor.start_link(children, opts)
  end
end
Options
- :uri - an AMQP URI. Defaults to "amqp://localhost";
- :reconnect_interval - a reconnect interval in milliseconds. It can also be a function that accepts the current connection attempt as a number and returns a new interval. Defaults to 5000;
- other options for AMQP.Connection.open/2.
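Because :reconnect_interval also accepts a function of the attempt number, a capped exponential backoff is one possible choice. The sketch below is only an illustration: the MyApp.Backoff module, its base delay, and its cap are hypothetical and not part of RMQ, and it assumes that attempts are counted from 1.

```elixir
defmodule MyApp.Backoff do
  # Hypothetical helper, not part of RMQ.
  # Assumption: the attempt number passed in starts at 1.

  @base_ms 1_000
  @max_ms 30_000

  # Returns the delay in milliseconds before the given reconnect attempt.
  # The delay doubles with every attempt and is capped at @max_ms.
  def interval(attempt) when is_integer(attempt) and attempt >= 1 do
    # Cap the exponent so the intermediate float stays small.
    exponent = min(attempt - 1, 10)
    delay = round(@base_ms * :math.pow(2, exponent))
    min(delay, @max_ms)
  end
end
```

Such a function could then be supplied as reconnect_interval: &MyApp.Backoff.interval/1, or wrapped in an anonymous function such as fn attempt -> MyApp.Backoff.interval(attempt) end.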
RMQ.Consumer
RabbitMQ Consumer.
Usage
defmodule MyApp.Consumer do
  use RMQ.Consumer,
    queue: "my-app-consumer-queue",
    exchange: {"my-exchange", :direct, durable: true}

  @impl RMQ.Consumer
  def consume(chan, payload, meta) do
    # do something with the payload
    ack(chan, meta.delivery_tag)
  end
end

# or with dynamic configuration
defmodule MyApp.Consumer2 do
  use RMQ.Consumer

  @impl RMQ.Consumer
  def config do
    [
      queue: System.fetch_env!("QUEUE_NAME"),
      reconnect_interval: fn attempt -> attempt * 1000 end
    ]
  end

  @impl RMQ.Consumer
  def consume(chan, payload, meta) do
    # do something with the payload
    ack(chan, meta.delivery_tag)
  end
end
Options
- :connection - the connection module which implements the RMQ.Connection behaviour. Defaults to RMQ.Connection;
- :queue - the name of the queue to consume. It will be created if it does not exist. Can also be a tuple {queue, options}. See the options for AMQP.Queue.declare/3;
- :exchange - the name of the exchange to which queue should be bound. Can also be a tuple {type, exchange, options}. See the options for AMQP.Exchange.declare/4. Defaults to "";
- :routing_key - the queue binding key. Defaults to queue;
- :dead_letter - defines whether the consumer should set up a dead letter exchange and queue. Defaults to true;
- :dead_letter_queue - the name of the dead letter queue. Can also be a tuple {queue, options}. See the options for AMQP.Queue.declare/3. Defaults to "#{queue}_error";
- :dead_letter_exchange - the name of the exchange to which dead_letter_queue should be bound. Can also be a tuple {type, exchange} or {type, exchange, options}. See the options for AMQP.Exchange.declare/4. Defaults to "#{exchange}.dead-letter";
- :dead_letter_routing_key - the routing key for dead letter messages. Defaults to queue;
- :concurrency - defines whether the consume/3 callback should be called in a separate process. Defaults to true;
- :prefetch_count - sets the message prefetch count. Defaults to 10;
- :consumer_tag - the consumer tag. Defaults to the current module name;
- :reconnect_interval - a reconnect interval in milliseconds. It can also be a function that accepts the current connection attempt as a number and returns a new interval. Defaults to 5000.
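To show how several of these options might be combined, here is a configuration sketch. It uses only option names listed above; the module, queue, exchange, and routing key names are hypothetical, the values are illustrative rather than recommended, and it assumes a connection module such as MyApp.RabbitConnection from the earlier section is already running.

```elixir
defmodule MyApp.OrdersConsumer do
  # Hypothetical consumer: all names and values below are assumptions.
  use RMQ.Consumer,
    # Use the application's own connection module instead of the default.
    connection: MyApp.RabbitConnection,
    # Tuple form: the keyword list is passed on as queue declaration options.
    queue: {"orders", durable: true},
    exchange: "orders-exchange",
    routing_key: "orders.created",
    # Override the default dead letter queue name ("#{queue}_error").
    dead_letter_queue: "orders_failed",
    prefetch_count: 5,
    # Call consume/3 without spawning a separate process.
    concurrency: false

  @impl RMQ.Consumer
  def consume(chan, payload, meta) do
    # Handle the message, then acknowledge the delivery.
    IO.inspect(payload, label: "order")
    ack(chan, meta.delivery_tag)
  end
end
```

Options that are left out, such as :dead_letter_exchange and :reconnect_interval here, fall back to the defaults described in the list.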
RMQ.RPC
RPC via RabbitMQ.
Usage
# Application 1:
defmodule MyApp.RemoteResource do
  use RMQ.RPC, publishing_options: [app_id: "MyApp"]

  def find_by_id(id) do
    call("remote-resource-finder", %{id: id})
  end
end

# Application 2:
defmodule MyOtherApp.Consumer do
  use RMQ.Consumer, queue: "remote-resource-finder"

  @impl RMQ.Consumer
  def consume(chan, payload, meta) do
    response =
      payload
      |> Jason.decode!()
      |> Map.fetch!("id")
      |> MyOtherApp.Resource.get()
      |> Jason.encode!()

    reply(chan, meta, response)
    ack(chan, meta)
  end
end
Options
- :connection - the connection module which implements the RMQ.Connection behaviour;
- :queue - the name of the queue to which the module will subscribe for consuming responses. Can also be a tuple {queue, options}. See the options for AMQP.Queue.declare/3. Defaults to "", which means the broker will assign a name to the newly created queue;
- :exchange - the name of the exchange to which :queue will be bound. Please make sure the exchange exists. Defaults to "" (the default exchange);
- :consumer_tag - a consumer tag for :queue. Defaults to the current module name;
- :publishing_options - any valid options for AMQP.Basic.publish/5 except reply_to and correlation_id, which are set automatically and cannot be changed. Defaults to [];
- :reconnect_interval - a reconnect interval in milliseconds. It can also be a function that accepts the current connection attempt as a number and returns a new interval. Defaults to 5000;
- :filter_parameters - a list of parameters that may contain sensitive data and must be filtered out when logging. Defaults to ["password"].
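To make the idea behind :filter_parameters more concrete, the following is a minimal sketch of how values under sensitive keys could be masked before a payload is logged. It is not RMQ's implementation, and the library's actual behaviour may differ: the MyApp.ParamFilter module, the "[FILTERED]" placeholder, and the recursive handling of nested maps are all assumptions made for illustration.

```elixir
defmodule MyApp.ParamFilter do
  # Hypothetical helper, not part of RMQ.

  @placeholder "[FILTERED]"

  # Masks the values of the given keys, descending into nested maps and lists.
  def filter(params, keys \\ ["password"])

  # Structs are left untouched because they are not enumerable.
  def filter(%_{} = struct, _keys), do: struct

  def filter(%{} = params, keys) do
    Map.new(params, fn {key, value} ->
      if sensitive?(key, keys) do
        {key, @placeholder}
      else
        {key, filter(value, keys)}
      end
    end)
  end

  def filter(list, keys) when is_list(list) do
    Enum.map(list, &filter(&1, keys))
  end

  # Any other term (numbers, strings, tuples, ...) is returned unchanged.
  def filter(other, _keys), do: other

  # Atom and string keys are both compared by their string form.
  defp sensitive?(key, keys) when is_atom(key) or is_binary(key) do
    to_string(key) in keys
  end

  defp sensitive?(_key, _keys), do: false
end
```

With this sketch, MyApp.ParamFilter.filter(%{id: 1, password: "secret"}) returns %{id: 1, password: "[FILTERED]"}, while keys not in the list pass through unchanged.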