gen_rmq v2.3.0 GenRMQ.Consumer behaviour View Source

A behaviour module for implementing the RabbitMQ consumer.

It will:

  • setup RabbitMQ connection / channel and keep them in a state
  • create (if does not exist) a queue and bind it to an exchange
  • create deadletter queue and exchange
  • handle reconnections
  • call handle_message callback on every message delivery

Link to this section Summary

Functions

Acknowledges given message

Returns a specification to start this module under a supervisor.

Requeues / rejects given message

Starts GenRMQ.Consumer process with given callback module linked to the current process

Synchronously stops the consumer with a given reason

Callbacks

Invoked to provide consumer tag

Invoked on message delivery

Invoked to provide consumer configuration

Link to this section Functions

Link to this function

ack(message)

View Source
ack(
  message :: %GenRMQ.Message{attributes: term(), payload: term(), state: term()}
) :: :ok

Acknowledges given message

message - GenRMQ.Message struct

Returns a specification to start this module under a supervisor.

See Supervisor.

Link to this function

reject(message, requeue \\ false)

View Source
reject(
  message :: %GenRMQ.Message{attributes: term(), payload: term(), state: term()},
  requeue :: boolean()
) :: :ok

Requeues / rejects given message

message - GenRMQ.Message struct

requeue - indicates if message should be requeued

Link to this function

start_link(module, options \\ [])

View Source
start_link(module :: module(), options :: Keyword.t()) ::
  {:ok, pid()} | {:error, term()}

Starts GenRMQ.Consumer process with given callback module linked to the current process

module - callback module implementing GenRMQ.Consumer behaviour

Options

  • :name - used for name registration

Return values

If the consumer is successfully created and initialized, this function returns {:ok, pid}, where pid is the PID of the consumer. If a process with the specified consumer name already exists, this function returns {:error, {:already_started, pid}} with the PID of that process.

Examples:

GenRMQ.Consumer.start_link(Consumer, name: :consumer)
Link to this function

stop(name, reason)

View Source
stop(name :: atom() | pid(), reason :: term()) :: :ok

Synchronously stops the consumer with a given reason

name - pid or name of the consumer to stop reason - reason of the termination

Examples:

GenRMQ.Consumer.stop(:consumer, :normal)

Link to this section Callbacks

Link to this callback

consumer_tag()

View Source
consumer_tag() :: String.t()

Invoked to provide consumer tag

Examples:

def consumer_tag() do
  "hostname-app-version-consumer"
end
Link to this callback

handle_message(message)

View Source
handle_message(
  message :: %GenRMQ.Message{attributes: term(), payload: term(), state: term()}
) :: :ok

Invoked on message delivery

message - GenRMQ.Message struct

Examples:

def handle_message(message) do
  # Do something with message and acknowledge it
  GenRMQ.Consumer.ack(message)
end
Link to this callback

init()

View Source
init() :: [
  queue: String.t(),
  exchange: GenRMQ.Binding.exchange(),
  routing_key: [String.t()] | String.t(),
  prefetch_count: String.t(),
  uri: String.t(),
  concurrency: boolean(),
  queue_ttl: integer(),
  retry_delay_function: function(),
  reconnect: boolean(),
  deadletter: boolean(),
  deadletter_queue: String.t(),
  deadletter_exchange: String.t(),
  deadletter_routing_key: String.t(),
  queue_max_priority: integer()
]

Invoked to provide consumer configuration

Return values

Mandatory:

uri - RabbitMQ uri

queue - the name of the queue to consume. If it does not exist, it will be created.

exchange - Name or {type, name} of the exchange to which queue should be bound. If it does not exist, it will be created. For valid exchange types see GenRMQ.Binding.

routing_key - queue binding key, can also be a list.

prefetch_count - limit the number of unacknowledged messages.

Optional:

queue_ttl - controls for how long a queue can be unused before it is automatically deleted. Unused means the queue has no consumers, the queue has not been redeclared, and basic.get has not been invoked for a duration of at least the expiration period

queue_max_priority - defines if a declared queue should be a priority queue. Should be set to a value from 1..255 range. If it is greater than 255, queue max priority will be set to 255. Values between 1 and 10 are recommened.

concurrency - defines if handle_message callback is called in seperate process using spawn function. By default concurrency is enabled. To disable, set it to false

retry_delay_function - custom retry delay function. Called when the connection to the broker cannot be established. Receives the connection attempt as an argument (>= 1) and is expected to wait for some time. With this callback you can for example do exponential backoff. The default implementation is a linear delay starting with 1 second step.

reconnect - defines if consumer should reconnect on connection termination. By default reconnection is enabled.

deadletter - defines if consumer should setup deadletter exchange and queue.

deadletter_queue - defines name of the deadletter queue (Default: Same as queue name suffixed by _error).

deadletter_exchange - defines name of the deadletter exchange (Default: Same as exchange name suffixed by .deadletter).

deadletter_routing_key - defines name of the deadletter routing key (Default: #).

Examples:

def init() do
  [
    queue: "gen_rmq_in_queue",
    exchange: "gen_rmq_exchange",
    routing_key: "#",
    prefetch_count: "10",
    uri: "amqp://guest:guest@localhost:5672",
    concurrency: true,
    queue_ttl: 5000,
    retry_delay_function: fn attempt -> :timer.sleep(1000 * attempt) end,
    reconnect: true,
    deadletter: true,
    deadletter_queue: "gen_rmq_in_queue_error",
    deadletter_exchange: "gen_rmq_exchange.deadletter",
    deadletter_routing_key: "#",
    queue_max_priority: 10
  ]
end