DSL for defining topic consumers with concurrency control, retry logic, and middleware.
Example
defmodule MyApp.Payments.CreatedConsumer do
  use PhoenixMicro.Consumer

  # Logger macros need an explicit require (assuming `use` does not add one)
  require Logger

  topic "payments.created"
  concurrency 10
  retry max_attempts: 5, base_delay: 500, max_delay: 30_000

  middleware [
    PhoenixMicro.Middleware.Logger,
    PhoenixMicro.Middleware.Metrics
  ]

  @impl PhoenixMicro.Consumer
  def handle(%PhoenixMicro.Message{} = message, _context) do
    %{"amount" => amount, "currency" => currency} = message.payload
    # ... process ...
    :ok
  end

  # Optional: override per-message error handling
  @impl PhoenixMicro.Consumer
  def handle_error(message, error, _context) do
    Logger.error("Payment processing failed", error: inspect(error))
    {:retry, message}
  end
end

Behaviour callbacks
Consumers must implement handle/2. handle_error/3 is optional.
The return value of handle/2 controls the ack/nack flow:
* :ok - Ack the message.
* {:ok, _result} - Ack.
* {:error, reason} - Nack; triggers a retry if attempts remain, DLQ otherwise.
* {:retry, message} - Explicit retry request.
* :nack - Nack without retry.
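
The mapping from return values to broker actions can be sketched as a pure function. This is a minimal illustration under stated assumptions, not PhoenixMicro's actual implementation: the module `RetrySketch`, the action atoms (`:ack`, `:retry`, `:dead_letter`, `:nack`), and the capped exponential backoff formula are all hypothetical. The documentation above names `base_delay` and `max_delay` but does not state how the delay between attempts is computed.

```elixir
defmodule RetrySketch do
  # Hypothetical helper, not part of PhoenixMicro.
  # `attempt` is the 1-based attempt number; `max_attempts` mirrors the
  # `retry max_attempts: ...` option from the example consumer.
  def next_action(:ok, _attempt, _max_attempts), do: :ack
  def next_action({:ok, _result}, _attempt, _max_attempts), do: :ack
  def next_action(:nack, _attempt, _max_attempts), do: :nack
  # Explicit retry request; whether it counts toward max_attempts is not
  # stated in the docs, so this sketch always retries.
  def next_action({:retry, _message}, _attempt, _max_attempts), do: :retry

  # Errors are retried while attempts remain, then dead-lettered.
  def next_action({:error, _reason}, attempt, max_attempts) do
    if attempt < max_attempts, do: :retry, else: :dead_letter
  end

  # ASSUMPTION: delay doubles per attempt and is capped at max_delay (ms).
  def backoff_delay(attempt, base_delay, max_delay) do
    min(base_delay * Integer.pow(2, attempt - 1), max_delay)
  end
end
```

With the example's settings (`base_delay: 500`, `max_delay: 30_000`), this sketch would wait 500 ms after the first failed attempt and reach the 30-second cap from the seventh attempt onward.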
Types
@type context() :: %{
  transport: atom(),
  topic: String.t(),
  attempt: pos_integer(),
  transport_mod: module(),
  message: PhoenixMicro.Message.t()
}
Callbacks
@callback handle(PhoenixMicro.Message.t(), context()) ::
            :ok | {:ok, term()} | {:error, term()} | :nack

@callback handle_error(PhoenixMicro.Message.t(), term(), context()) ::
            {:retry, PhoenixMicro.Message.t()} | :nack | :ok
Functions
@spec ack(PhoenixMicro.Message.t(), context()) :: :ok
Manually acknowledges a message from within a handler.
Use this when you need to ack before the handler returns — for example, when kicking off an async process and you want to release the broker slot immediately.
def handle(message, context) do
  PhoenixMicro.Consumer.ack(message, context)
  Task.start(fn -> do_slow_work(message.payload) end)
  :ok
end
Number of messages Broadway will accumulate before calling handle_batch/4.
Set to 1 (default) to disable batching and process messages individually.

Maximum time in milliseconds to wait before flushing an incomplete batch.
Only relevant when batch_size > 1. Default: 1000 ms.
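
The interaction between the batch size and the batch timeout described above can be sketched as a single predicate. This is only an illustration of the flush rule: `BatchSketch` and `flush?/4` are hypothetical names, and Broadway's real batcher is a process driven by timers rather than a function called with elapsed time.

```elixir
defmodule BatchSketch do
  # Hypothetical helper, not part of PhoenixMicro or Broadway.
  # Returns true when a pending batch should be handed to the batch handler:
  # either the batch is full, or the timeout expired with messages waiting.
  def flush?(pending_count, elapsed_ms, batch_size, batch_timeout_ms) do
    pending_count > 0 and
      (pending_count >= batch_size or elapsed_ms >= batch_timeout_ms)
  end
end
```

With a batch size of 1, every message satisfies the first condition on arrival, which matches the statement that a size of 1 disables batching.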
Returns the consumer configuration map for the given module.
@spec dispatch(module(), PhoenixMicro.Message.t(), context()) :: :ok | {:error, term()}
Invokes the consumer's handle/2 through its middleware chain.
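
One common way to run a handler through a middleware chain is to fold the middleware list into a single function. The sketch below assumes each middleware is a three-argument function `(message, context, next)`; that shape, the module `MiddlewareSketch`, and the sample middlewares are hypothetical, and the actual PhoenixMicro middleware contract may differ.

```elixir
defmodule MiddlewareSketch do
  # Hypothetical helper, not part of PhoenixMicro.
  # Wraps `handler` so that the first middleware in the list runs first.
  # Each middleware receives `next`, a 2-arity function that continues the
  # chain, and may short-circuit by returning without calling it.
  def run(middlewares, handler, message, context) do
    chain =
      middlewares
      |> Enum.reverse()
      |> Enum.reduce(handler, fn middleware, next ->
        fn msg, ctx -> middleware.(msg, ctx, next) end
      end)

    chain.(message, context)
  end
end

# Sample middlewares (plain maps stand in for PhoenixMicro.Message here).
tag = fn msg, ctx, next -> next.(Map.put(msg, :tagged, true), ctx) end

guard = fn msg, ctx, next ->
  if msg.payload == nil, do: {:error, :empty_payload}, else: next.(msg, ctx)
end

handler = fn msg, _ctx -> if msg[:tagged], do: :ok, else: {:error, :untagged} end

MiddlewareSketch.run([tag, guard], handler, %{payload: %{"amount" => 10}}, %{attempt: 1})
```

Reversing the list before the reduce keeps the declared order: the first middleware listed becomes the outermost wrapper, so it sees the message first and the result last.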
@spec nack(PhoenixMicro.Message.t(), context(), term()) :: :ok
Manually negatively-acknowledges a message from within a handler.
Use this when you detect a permanent failure and want to route the message to the DLQ immediately, skipping any remaining retry attempts.
def handle(message, context) do
  if invalid_payload?(message.payload) do
    PhoenixMicro.Consumer.nack(message, context, :invalid_payload)
    :nack
  else
    process(message.payload)
    :ok
  end
end
Processing mode. :broadway (default) uses a full Broadway pipeline with
backpressure and demand-driven scheduling. :simple uses the legacy
Task.start dispatch — useful when Broadway is not a dependency.
@spec start_link(module(), keyword()) :: GenServer.on_start()
Starts a supervised consumer worker for the given consumer module.
Called by PhoenixMicro.Supervisor.ConsumerManager.