GenRMQ
GenRMQ is a set of behaviours for creating RabbitMQ consumers and publishers. Internally it uses the AMQP Elixir RabbitMQ client. The idea is to reduce boilerplate consumer / publisher code, which usually includes:
- creating connection / channel and keeping it in a state
- creating and binding queue
- handling reconnections / consumer cancellations
The project currently provides the following functionality:
- GenRMQ.Consumer - a behaviour for implementing RabbitMQ consumers
- GenRMQ.Publisher - a behaviour for implementing RabbitMQ publishers
- GenRMQ.Processor - a behaviour for implementing RabbitMQ message processors
- GenRMQ.RabbitCase - test utilities for RabbitMQ (example usage)
Installation
def deps do
  [{:gen_rmq, "~> 1.3.0"}]
end
Migrations
Please check how to migrate to gen_rmq 1.0.0 from previous versions.
Examples
More thorough examples for using GenRMQ.Consumer and GenRMQ.Publisher can be found in the examples directory.
Consumer
defmodule Consumer do
  @behaviour GenRMQ.Consumer

  def init() do
    [
      queue: "gen_rmq_in_queue",
      exchange: "gen_rmq_exchange",
      routing_key: "#",
      prefetch_count: "10",
      uri: "amqp://guest:guest@localhost:5672",
      retry_delay_function: fn attempt -> :timer.sleep(2000 * attempt) end
    ]
  end

  def consumer_tag() do
    "test_tag"
  end

  def handle_message(message) do
    ...
  end
end
GenRMQ.Consumer.start_link(Consumer, name: Consumer)
This will result in:
- durable gen_rmq_exchange.deadletter exchange created or redeclared
- durable gen_rmq_in_queue_error queue created or redeclared. It will be bound to gen_rmq_exchange.deadletter
- durable topic gen_rmq_exchange exchange created or redeclared
- durable gen_rmq_in_queue queue created or redeclared. It will be bound to the gen_rmq_exchange exchange and has a deadletter exchange set to gen_rmq_exchange.deadletter
- every handle_message callback will be executed in a separate process. This can be disabled by setting concurrency: false in the init callback
- on a failed RabbitMQ connection it will wait for a bit and then reconnect
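The "wait for a bit" behaviour is driven by the retry_delay_function option shown in the Consumer example. As a minimal sketch, the same linear delay can be given an upper bound. RetryDelay and its functions are hypothetical helpers, not part of GenRMQ, and the 30 second cap is an arbitrary value chosen for illustration.

```elixir
defmodule RetryDelay do
  # Hypothetical helper module, not part of GenRMQ.
  @base_ms 2_000
  @max_ms 30_000

  # Linear delay (2s, 4s, 6s, ...) that never exceeds @max_ms.
  def delay_ms(attempt) when is_integer(attempt) do
    min(@base_ms * attempt, @max_ms)
  end

  # Same shape as the anonymous function in the Consumer example:
  # receives the attempt number and blocks for the computed delay.
  def sleep(attempt) do
    attempt |> delay_ms() |> :timer.sleep()
  end
end
```

Under these assumptions it could be passed in the init callback as retry_delay_function: &RetryDelay.sleep/1.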
Optionally, you can:
- specify queue ttl with queue_ttl attribute
- disable deadletter setup by setting deadletter attribute to false
- define custom names for deadletter queue / exchange / routing key by specifying deadletter_queue / deadletter_exchange / deadletter_routing_key attributes
- create a priority queue with queue_max_priority attribute
For all available options please check consumer documentation.
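As a rough sketch, an init callback combining several of the optional attributes above might look like the following. The module name PriorityConsumer and all values are made up for illustration, and the unit of queue_ttl (assumed here to be milliseconds) should be checked against the consumer documentation. The consumer_tag and handle_message callbacks are omitted and would be defined as in the Consumer example.

```elixir
defmodule PriorityConsumer do
  # Hypothetical module; only init/0 is shown here.
  @behaviour GenRMQ.Consumer

  def init() do
    [
      queue: "gen_rmq_in_queue",
      exchange: "gen_rmq_exchange",
      routing_key: "#",
      prefetch_count: "10",
      uri: "amqp://guest:guest@localhost:5672",
      # Run handle_message in the consumer process itself.
      concurrency: false,
      # Assumed to be milliseconds; illustrative value.
      queue_ttl: 60_000,
      # Declares the queue as a priority queue; illustrative value.
      queue_max_priority: 10,
      # Custom deadletter names (alternatively, deadletter: false
      # disables the deadletter setup entirely).
      deadletter_queue: "my_error_queue",
      deadletter_exchange: "my_deadletter_exchange"
    ]
  end
end
```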
Publisher
defmodule Publisher do
  @behaviour GenRMQ.Publisher

  def init() do
    [
      exchange: "gen_rmq_exchange",
      uri: "amqp://guest:guest@localhost:5672"
    ]
  end
end
GenRMQ.Publisher.start_link(Publisher, name: Publisher)
GenRMQ.Publisher.publish(Publisher, Jason.encode!(%{msg: "msg"}))
Running tests
You need docker-compose installed.
$ make test
How to contribute
We happily accept contributions in the form of GitHub PRs, or as bug reports, comments/suggestions or usage questions submitted by creating a GitHub issue.
Notes on project maturity
This library was developed as a Meltwater internal project starting in January 2018. Over the next two months it was used in at least three Meltwater production services.
License
The MIT License (MIT)
Copyright (c) 2018 Meltwater Inc. http://underthehood.meltwater.com/