PhoenixMicro.Consumer behaviour (PhoenixMicro v1.0.0)

DSL for defining topic consumers with concurrency control, retry logic, and middleware.

Example

defmodule MyApp.Payments.CreatedConsumer do
  use PhoenixMicro.Consumer

  # Logger.error/2 is a macro, so Logger must be required before it is used below.
  require Logger

  topic "payments.created"
  concurrency 10
  retry max_attempts: 5, base_delay: 500, max_delay: 30_000

  middleware [
    PhoenixMicro.Middleware.Logger,
    PhoenixMicro.Middleware.Metrics
  ]

  @impl PhoenixMicro.Consumer
  def handle(%PhoenixMicro.Message{} = message, _context) do
    %{"amount" => amount, "currency" => currency} = message.payload
    # ... process ...
    :ok
  end

  # Optional: override per-message error handling
  @impl PhoenixMicro.Consumer
  def handle_error(message, error, _context) do
    Logger.error("Payment processing failed", error: inspect(error))
    {:retry, message}
  end
end

Behaviour callbacks

Consumers must implement handle/2. handle_error/3 is optional.

The return value of handle/2 controls the ack/nack flow:

  • :ok — Ack the message.
  • {:ok, _result} — Ack.
  • {:error, reason} — Nack; triggers retry if attempts remain, DLQ otherwise.
  • {:retry, message} — Explicit retry request.
  • :nack — Nack without retry.
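
The mapping above can be sketched as a small pure function. This is an illustration only, not PhoenixMicro's implementation: the module name AckFlowSketch, the action atoms (:ack, :retry, :dead_letter, :nack), and the rule that attempts remain while attempt < max_attempts are all assumptions. The sketch also does not model whether an explicit {:retry, message} counts against the retry budget.

```elixir
defmodule AckFlowSketch do
  @moduledoc """
  Hypothetical illustration; not part of PhoenixMicro.
  Maps a handle/2 return value to the broker action described in the list above.
  """

  # `attempt` is the current delivery attempt (1-based) and `max_attempts` is
  # the retry budget. Both parameter names are assumptions of this sketch.
  def action_for(:ok, _attempt, _max_attempts), do: :ack
  def action_for({:ok, _result}, _attempt, _max_attempts), do: :ack

  # An error is retried while attempts remain, then routed to the DLQ.
  def action_for({:error, _reason}, attempt, max_attempts) when attempt < max_attempts,
    do: :retry

  def action_for({:error, _reason}, _attempt, _max_attempts), do: :dead_letter

  # An explicit retry request from the handler.
  def action_for({:retry, _message}, _attempt, _max_attempts), do: :retry

  # Nack without retry.
  def action_for(:nack, _attempt, _max_attempts), do: :nack
end
```

With max_attempts set to 5, an {:error, reason} result on attempt 1 maps to :retry, while the same result on attempt 5 maps to :dead_letter.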

Types

context()

@type context() :: %{
  transport: atom(),
  topic: String.t(),
  attempt: pos_integer(),
  transport_mod: module(),
  message: PhoenixMicro.Message.t()
}
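
To make the shape of context() concrete, the following sketch builds a map with the fields listed above and reads a few of them. The values are invented for illustration: :nats, MyApp.FakeTransport, and the plain map standing in for a %PhoenixMicro.Message{} struct are assumptions, as is the helper module ContextSketch.

```elixir
defmodule ContextSketch do
  # Builds a log-friendly summary from three of the fields in context().
  def describe(%{topic: topic, transport: transport, attempt: attempt}) do
    "topic=#{topic} transport=#{transport} attempt=#{attempt}"
  end
end

context = %{
  # Hypothetical transport name.
  transport: :nats,
  topic: "payments.created",
  # Second delivery attempt for this message.
  attempt: 2,
  # Hypothetical module name; it does not need to exist for this sketch.
  transport_mod: MyApp.FakeTransport,
  # A plain map standing in for a %PhoenixMicro.Message{} struct.
  message: %{payload: %{"amount" => 100}}
}

summary = ContextSketch.describe(context)
# summary is "topic=payments.created transport=nats attempt=2"
```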

Callbacks

handle(t, context)

@callback handle(PhoenixMicro.Message.t(), context()) ::
  :ok | {:ok, term()} | {:error, term()} | :nack

handle_error(t, term, context)

(optional)
@callback handle_error(PhoenixMicro.Message.t(), term(), context()) ::
  {:retry, PhoenixMicro.Message.t()} | :nack | :ok
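
One possible policy for handle_error/3 is to retry transient failures and nack everything else. The sketch below only mirrors the callback's argument and return shapes in a standalone module; the module name ErrorPolicySketch, the list of transient reasons, and the cutoff of three attempts are assumptions, not library defaults.

```elixir
defmodule ErrorPolicySketch do
  # Hypothetical classification of errors considered worth retrying.
  @transient [:timeout, :econnrefused]

  # Same argument order as the handle_error/3 callback: message, error, context.
  def handle_error(message, error, context) do
    cond do
      # Transient failure on an early attempt: ask for another delivery.
      error in @transient and context.attempt < 3 -> {:retry, message}
      # Anything else is treated as permanent in this sketch.
      true -> :nack
    end
  end
end
```

In a real consumer the same clauses would sit behind @impl PhoenixMicro.Consumer, with context.attempt supplied by the library.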

Functions

ack(message, context)

@spec ack(PhoenixMicro.Message.t(), context()) :: :ok

Manually acknowledges a message from within a handler.

Use this when you need to ack before the handler returns — for example, when kicking off an async process and you want to release the broker slot immediately.

def handle(message, context) do
  PhoenixMicro.Consumer.ack(message, context)
  Task.start(fn -> do_slow_work(message.payload) end)
  :ok
end

batch_size(n)

(macro)

Number of messages Broadway will accumulate before calling handle_batch/4. Set to 1 (default) to disable batching and process messages individually.

batch_timeout(ms)

(macro)

Maximum time in milliseconds to wait before flushing an incomplete batch. Only relevant when batch_size > 1. Default: 1000ms.
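
The interaction between batch_size and batch_timeout can be summarised as a single predicate: a pending batch is flushed when it is full or when the timeout has elapsed, whichever happens first. The sketch below is a simplified model of that rule, not Broadway's or PhoenixMicro's code; the module BatchFlushSketch and its parameter names are hypothetical.

```elixir
defmodule BatchFlushSketch do
  # Returns true when a pending batch should be handed to the batch handler.
  # An empty batch is never flushed, even after the timeout has elapsed.
  def flush?(pending_count, elapsed_ms, batch_size, batch_timeout) do
    pending_count > 0 and
      (pending_count >= batch_size or elapsed_ms >= batch_timeout)
  end
end
```

With batch_size 10 and batch_timeout 1000, three pending messages are held at 200 ms and flushed once 1000 ms have passed, while ten pending messages are flushed immediately.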

concurrency(n)

(macro)

config(module)

@spec config(module()) :: map()

Returns the consumer configuration map for the given module.

dead_letter_topic(name)

(macro)

dispatch(consumer_module, message, context)

@spec dispatch(module(), PhoenixMicro.Message.t(), context()) ::
  :ok | {:error, term()}

Invokes the consumer's handle/2 through its middleware chain.
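
A middleware chain of this kind is commonly built by folding the middleware list around the handler, so that each middleware receives a "next" function to call. The sketch below shows that general technique with anonymous functions. The three-argument middleware contract (message, context, next) and the module MiddlewareChainSketch are assumptions; the source does not document how PhoenixMicro middleware modules are actually invoked.

```elixir
defmodule MiddlewareChainSketch do
  # Each middleware is a 3-arity function: (message, context, next).
  # This contract is an assumption made for illustration only.
  def run(middlewares, handler, message, context) do
    # Fold from the right so the first middleware in the list is outermost.
    chain =
      List.foldr(middlewares, handler, fn middleware, next ->
        fn msg, ctx -> middleware.(msg, ctx, next) end
      end)

    chain.(message, context)
  end
end

# A middleware that tags the context before calling the next step.
tag = fn msg, ctx, next -> next.(msg, Map.put(ctx, :tagged, true)) end

# A middleware that converts raised exceptions into {:error, reason}.
rescue_errors = fn msg, ctx, next ->
  try do
    next.(msg, ctx)
  rescue
    e -> {:error, Exception.message(e)}
  end
end

# The innermost step, standing in for a consumer's handle/2.
handler = fn _msg, ctx -> if ctx[:tagged], do: :ok, else: {:error, :untagged} end

result = MiddlewareChainSketch.run([rescue_errors, tag], handler, %{payload: %{}}, %{attempt: 1})
# result is :ok because `tag` ran before the handler
```

Placing rescue_errors first makes it the outermost wrapper, so it also catches exceptions raised by later middleware, not only by the handler.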

middleware(mods)

(macro)

nack(message, context, reason \\ :nacked)

@spec nack(PhoenixMicro.Message.t(), context(), term()) :: :ok

Manually negatively-acknowledges a message from within a handler.

Use this when you detect a permanent failure and want to route the message to the DLQ immediately, bypassing any remaining retry attempts.

def handle(message, context) do
  if invalid_payload?(message.payload) do
    PhoenixMicro.Consumer.nack(message, context, :invalid_payload)
    :nack
  else
    process(message.payload)
    :ok
  end
end

pipeline(mode)

(macro)

Processing mode. :broadway (default) uses a full Broadway pipeline with backpressure and demand-driven scheduling. :simple uses the legacy Task.start dispatch — useful when Broadway is not a dependency.

queue_group(g)

(macro)

retry(opts)

(macro)
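
The options shown in the module example (max_attempts, base_delay, max_delay) suggest a capped backoff schedule, but the source does not state which strategy is used. The sketch below assumes plain exponential backoff, where the delay doubles on each attempt starting from base_delay and is capped at max_delay, with no jitter. The module BackoffSketch is hypothetical and the real schedule may differ.

```elixir
defmodule BackoffSketch do
  # Delay in milliseconds before retry number `attempt` (1-based).
  # Assumes the delay doubles each attempt and never exceeds max_delay.
  def delay(attempt, base_delay, max_delay) when attempt >= 1 do
    min(base_delay * Integer.pow(2, attempt - 1), max_delay)
  end
end

# Schedule for base_delay: 500 and max_delay: 30_000 over five attempts.
delays = for attempt <- 1..5, do: BackoffSketch.delay(attempt, 500, 30_000)
# delays is [500, 1000, 2000, 4000, 8000]
```

Under these assumptions the cap is first reached on attempt 7, where 500 * 64 = 32_000 is reduced to 30_000.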

start_link(consumer_module, opts \\ [])

@spec start_link(
  module(),
  keyword()
) :: GenServer.on_start()

Starts a supervised consumer worker for the given consumer module. Called by PhoenixMicro.Supervisor.ConsumerManager.

topic(name)

(macro)

transport(t)

(macro)