GenRMQ.Consumer behaviour (gen_rmq v4.0.0)

A behaviour module for implementing the RabbitMQ consumer.

It will:

  • set up a RabbitMQ connection and channel and keep them in its state
  • create a queue (if it does not exist) and bind it to an exchange
  • create a deadletter queue and exchange
  • handle reconnections
  • call the handle_message callback on every message delivery
  • call the handle_error callback whenever handle_message fails or times out

Summary

Callbacks

consumer_tag() - Invoked to provide consumer tag

handle_error(message, reason) - Invoked when an error or timeout is encountered while executing handle_message callback

handle_message(message) - Invoked on message delivery

init() - Invoked to provide consumer configuration

Functions

ack(message) - Acknowledges given message

child_spec(init_arg) - Returns a specification to start this module under a supervisor.

reject(message, requeue \\ false) - Requeues / rejects given message

start_link(module, options \\ []) - Starts GenRMQ.Consumer process with given callback module linked to the current process

stop(name, reason) - Synchronously stops the consumer with a given reason

Callbacks

consumer_tag()

Specs

consumer_tag() :: String.t()

Invoked to provide consumer tag

Examples:

def consumer_tag() do
  "hostname-app-version-consumer"
end
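
A fixed string works, but a tag assembled at runtime can make it easier to tell consumers apart. The sketch below assumes a hypothetical helper module `TagBuilder`; the app name and version passed to it are illustrative values, not something gen_rmq provides.

```elixir
defmodule TagBuilder do
  # Hypothetical helper, not part of gen_rmq.
  # Builds a tag shaped like "hostname-app-version-consumer".
  def build(app, version) do
    # :inet.gethostname/0 returns the hostname as a charlist
    {:ok, hostname} = :inet.gethostname()
    "#{List.to_string(hostname)}-#{app}-#{version}-consumer"
  end
end
```

A `consumer_tag/0` callback could then simply return `TagBuilder.build("my_app", "1.0.0")`.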

handle_error(message, reason)

Specs

handle_error(
  message :: %GenRMQ.Message{
    attributes: term(),
    channel: term(),
    payload: term()
  },
  reason :: atom() | {struct(), list()}
) :: :ok

Invoked when an error or timeout is encountered while executing handle_message callback

message - GenRMQ.Message struct

reason - the information regarding the error

Examples:

To reject the message that caused the Task to fail, you can do something like this:

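def handle_error(message, reason) do
  # Log the failure together with its reason, then reject the message.
  # Needs `require Logger` in the consumer module. Logger.warning/1 is
  # available from Elixir 1.11; Logger.warn/1 is deprecated.
  Logger.warning("Failed to process message: #{inspect(message)}, reason: #{inspect(reason)}")

  # requeue defaults to false, so the message is not put back on the queue
  GenRMQ.Consumer.reject(message)
end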

The reason argument is either the atom :killed, if the Task timed out and had to be stopped, or a two-element tuple where the first element is the error struct and the second element is the stacktrace:

{
  %RuntimeError{message: "Can't divide by zero!"},
  [
    {TestConsumer.ErrorWithoutConcurrency, :handle_message, 1, [file: 'test/support/test_consumers.ex', line: 98]},
    {GenRMQ.Consumer, :handle_message, 2, [file: 'lib/consumer.ex', line: 519]},
    {GenRMQ.Consumer, :handle_info, 2, [file: 'lib/consumer.ex', line: 424]},
    {:gen_server, :try_dispatch, 4, [file: 'gen_server.erl', line: 637]},
    {:gen_server, :handle_msg, 6, [file: 'gen_server.erl', line: 711]},
    {:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 249]}
  ]
}
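
Because the reason has two distinct shapes, pattern matching is a natural way to tell them apart. The following is a minimal sketch; `ReasonFormatter` and `describe/1` are hypothetical names, and the fallback clause is an assumption added to cover any other atom the spec allows.

```elixir
defmodule ReasonFormatter do
  # Hypothetical helper, not part of gen_rmq.

  # The Task ran past its timeout and was stopped.
  def describe(:killed), do: "handle_message timed out"

  # The Task raised: reason is {exception_struct, stacktrace}.
  def describe({%_{} = exception, stacktrace}) when is_list(stacktrace) do
    "handle_message raised: " <> Exception.message(exception)
  end

  # Assumed fallback for any other reason value.
  def describe(other), do: "handle_message failed: " <> inspect(other)
end
```

Inside `handle_error/2`, the returned string could be logged before rejecting the message.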

handle_message(message)

Specs

handle_message(
  message :: %GenRMQ.Message{
    attributes: term(),
    channel: term(),
    payload: term()
  }
) :: :ok

Invoked on message delivery

message - GenRMQ.Message struct

Examples:

def handle_message(message) do
  # Do something with message and acknowledge it
  GenRMQ.Consumer.ack(message)
end

init()

Specs

init() :: [
  connection: keyword() | {String.t(), String.t()} | :undefined | keyword(),
  queue: String.t(),
  queue_options: keyword(),
  exchange: GenRMQ.Binding.exchange(),
  routing_key: [String.t()] | String.t(),
  prefetch_count: String.t(),
  concurrency: boolean(),
  terminate_timeout: integer(),
  handle_message_timeout: integer(),
  retry_delay_function: function(),
  reconnect: boolean(),
  deadletter: boolean(),
  deadletter_queue: String.t(),
  deadletter_queue_options: keyword(),
  deadletter_exchange: GenRMQ.Binding.exchange(),
  deadletter_routing_key: String.t()
]

Invoked to provide consumer configuration

Return values

Mandatory:

connection - RabbitMQ connection options. Accepts same arguments as AMQP-library's Connection.open/2.

queue - the name of the queue to consume. If it does not exist, it will be created.

exchange - name, :default or {type, name} of the target exchange. If it does not exist, it will be created. Supported types: :direct, :fanout, :topic

routing_key - queue binding key, can also be a list.

prefetch_count - limit the number of unacknowledged messages.

Optional:

queue_options - queue options as declared in AMQP.Queue.declare/3.

concurrency - defines whether the handle_message callback is called in a separate process using a supervised task. Concurrency is enabled by default; set it to false to disable it.

terminate_timeout - defines how long the consumer will wait for in-flight Tasks to complete before terminating the process. The value is in milliseconds and the default is 5_000 milliseconds.

handle_message_timeout - defines how long the handle_message callback may run within a supervised task. The value is in milliseconds and the default is 5_000 milliseconds.

retry_delay_function - custom retry delay function. Called when the connection to the broker cannot be established. Receives the connection attempt as an argument (>= 1) and is expected to wait for some time. With this callback you can for example do exponential backoff. The default implementation is a linear delay starting with 1 second step.

reconnect - defines if consumer should reconnect on connection termination. By default reconnection is enabled.

deadletter - defines if the consumer should set up a deadletter exchange and queue (Default: true).

deadletter_queue - defines name of the deadletter queue (Default: Same as queue name suffixed by _error).

deadletter_queue_options - queue options for the deadletter queue as declared in AMQP.Queue.declare/3.

deadletter_exchange - name or {type, name} of the deadletter exchange (Default: Same as exchange name suffixed by .deadletter). If it does not exist, it will be created. For valid exchange types see GenRMQ.Binding.

deadletter_routing_key - defines name of the deadletter routing key (Default: #).
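
The deadletter defaults listed above can be illustrated with a small sketch. `DeadletterDefaults` is a hypothetical module that only mirrors the documented defaults; it is not gen_rmq's own code, and it assumes the exchange is given as a plain string rather than :default or {type, name}.

```elixir
defmodule DeadletterDefaults do
  # Hypothetical helper mirroring the documented defaults.
  # Assumes :queue and :exchange are plain strings.
  def resolve(config) do
    queue = Keyword.fetch!(config, :queue)
    exchange = Keyword.fetch!(config, :exchange)

    [
      # Default: queue name suffixed by "_error"
      deadletter_queue: Keyword.get(config, :deadletter_queue, queue <> "_error"),
      # Default: exchange name suffixed by ".deadletter"
      deadletter_exchange: Keyword.get(config, :deadletter_exchange, exchange <> ".deadletter"),
      # Default: "#"
      deadletter_routing_key: Keyword.get(config, :deadletter_routing_key, "#")
    ]
  end
end
```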

Examples:

def init() do
  [
    connection: "amqp://guest:guest@localhost:5672",
    queue: "gen_rmq_in_queue",
    queue_options: [
      durable: true,
      passive: true,
      arguments: [
        {"x-queue-type", :longstr, "quorum"}
      ]
    ],
    exchange: "gen_rmq_exchange",
    routing_key: "#",
    prefetch_count: "10",
    concurrency: true,
    terminate_timeout: 5_000,
    handle_message_timeout: 5_000,
    # Linear delay: 1 second per connection attempt
    retry_delay_function: fn attempt -> :timer.sleep(1000 * attempt) end,
    reconnect: true,
    deadletter: true,
    deadletter_queue: "gen_rmq_in_queue_error",
    deadletter_queue_options: [
      arguments: [
        {"x-queue-type", :longstr, "quorum"}
      ]
    ],
    deadletter_exchange: "gen_rmq_exchange.deadletter",
    deadletter_routing_key: "#"
  ]
end
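
The retry_delay_function option mentions exponential backoff as one possible strategy. A minimal sketch of such a function follows; the module name `Backoff`, the 1 second base, and the 30 second cap are all assumptions chosen for illustration.

```elixir
defmodule Backoff do
  # Hypothetical helper, not part of gen_rmq.
  @base_ms 1_000
  @max_ms 30_000

  # Delay in milliseconds for a connection attempt (>= 1):
  # 1s, 2s, 4s, 8s, 16s, then capped at 30s.
  def delay_ms(attempt) when is_integer(attempt) and attempt >= 1 do
    # Cap the exponent so large attempt counts do not build huge integers
    exponent = min(attempt - 1, 5)
    min(@base_ms * Integer.pow(2, exponent), @max_ms)
  end

  # Blocks the calling process for the computed delay.
  def wait(attempt), do: attempt |> delay_ms() |> :timer.sleep()
end
```

It could be wired in with `retry_delay_function: &Backoff.wait/1` in the keyword list returned by init/0.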

Functions

ack(message)

Specs

ack(
  message :: %GenRMQ.Message{
    attributes: term(),
    channel: term(),
    payload: term()
  }
) :: :ok

Acknowledges given message

message - GenRMQ.Message struct

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

reject(message, requeue \\ false)

Specs

reject(
  message :: %GenRMQ.Message{
    attributes: term(),
    channel: term(),
    payload: term()
  },
  requeue :: boolean()
) :: :ok

Requeues / rejects given message

message - GenRMQ.Message struct

requeue - indicates if message should be requeued

start_link(module, options \\ [])

Specs

start_link(module :: module(), options :: Keyword.t()) ::
  {:ok, pid()} | {:error, term()}

Starts GenRMQ.Consumer process with given callback module linked to the current process

module - callback module implementing GenRMQ.Consumer behaviour

Options

  • :name - used for name registration

Return values

If the consumer is successfully created and initialized, this function returns {:ok, pid}, where pid is the PID of the consumer. If a process with the specified consumer name already exists, this function returns {:error, {:already_started, pid}} with the PID of that process.

Examples:

GenRMQ.Consumer.start_link(Consumer, name: :consumer)
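
In an application, a consumer is often started under a supervisor instead of calling start_link directly. The sketch below builds a child specification map by hand; `MyApp.Consumer` is a hypothetical callback module and `MyApp.ConsumerSpec` a hypothetical helper. Building the map itself does not need a running broker.

```elixir
defmodule MyApp.ConsumerSpec do
  # Hypothetical helper; MyApp.Consumer stands in for a module
  # implementing the GenRMQ.Consumer behaviour.
  def child_spec do
    %{
      id: MyApp.Consumer,
      # The supervisor will call GenRMQ.Consumer.start_link(MyApp.Consumer, name: MyApp.Consumer)
      start: {GenRMQ.Consumer, :start_link, [MyApp.Consumer, [name: MyApp.Consumer]]},
      restart: :permanent,
      type: :worker
    }
  end
end
```

The map could then be passed in the children list, for example `Supervisor.start_link([MyApp.ConsumerSpec.child_spec()], strategy: :one_for_one)`.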

stop(name, reason)

Specs

stop(name :: atom() | pid(), reason :: term()) :: :ok

Synchronously stops the consumer with a given reason

name - pid or name of the consumer to stop

reason - reason of the termination

Examples:

GenRMQ.Consumer.stop(:consumer, :normal)