GenRMQ

GenRMQ is a set of behaviours for creating RabbitMQ consumers and publishers as fully OTP-compliant GenServers.

Internally, it uses the AMQP Elixir RabbitMQ client. The idea is to reduce boilerplate consumer / publisher code, which usually includes:

  • creating a connection and channel and keeping them in the process state
  • creating and binding a queue
  • handling reconnections and consumer cancellations

GenRMQ provides the following functionality:

Installation

GenRMQ requires Elixir 1.11 or newer, running on at least OTP 22.

def deps do
  [{:gen_rmq, "~> 4.0"}]
end

Migrations

Version 4.0.0 has been released. Please check how to migrate to gen_rmq 4.0.0.

Examples

More thorough examples for using GenRMQ.Consumer, GenRMQ.Publisher, and GenRMQ.Processor can be found under documentation.

Consumer

defmodule Consumer do
  @behaviour GenRMQ.Consumer

  @impl GenRMQ.Consumer
  def init() do
    [
      queue: "gen_rmq_in_queue",
      exchange: "gen_rmq_exchange",
      routing_key: "#",
      prefetch_count: "10",
      connection: "amqp://guest:guest@localhost:5672",
      retry_delay_function: fn attempt -> :timer.sleep(2000 * attempt) end
    ]
  end

  @impl GenRMQ.Consumer
  def consumer_tag() do
    "test_tag"
  end

  @impl GenRMQ.Consumer
  def handle_message(message) do
    ...
  end

  @impl GenRMQ.Consumer
  def handle_error(message, _reason) do
    GenRMQ.Consumer.reject(message, true)
  end
end

GenRMQ.Consumer.start_link(Consumer, name: Consumer)

This will result in:

  • durable gen_rmq_exchange.deadletter exchange created or redeclared
  • durable gen_rmq_in_queue_error queue created or redeclared. It will be bound to gen_rmq_exchange.deadletter
  • durable topic gen_rmq_exchange exchange created or redeclared
  • durable gen_rmq_in_queue queue created or redeclared. It will be bound to gen_rmq_exchange exchange and has a deadletter exchange set to gen_rmq_exchange.deadletter
  • every handle_message callback will be executed in a separate supervised Task. This can be disabled by setting concurrency: false in init callback
  • if the RabbitMQ connection fails, the consumer waits as dictated by retry_delay_function and then reconnects
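
The naming pattern in the list above can be sketched in plain Elixir. TopologyNames is a hypothetical helper written only for illustration; it is not part of GenRMQ, and it assumes the default naming shown above (custom dead-letter settings may produce different names).

```elixir
defmodule TopologyNames do
  # Hypothetical helper, NOT part of GenRMQ. It only mirrors the default
  # naming listed above so the derived names can be checked at a glance.
  def derive(opts) do
    queue = Keyword.fetch!(opts, :queue)
    exchange = Keyword.fetch!(opts, :exchange)

    %{
      exchange: exchange,
      queue: queue,
      # Dead-letter exchange: the exchange name plus a ".deadletter" suffix.
      deadletter_exchange: exchange <> ".deadletter",
      # Error queue: the queue name plus an "_error" suffix.
      deadletter_queue: queue <> "_error"
    }
  end
end
```

Calling TopologyNames.derive(queue: "gen_rmq_in_queue", exchange: "gen_rmq_exchange") returns the four names from the list above.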

Many options control the consumer setup details. Please check the GenRMQ.Consumer.init/0 docs for all available settings.
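
The consumer example passes a linear delay as retry_delay_function. As an alternative, here is a minimal sketch of a capped exponential backoff. The RetryDelay module, its one-second base and its 30-second cap are assumptions made for this illustration and do not come from GenRMQ.

```elixir
defmodule RetryDelay do
  # Hypothetical module, NOT part of GenRMQ.
  @base_ms 1_000
  @max_ms 30_000
  # Cap the exponent so very large attempt numbers stay cheap to compute.
  @max_exponent 16

  # Delay in milliseconds for a 1-based connection attempt:
  # 1s, 2s, 4s, ... doubling each time and capped at @max_ms.
  def delay_ms(attempt) when is_integer(attempt) and attempt >= 1 do
    exponent = min(attempt - 1, @max_exponent)
    min(@base_ms * :erlang.bsl(1, exponent), @max_ms)
  end

  # Blocks the caller for the computed delay, like the example's :timer.sleep/1 call.
  def wait(attempt), do: attempt |> delay_ms() |> :timer.sleep()
end
```

It could then be wired into the init callback as retry_delay_function: &RetryDelay.wait/1.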

Publisher

defmodule Publisher do
  @behaviour GenRMQ.Publisher

  @impl GenRMQ.Publisher
  def init() do
    [
      exchange: "gen_rmq_exchange",
      connection: "amqp://guest:guest@localhost:5672"
    ]
  end
end

GenRMQ.Publisher.start_link(Publisher, name: Publisher)
GenRMQ.Publisher.publish(Publisher, Jason.encode!(%{msg: "msg"}))
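
Since both processes are GenServers, they would typically run under a supervisor rather than being started by hand. The following is a minimal sketch. MyApp.MessagingSupervisor is a hypothetical module name, and the explicit child spec maps only reuse the start_link calls shown above; they make no assumption about GenRMQ providing its own child_spec.

```elixir
defmodule MyApp.MessagingSupervisor do
  # Hypothetical supervisor, NOT part of GenRMQ.
  use Supervisor

  def start_link(opts \\ []) do
    Supervisor.start_link(__MODULE__, :ok, opts)
  end

  @impl Supervisor
  def init(:ok) do
    # :one_for_one restarts only the child that crashed.
    Supervisor.init(children(), strategy: :one_for_one)
  end

  # Explicit child specs that mirror the start_link calls from the examples:
  # the Publisher and Consumer modules are the ones defined above.
  def children do
    [
      %{
        id: Publisher,
        start: {GenRMQ.Publisher, :start_link, [Publisher, [name: Publisher]]}
      },
      %{
        id: Consumer,
        start: {GenRMQ.Consumer, :start_link, [Consumer, [name: Consumer]]}
      }
    ]
  end
end
```

With gen_rmq installed and a reachable broker, MyApp.MessagingSupervisor.start_link() would start both children and restart them independently on failure.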

Documentation

Examples

Guides

Metrics

Migrations

Running Tests

You need docker-compose installed.

$ make test

How to Contribute

Please see our Contribution Guidelines.

Are you using GenRMQ in production? Please let us know; we are curious to learn about your experiences!

Maintainers

The maintainers are responsible for general project oversight and for empowering further Trusted Committers (see below).

The maintainers are the ones that create new releases of GenRMQ.

Trusted Committers

Trusted Committers are members of our community whom we have explicitly added to our GitHub repository. Trusted Committers have elevated rights, allowing them to send in changes directly to branches and to approve Pull Requests. For details see TRUSTED-COMMITTERS.md.

Note: Maintainers and Trusted Committers are listed in .github/CODEOWNERS in order to automatically assign PR reviews to them.

License

The MIT License.

Copyright (c) Meltwater Inc. underthehood.meltwater.com