Lepus.Consumer behaviour (Lepus v0.1.6)
Consumer receives messages from a RabbitMQ queue.
It uses Broadway and BroadwayRabbitMQ.Producer under the hood.
Features
- Optionally retries failed messages with exponential backoff
- Optionally publishes failed messages to a separate queue
- Supports RPC
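Exponential backoff means each retry waits longer than the previous one. This page does not state the delays Lepus actually uses, so the sketch below only illustrates the idea. The module name, the 1-second base delay and the doubling factor are all assumptions made for illustration.

```elixir
defmodule BackoffSketch do
  # Hypothetical base delay; the real value used by Lepus is not documented here.
  @base_delay_ms 1_000

  # Delay before the given retry attempt (1-based), doubling on every attempt.
  def delay_ms(attempt) when is_integer(attempt) and attempt >= 1 do
    @base_delay_ms * Integer.pow(2, attempt - 1)
  end
end

# Delays for the first five retries under these assumptions.
IO.inspect(Enum.map(1..5, &BackoffSketch.delay_ms/1))
# prints [1000, 2000, 4000, 8000, 16000]
```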
Topology
┌►    [queue]
│        │
│        ▼
│ (Lepus.Consumer) ───► {failed exchange} ───► [failed queue]
│        │
│        ▼
│ {delay exchange}
│        │
│        ▼
│  [retry queue]
│        │
│        ▼
└ {retry exchange}
Example
Define a consumer:
defmodule MyApp.MyConsumer do
  @behaviour Lepus.Consumer

  @impl Lepus.Consumer
  def options do
    [
      connection: [
        host: "localhost",
        port: 5672,
        virtual_host: "/",
        username: "guest",
        password: "guest"
      ],
      exchange: "my_exchange",
      routing_key: "my_routing_key",
      queue: "my_queue",
      store_failed: true,
      max_retry_count: 5
    ]
  end

  @impl Lepus.Consumer
  def handle_message(data, metadata) do
    Do.something_with(data, metadata)
  end
end
Then add it to your supervision tree (usually in lib/my_app/application.ex):
children = [
  {Lepus.Consumer, consumers: [MyApp.MyConsumer]}
]

children |> Supervisor.start_link(strategy: :one_for_one)
You can also define some global options for all the consumers in the supervision tree:
children = [
  {Lepus.Consumer,
   consumers: [MyApp.MyConsumer, MyApp.OtherConsumer],
   options: [connection: rabbit_connection, exchange: "my_exchange"]}
]

children |> Supervisor.start_link(strategy: :one_for_one)
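The snippet above uses a rabbit_connection value without showing where it comes from. Since :connection accepts an AMQP URI, one possible approach is to read the URI from the environment. The module name MyApp.RabbitConfig and the RABBITMQ_URL variable are hypothetical and not part of Lepus.

```elixir
defmodule MyApp.RabbitConfig do
  # Fallback URI matching the localhost/guest settings used in the example above.
  @default_uri "amqp://guest:guest@localhost:5672"

  # RABBITMQ_URL is a hypothetical environment variable name.
  def connection do
    System.get_env("RABBITMQ_URL", @default_uri)
  end
end

rabbit_connection = MyApp.RabbitConfig.connection()
IO.inspect(rabbit_connection, label: "connection")
```

The resulting value can then be passed as connection: rabbit_connection in the global options.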
Callbacks
@callback handle_message(payload(), metadata()) :: :ok | {:ok, payload()} | :retry | {:retry, payload()} | :error | {:error, payload()}
Receives message payload and metadata.
payload
Usually it's a binary RabbitMQ message payload. It can also be parsed JSON (a map, a list, etc.) if the content type of the message is "application/json" or the message was sent via Lepus.publish_json/4.
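Because the payload can be either a raw binary or already-parsed JSON, a handler may want to branch on its shape. A minimal sketch using guards follows; PayloadSketch and summarize/1 are hypothetical names used only for illustration.

```elixir
defmodule PayloadSketch do
  # Raw RabbitMQ message body.
  def summarize(payload) when is_binary(payload), do: {:binary, byte_size(payload)}

  # Parsed JSON object.
  def summarize(payload) when is_map(payload), do: {:json_object, Map.keys(payload)}

  # Parsed JSON array.
  def summarize(payload) when is_list(payload), do: {:json_array, length(payload)}

  # Anything else (for example a parsed JSON number or boolean).
  def summarize(payload), do: {:other, payload}
end

IO.inspect(PayloadSketch.summarize("hello"))
IO.inspect(PayloadSketch.summarize(%{"id" => 1}))
```

The same guards could be placed directly on handle_message/2 clauses.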
metadata
It's a map with the keys:
- rpc - Defines if the message was sent using the RPC pattern (reply_to and correlation_id are not empty). The function should return {:ok, result} if rpc is true.
- retry_count - Count of retries if the message was retried. Otherwise 0.
- retriable - Defines if the message will be retried in case of error.
- client - "lepus" if the message was sent via Lepus.publish/4 or Lepus.publish_json/4.
- rabbit_mq_metadata - metadata field from BroadwayRabbitMQ.Producer.
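The metadata keys can be pattern matched directly. The sketch below builds a map shaped like the one described above (the values are made up) and turns it into a short log line. MetadataSketch and its functions are hypothetical helpers, not part of Lepus.

```elixir
defmodule MetadataSketch do
  # Example metadata with the documented keys; values are invented for illustration.
  def example do
    %{
      rpc: false,
      retry_count: 2,
      retriable: true,
      client: "lepus",
      rabbit_mq_metadata: %{}
    }
  end

  # Clauses are tried top to bottom, so the most specific cases come first.
  def describe(%{rpc: true}), do: "RPC request, a reply is expected"
  def describe(%{retry_count: 0}), do: "first delivery"
  def describe(%{retry_count: n, retriable: true}), do: "retry #{n}, retriable on error"
  def describe(%{retry_count: n}), do: "retry #{n}, not retriable on error"
end

IO.puts(MetadataSketch.describe(MetadataSketch.example()))
```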
Should return in case of using RPC:
- {:ok, response} - so Lepus.publish/4 or Lepus.publish_json/4 returns {:ok, response}
- {:error, response} - so Lepus.publish/4 or Lepus.publish_json/4 returns {:error, response}
Should return in case of not using RPC:
- :ok - exits successfully.
- :error - exits with an error. The message could be retried if max_retry_count > 0.
- {:error, error_message} - The same as :error but adds the error_message to RabbitMQ headers.
- :retry - always retries the message (even if max_retry_count == 0).
- {:retry, reason} - The same as :retry but adds the reason to RabbitMQ headers.
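The return values above can be combined in a single handler by matching on the rpc flag. This is a minimal sketch with binary payloads; the module name and the "ping", "busy" and empty-payload cases are invented for illustration. A real consumer would also declare @behaviour Lepus.Consumer.

```elixir
defmodule ReturnValuesSketch do
  # RPC requests: the caller receives the tuple returned here.
  def handle_message("ping", %{rpc: true}), do: {:ok, "pong"}
  def handle_message(_payload, %{rpc: true}), do: {:error, "unsupported request"}

  # Non-RPC messages.
  # Fails with a message that ends up in the RabbitMQ headers.
  def handle_message("", _metadata), do: {:error, "empty payload"}
  # Asks for a retry regardless of max_retry_count, with a reason.
  def handle_message("busy", _metadata), do: {:retry, "resource busy"}
  # Everything else is treated as processed successfully.
  def handle_message(_payload, _metadata), do: :ok
end

IO.inspect(ReturnValuesSketch.handle_message("ping", %{rpc: true}))
IO.inspect(ReturnValuesSketch.handle_message("busy", %{rpc: false}))
```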
@callback options() :: Keyword.t()
Should return a keyword list of options (if defined). You can also define options in the supervision tree as described above.
Options
- :connection - Required. Defines an AMQP URI or a set of options used by the RabbitMQ client to open the connection with the RabbitMQ broker. See AMQP.Connection.open/1 for the full list of options.
- :exchange (String.t/0) - Required. The name of the RabbitMQ exchange.
- :routing_key (String.t/0) - Required. The name of the RabbitMQ routing_key.
- :delay_exchange (String.t/0) - Redefines delay exchange name. The default value is "#{exchange}.delay".
- :retry_exchange (String.t/0) - Redefines retry exchange name. The default value is "#{exchange}.retry".
- :failed_exchange (String.t/0) - Redefines error exchange name. The default value is "#{exchange}.failed".
- :queue (String.t/0) - Redefines queue name. The default value is "#{exchange}.#{routing_key}".
- :retry_queue (String.t/0) - Redefines retry queue name. The default value is "#{queue}.retry".
- :failed_queue (String.t/0) - Redefines error queue name. The default value is "#{queue}.failed".
- :processor (keyword/0) - Broadway processor options. Only one processor is used. See Broadway.start_link/2 for the full list of options. The default value is [].
- :max_retry_count (non_neg_integer/0) - The maximum count of message retries. The default value is 0.
- :store_failed (boolean/0) - Defines if the failed message should be published to failed_queue. The default value is false.
- :queues_type - Defines queues type (one of ["classic", "quorum"]). Used during queue declaration. The default value is "classic".
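The default names follow directly from :exchange and :routing_key. The sketch below spells out the documented defaults as plain string interpolation; DefaultNamesSketch is a hypothetical helper and does not reflect how Lepus computes the names internally.

```elixir
defmodule DefaultNamesSketch do
  # Builds the documented default exchange and queue names.
  def defaults(exchange, routing_key) do
    queue = "#{exchange}.#{routing_key}"

    %{
      delay_exchange: "#{exchange}.delay",
      retry_exchange: "#{exchange}.retry",
      failed_exchange: "#{exchange}.failed",
      queue: queue,
      retry_queue: "#{queue}.retry",
      failed_queue: "#{queue}.failed"
    }
  end
end

IO.inspect(DefaultNamesSketch.defaults("my_exchange", "my_routing_key"))
```

Note that the retry and failed queue defaults are derived from the queue name, so overriding :queue would presumably change them as well.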
Functions
Returns a specification to start this module under a supervisor.
See Supervisor.
@spec start_link(keyword()) :: Supervisor.on_start()