gen_rmq v2.3.0 GenRMQ.Consumer behaviour View Source
A behaviour module for implementing the RabbitMQ consumer.
It will:
- setup RabbitMQ connection / channel and keep them in a state
- create (if does not exist) a queue and bind it to an exchange
- create deadletter queue and exchange
- handle reconnections
- call
handle_messagecallback on every message delivery
Link to this section Summary
Functions
Acknowledges given message
Returns a specification to start this module under a supervisor.
Requeues / rejects given message
Starts GenRMQ.Consumer process with given callback module linked to the current
process
Synchronously stops the consumer with a given reason
Link to this section Functions
Acknowledges given message
message - GenRMQ.Message struct
Returns a specification to start this module under a supervisor.
See Supervisor.
Requeues / rejects given message
message - GenRMQ.Message struct
requeue - indicates if message should be requeued
Starts GenRMQ.Consumer process with given callback module linked to the current
process
module - callback module implementing GenRMQ.Consumer behaviour
Options
:name- used for name registration
Return values
If the consumer is successfully created and initialized, this function returns
{:ok, pid}, where pid is the PID of the consumer. If a process with the
specified consumer name already exists, this function returns
{:error, {:already_started, pid}} with the PID of that process.
Examples:
GenRMQ.Consumer.start_link(Consumer, name: :consumer)
Synchronously stops the consumer with a given reason
name - pid or name of the consumer to stop
reason - reason of the termination
Examples:
GenRMQ.Consumer.stop(:consumer, :normal)
Link to this section Callbacks
Invoked to provide consumer tag
Examples:
def consumer_tag() do
"hostname-app-version-consumer"
end
Invoked on message delivery
message - GenRMQ.Message struct
Examples:
def handle_message(message) do
# Do something with message and acknowledge it
GenRMQ.Consumer.ack(message)
end
init()
View Sourceinit() :: [ queue: String.t(), exchange: GenRMQ.Binding.exchange(), routing_key: [String.t()] | String.t(), prefetch_count: String.t(), uri: String.t(), concurrency: boolean(), queue_ttl: integer(), retry_delay_function: function(), reconnect: boolean(), deadletter: boolean(), deadletter_queue: String.t(), deadletter_exchange: String.t(), deadletter_routing_key: String.t(), queue_max_priority: integer() ]
Invoked to provide consumer configuration
Return values
Mandatory:
uri - RabbitMQ uri
queue - the name of the queue to consume. If it does not exist, it will be created.
exchange - Name or {type, name} of the exchange to which queue should be bound. If it does not exist, it will be created.
For valid exchange types see GenRMQ.Binding.
routing_key - queue binding key, can also be a list.
prefetch_count - limit the number of unacknowledged messages.
Optional:
queue_ttl - controls for how long a queue can be unused before it is
automatically deleted. Unused means the queue has no consumers,
the queue has not been redeclared, and basic.get has not been invoked
for a duration of at least the expiration period
queue_max_priority - defines if a declared queue should be a priority queue.
Should be set to a value from 1..255 range. If it is greater than 255, queue
max priority will be set to 255. Values between 1 and 10 are
recommened.
concurrency - defines if handle_message callback is called
in seperate process using spawn
function. By default concurrency is enabled. To disable, set it to false
retry_delay_function - custom retry delay function. Called when the connection to
the broker cannot be established. Receives the connection attempt as an argument (>= 1)
and is expected to wait for some time.
With this callback you can for example do exponential backoff.
The default implementation is a linear delay starting with 1 second step.
reconnect - defines if consumer should reconnect on connection termination.
By default reconnection is enabled.
deadletter - defines if consumer should setup deadletter exchange and queue.
deadletter_queue - defines name of the deadletter queue (Default: Same as queue name suffixed by _error).
deadletter_exchange - defines name of the deadletter exchange (Default: Same as exchange name suffixed by .deadletter).
deadletter_routing_key - defines name of the deadletter routing key (Default: #).
Examples:
def init() do
[
queue: "gen_rmq_in_queue",
exchange: "gen_rmq_exchange",
routing_key: "#",
prefetch_count: "10",
uri: "amqp://guest:guest@localhost:5672",
concurrency: true,
queue_ttl: 5000,
retry_delay_function: fn attempt -> :timer.sleep(1000 * attempt) end,
reconnect: true,
deadletter: true,
deadletter_queue: "gen_rmq_in_queue_error",
deadletter_exchange: "gen_rmq_exchange.deadletter",
deadletter_routing_key: "#",
queue_max_priority: 10
]
end