GenRMQ
GenRMQ is a set of behaviours meant to be used to create RabbitMQ consumers and publishers. Internally it is using AMQP elixir RabbitMQ client. The idea is to reduce boilerplate consumer / publisher code, which usually includes:
- creating connection / channel and keeping it in a state
- creating and binding queue
- handling reconnections / consumer cancellations
The project currently provides the following functionality:
GenRMQ.Consumer
- a behaviour for implementing RabbitMQ consumersGenRMQ.Publisher
- a behaviour for implementing RabbitMQ publishersGenRMQ.Processor
- a behaviour for implementing RabbitMQ message processorsGenRMQ.RabbitCase
- test utilities for RabbitMQ (example usage)
Installation
def deps do
[{:gen_rmq, "~> 1.3.0"}]
end
Migrations
Please check how to migrate to gen_rmq 1.0.0
from previous versions.
Examples
More thorough examples for using GenRMQ.Consumer
and GenRMQ.Publisher
can be found in the examples directory.
Consumer
defmodule Consumer do
@behaviour GenRMQ.Consumer
def init() do
[
queue: "gen_rmq_in_queue",
exchange: "gen_rmq_exchange",
routing_key: "#",
prefetch_count: "10",
uri: "amqp://guest:guest@localhost:5672",
retry_delay_function: fn attempt -> :timer.sleep(2000 * attempt) end
]
end
def consumer_tag() do
"test_tag"
end
def handle_message(message) do
...
end
end
GenRMQ.Consumer.start_link(Consumer, name: Consumer)
This will result in:
- durable
gen_rmq_exchange.deadletter
exchange created or redeclared - durable
gen_rmq_in_queue_error
queue created or redeclared. It will be bound togen_rmq_exchange.deadletter
- durable topic
gen_rmq_exchange
exchange created or redeclared - durable
gen_rmq_in_queue
queue created or redeclared. It will be bound togen_rmq_exchange
exchange and has a deadletter exchange set togen_rmq_exchange.deadletter
- every
handle_message
callback will executed in separate process. This can be disabled by settingconcurrency: false
ininit
callback - on failed rabbitmq connection it will wait for a bit and then reconnect
Optionally, you can:
- specify queue ttl with
queue_ttl
attribute - disable deadletter setup by setting
deadletter
attribute tofalse
- define custom names for deadletter queue / exchange / routing key by specifying
deadletter_queue
/deadletter_exchange
/deadletter_routing_key
attributes - create a priority queue with
queue_max_priority
attribute
For all available options please check consumer documentation.
Publisher
defmodule Publisher do
@behaviour GenRMQ.Publisher
def init() do
[
exchange: "gen_rmq_exchange",
uri: "amqp://guest:guest@localhost:5672"
]
end
end
GenRMQ.Publisher.start_link(Publisher, name: Publisher)
GenRMQ.Publisher.publish(Publisher, Jason.encode!(%{msg: "msg"}))
Running tests
You need docker-compose installed.
$ make test
How to contribute
We happily accept contributions in the form of Github PRs or in the form of bug reports, comments/suggestions or usage questions by creating a github issue.
Notes on project maturity
This library was developed as a Meltwater internal project starting in January 2018. Over the next two months it has been used in at least three Meltwater production services.
License
The MIT License (MIT)
Copyright (c) 2018 Meltwater Inc. http://underthehood.meltwater.com/