gen_rmq v3.0.0 GenRMQ.Consumer behaviour View Source

A behaviour module for implementing the RabbitMQ consumer.

It will:

  • setup RabbitMQ connection / channel and keep them in a state
  • create (if does not exist) a queue and bind it to an exchange
  • create deadletter queue and exchange
  • handle reconnections
  • call handle_message callback on every message delivery
  • call handle_error callback whenever handle_message fails to process or times out

Link to this section Summary


Acknowledges given message

Returns a specification to start this module under a supervisor.

Requeues / rejects given message

Starts GenRMQ.Consumer process with given callback module linked to the current process

Synchronously stops the consumer with a given reason


Invoked to provide consumer tag

Invoked when an error or timeout is encountered while executing handle_message callback

Invoked on message delivery

Invoked to provide consumer configuration

Link to this section Functions

Link to this function


View Source
  message :: %GenRMQ.Message{attributes: term(), payload: term(), state: term()}
) :: :ok

Acknowledges given message

message - GenRMQ.Message struct

Returns a specification to start this module under a supervisor.

See Supervisor.

Link to this function

reject(message, requeue \\ false)

View Source
  message :: %GenRMQ.Message{attributes: term(), payload: term(), state: term()},
  requeue :: boolean()
) :: :ok

Requeues / rejects given message

message - GenRMQ.Message struct

requeue - indicates if message should be requeued

Link to this function

start_link(module, options \\ [])

View Source
start_link(module :: module(), options :: Keyword.t()) ::
  {:ok, pid()} | {:error, term()}

Starts GenRMQ.Consumer process with given callback module linked to the current process

module - callback module implementing GenRMQ.Consumer behaviour


  • :name - used for name registration

Return values

If the consumer is successfully created and initialized, this function returns {:ok, pid}, where pid is the PID of the consumer. If a process with the specified consumer name already exists, this function returns {:error, {:already_started, pid}} with the PID of that process.


GenRMQ.Consumer.start_link(Consumer, name: :consumer)
Link to this function

stop(name, reason)

View Source
stop(name :: atom() | pid(), reason :: term()) :: :ok

Synchronously stops the consumer with a given reason

name - pid or name of the consumer to stop reason - reason of the termination


GenRMQ.Consumer.stop(:consumer, :normal)

Link to this section Callbacks

Link to this callback


View Source
consumer_tag() :: String.t()

Invoked to provide consumer tag


def consumer_tag() do
Link to this callback

handle_error(message, reason)

View Source
  message :: %GenRMQ.Message{attributes: term(), payload: term(), state: term()},
  reason :: atom() | {struct(), list()}
) :: :ok

Invoked when an error or timeout is encountered while executing handle_message callback

message - GenRMQ.Message struct reason - the information regarding the error


To reject the message that caused the Task to fail you can do something like so:

def handle_error(message, reason) do
  # Do something with message and reject it
  Logger.warn("Failed to process message: #{inspect(message)}")


The reason argument will either be the atom :killed if the Task timed out and needed to be stopped. Or it will be a 2 element tuple where the first element is the error stuct and the second element is the stacktrace:

  %RuntimeError{message: "Can't divide by zero!"},
    {TestConsumer.ErrorWithoutConcurrency, :handle_message, 1, [file: 'test/support/test_consumers.ex', line: 98]},
    {GenRMQ.Consumer, :handle_message, 2, [file: 'lib/consumer.ex', line: 519]},
    {GenRMQ.Consumer, :handle_info, 2, [file: 'lib/consumer.ex', line: 424]},
    {:gen_server, :try_dispatch, 4, [file: 'gen_server.erl', line: 637]},
    {:gen_server, :handle_msg, 6, [file: 'gen_server.erl', line: 711]},
    {:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 249]}
Link to this callback


View Source
  message :: %GenRMQ.Message{attributes: term(), payload: term(), state: term()}
) :: :ok

Invoked on message delivery

message - GenRMQ.Message struct


def handle_message(message) do
  # Do something with message and acknowledge it
Link to this callback


View Source
init() :: [
  connection: keyword() | {String.t(), String.t()} | :undefined | keyword(),
  queue: String.t(),
  queue_options: keyword(),
  routing_key: [String.t()] | String.t(),
  prefetch_count: String.t(),
  uri: String.t(),
  concurrency: boolean(),
  terminate_timeout: integer(),
  handle_message_timeout: integer(),
  queue_ttl: integer(),
  retry_delay_function: function(),
  reconnect: boolean(),
  deadletter: boolean(),
  deadletter_queue: String.t(),
  deadletter_queue_options: keyword(),
  deadletter_routing_key: String.t(),
  queue_max_priority: integer()

Invoked to provide consumer configuration

Return values


connection - RabbitMQ connection options. Accepts same arguments as AMQP-library's

queue - the name of the queue to consume. If it does not exist, it will be created.

exchange - name or {type, name} of the exchange to which queue should be bound. If it does not exist, it will be created. For valid exchange types see GenRMQ.Binding.

routing_key - queue binding key, can also be a list.

prefetch_count - limit the number of unacknowledged messages.


uri - RabbitMQ uri. Deprecated. Please use connection.

queue_options - queue options as declared in AMQP.Queue.declare/3.

If argument 'x-expires' is given to arguments, then it will be used instead of queue_ttl.

If argument 'x-max-priority' is given to arguments, then it will be used instead of queue_max_priority.

queue_ttl - controls for how long a queue can be unused before it is automatically deleted. Unused means the queue has no consumers, the queue has not been redeclared, and basic.get has not been invoked for a duration of at least the expiration period

queue_max_priority - defines if a declared queue should be a priority queue. Should be set to a value from 1..255 range. If it is greater than 255, queue max priority will be set to 255. Values between 1 and 10 are recommended.

concurrency - defines if handle_message callback is called in separate process using supervised task. By default concurrency is enabled. To disable, set it to false

terminate_timeout - defines how long the consumer will wait for in-flight Tasks to complete before terminating the process. The value is in milliseconds and the default is 5_000 milliseconds.

handle_message_timeout - defines how long the handle_message callback will execute within a supervised task. The value is in milliseconds and the default is 5_000 milliseconds.

retry_delay_function - custom retry delay function. Called when the connection to the broker cannot be established. Receives the connection attempt as an argument (>= 1) and is expected to wait for some time. With this callback you can for example do exponential backoff. The default implementation is a linear delay starting with 1 second step.

reconnect - defines if consumer should reconnect on connection termination. By default reconnection is enabled.

deadletter - defines if consumer should setup deadletter exchange and queue. (Default: true).

deadletter_queue - defines name of the deadletter queue (Default: Same as queue name suffixed by _error).

deadletter_queue_options - queue options for the deadletter queue as declared in AMQP.Queue.declare/3.

If argument 'x-expires' is given to arguments, then it will be used instead of queue_ttl.

If argument 'x-max-priority' is given to arguments, then it will be used instead of queue_max_priority.

deadletter_exchange - name or {type, name} of the deadletter exchange (Default: Same as exchange name suffixed by .deadletter). If it does not exist, it will be created. For valid exchange types see GenRMQ.Binding

deadletter_routing_key - defines name of the deadletter routing key (Default: #).


def init() do
    connection: "amqp://guest:guest@localhost:5672",
    queue: "gen_rmq_in_queue",
    queue_options: [
      durable: true,
      passive: true,
      arguments: [
        {"x-queue-type", :longstr ,"quorum"}
    exchange: "gen_rmq_exchange",
    routing_key: "#",
    prefetch_count: "10",
    uri: "amqp://guest:guest@localhost:5672",
    concurrency: true,
    terminate_timeout: 5_000,
    handle_message_timeout: 5_000,
    queue_ttl: 5_000,
    retry_delay_function: fn attempt -> :timer.sleep(1000 * attempt) end,
    reconnect: true,
    deadletter: true,
    deadletter_queue: "gen_rmq_in_queue_error",
    deadletter_queue_options: [
      arguments: [
        {"x-queue-type", :longstr ,"quorum"}
    deadletter_exchange: "gen_rmq_exchange.deadletter",
    deadletter_routing_key: "#",
    queue_max_priority: 10