GenRMQ.Consumer behaviour (gen_rmq v4.0.0) View Source
A behaviour module for implementing the RabbitMQ consumer.
It will:
- setup RabbitMQ connection / channel and keep them in a state
- create (if does not exist) a queue and bind it to an exchange
- create deadletter queue and exchange
- handle reconnections
- call
handle_message
callback on every message delivery - call
handle_error
callback wheneverhandle_message
fails to process or times out
Link to this section Summary
Callbacks
Invoked to provide consumer tag
Invoked when an error or timeout is encountered while executing handle_message
callback
Invoked on message delivery
Invoked to provide consumer configuration
Functions
Acknowledges given message
Returns a specification to start this module under a supervisor.
Requeues / rejects given message
Starts GenRMQ.Consumer
process with given callback module linked to the current
process
Synchronously stops the consumer with a given reason
Link to this section Callbacks
Specs
consumer_tag() :: String.t()
Invoked to provide consumer tag
Examples:
def consumer_tag() do
"hostname-app-version-consumer"
end
Specs
handle_error( message :: %GenRMQ.Message{ attributes: term(), channel: term(), payload: term() }, reason :: atom() | {struct(), list()} ) :: :ok
Invoked when an error or timeout is encountered while executing handle_message
callback
message
- GenRMQ.Message
struct
reason
- the information regarding the error
Examples:
To reject the message that caused the Task to fail you can do something like so:
def handle_error(message, reason) do
# Do something with message and reject it
Logger.warn("Failed to process message: #{inspect(message)}")
GenRMQ.Consumer.reject(message)
end
The reason
argument will either be the atom :killed
if the Task timed out and needed
to be stopped. Or it will be a 2 element tuple where the first element is the error stuct
and the second element is the stacktrace:
{
%RuntimeError{message: "Can't divide by zero!"},
[
{TestConsumer.ErrorWithoutConcurrency, :handle_message, 1, [file: 'test/support/test_consumers.ex', line: 98]},
{GenRMQ.Consumer, :handle_message, 2, [file: 'lib/consumer.ex', line: 519]},
{GenRMQ.Consumer, :handle_info, 2, [file: 'lib/consumer.ex', line: 424]},
{:gen_server, :try_dispatch, 4, [file: 'gen_server.erl', line: 637]},
{:gen_server, :handle_msg, 6, [file: 'gen_server.erl', line: 711]},
{:proc_lib, :init_p_do_apply, 3, [file: 'proc_lib.erl', line: 249]}
]
}
Specs
handle_message( message :: %GenRMQ.Message{ attributes: term(), channel: term(), payload: term() } ) :: :ok
Invoked on message delivery
message
- GenRMQ.Message
struct
Examples:
def handle_message(message) do
# Do something with message and acknowledge it
GenRMQ.Consumer.ack(message)
end
Specs
init() :: [ connection: keyword() | {String.t(), String.t()} | :undefined | keyword(), queue: String.t(), queue_options: keyword(), exchange: GenRMQ.Binding.exchange(), routing_key: [String.t()] | String.t(), prefetch_count: String.t(), concurrency: boolean(), terminate_timeout: integer(), handle_message_timeout: integer(), retry_delay_function: function(), reconnect: boolean(), deadletter: boolean(), deadletter_queue: String.t(), deadletter_queue_options: keyword(), deadletter_exchange: GenRMQ.Binding.exchange(), deadletter_routing_key: String.t() ]
Invoked to provide consumer configuration
Return values
Mandatory:
connection
- RabbitMQ connection options. Accepts same arguments as AMQP-library's Connection.open/2.
queue
- the name of the queue to consume. If it does not exist, it will be created.
exchange
- name, :default
or {type, name}
of the target exchange.
If it does not exist, it will be created. Supported types: :direct
, :fanout
, :topic
routing_key
- queue binding key, can also be a list.
prefetch_count
- limit the number of unacknowledged messages.
Optional:
queue_options
- queue options as declared in
AMQP.Queue.declare/3.
concurrency
- defines if handle_message
callback is called
in separate process using supervised task.
By default concurrency is enabled. To disable, set it to false
terminate_timeout
- defines how long the consumer will wait for in-flight Tasks to
complete before terminating the process. The value is in milliseconds and the default
is 5_000 milliseconds.
handle_message_timeout
- defines how long the handle_message
callback will execute
within a supervised task. The value is in milliseconds and the default is 5_000
milliseconds.
retry_delay_function
- custom retry delay function. Called when the connection to
the broker cannot be established. Receives the connection attempt as an argument (>= 1)
and is expected to wait for some time.
With this callback you can for example do exponential backoff.
The default implementation is a linear delay starting with 1 second step.
reconnect
- defines if consumer should reconnect on connection termination.
By default reconnection is enabled.
deadletter
- defines if consumer should setup deadletter exchange and queue.
(Default: true
).
deadletter_queue
- defines name of the deadletter queue (Default: Same as queue name suffixed by _error
).
deadletter_queue_options
- queue options for the deadletter queue as declared in AMQP.Queue.declare/3.
deadletter_exchange
- name or {type, name}
of the deadletter exchange (Default: Same as exchange name suffixed by .deadletter
).
If it does not exist, it will be created. For valid exchange types see GenRMQ.Binding
deadletter_routing_key
- defines name of the deadletter routing key (Default: #
).
Examples:
def init() do
[
connection: "amqp://guest:guest@localhost:5672",
queue: "gen_rmq_in_queue",
queue_options: [
durable: true,
passive: true,
arguments: [
{"x-queue-type", :longstr ,"quorum"}
]
]
exchange: "gen_rmq_exchange",
routing_key: "#",
prefetch_count: "10",
concurrency: true,
terminate_timeout: 5_000,
handle_message_timeout: 5_000,
retry_delay_function: fn attempt -> :timer.sleep(1000 * attempt) end,
reconnect: true,
deadletter: true,
deadletter_queue: "gen_rmq_in_queue_error",
deadletter_queue_options: [
arguments: [
{"x-queue-type", :longstr ,"quorum"}
]
]
deadletter_exchange: "gen_rmq_exchange.deadletter",
deadletter_routing_key: "#"
]
end
Link to this section Functions
Specs
Acknowledges given message
message
- GenRMQ.Message
struct
Returns a specification to start this module under a supervisor.
See Supervisor
.
Specs
reject( message :: %GenRMQ.Message{ attributes: term(), channel: term(), payload: term() }, requeue :: boolean() ) :: :ok
Requeues / rejects given message
message
- GenRMQ.Message
struct
requeue
- indicates if message should be requeued
Specs
Starts GenRMQ.Consumer
process with given callback module linked to the current
process
module
- callback module implementing GenRMQ.Consumer
behaviour
Options
:name
- used for name registration
Return values
If the consumer is successfully created and initialized, this function returns
{:ok, pid}
, where pid
is the PID of the consumer. If a process with the
specified consumer name already exists, this function returns
{:error, {:already_started, pid}}
with the PID of that process.
Examples:
GenRMQ.Consumer.start_link(Consumer, name: :consumer)
Specs
Synchronously stops the consumer with a given reason
name
- pid or name of the consumer to stop
reason
- reason of the termination
Examples:
GenRMQ.Consumer.stop(:consumer, :normal)