Middleware wraps message handling in composable, ordered layers, listed outermost first. Every middleware receives the PhoenixMicro.Message struct and a next function that invokes the rest of the chain.

Composing middleware

defmodule MyApp.Payments.CreatedConsumer do
  use PhoenixMicro.Consumer

  topic "payments.created"

  # Middleware runs in list order (Logger outermost, Idempotency innermost)
  middleware [
    PhoenixMicro.Middleware.Logger,
    PhoenixMicro.Middleware.Metrics,
    PhoenixMicro.Middleware.Retry,
    {PhoenixMicro.Middleware.CircuitBreaker,
     threshold: 5, window_ms: 60_000, reset_timeout_ms: 30_000},
    {PhoenixMicro.Middleware.Idempotency,
     store: PhoenixMicro.Middleware.Idempotency.ETSStore}
  ]
end

Built-in middleware

Logger

Emits a structured debug log on message receipt and a debug or warning log on outcome.

middleware [PhoenixMicro.Middleware.Logger]

Metrics

Emits :telemetry.span events for every message. Use with PhoenixMicro.Telemetry.metrics/0 to wire into TelemetryMetricsPrometheus or similar.

middleware [PhoenixMicro.Middleware.Metrics]
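
One way to expose these metrics is to pass the list to a reporter in your supervision tree. The sketch below assumes the telemetry_metrics_prometheus package is installed and that PhoenixMicro.Telemetry.metrics/0 returns a list of Telemetry.Metrics definitions; MyApp.Application and MyApp.Supervisor are placeholder names.

```elixir
defmodule MyApp.Application do
  use Application

  @impl Application
  def start(_type, _args) do
    children = [
      # Assumption: metrics/0 returns Telemetry.Metrics definitions
      # that the Prometheus reporter can consume directly.
      {TelemetryMetricsPrometheus, metrics: PhoenixMicro.Telemetry.metrics()}
    ]

    Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
  end
end
```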

Retry

Retries at the middleware level with exponential backoff and jitter. These retries run before the consumer-level retry configured via retry max_attempts: N.

middleware [
  {PhoenixMicro.Middleware.Retry,
   max: 3,
   base_delay: 200,
   max_delay: 5_000}
]
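
To make the backoff arithmetic concrete, here is a minimal sketch of how a capped exponential delay with jitter could be computed. It is not PhoenixMicro's implementation: the module name, the 1-based attempt numbering, and the "full jitter" strategy (a uniform random delay between 0 and the cap) are all assumptions chosen for illustration.

```elixir
defmodule BackoffSketch do
  # Illustrative only; not the library's actual algorithm.

  # Delay in ms for a 1-based attempt number: base * 2^(attempt - 1),
  # never exceeding max_delay.
  def capped_delay(attempt, base_delay, max_delay) when attempt >= 1 do
    min(base_delay * Integer.pow(2, attempt - 1), max_delay)
  end

  # Full jitter: a uniform random delay in 0..capped_delay.
  def jittered_delay(attempt, base_delay, max_delay) do
    cap = capped_delay(attempt, base_delay, max_delay)
    # :rand.uniform(n) returns 1..n, so shift down by one to include 0.
    :rand.uniform(cap + 1) - 1
  end
end
```

With base_delay: 200 and max_delay: 5_000, the un-jittered delays for attempts 1 through 6 would be 200, 400, 800, 1_600, 3_200, and 5_000 ms under these assumptions.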

CircuitBreaker

An ETS-backed, three-state circuit breaker, tracked per consumer topic.

State        Behaviour
:closed      Normal: all messages processed
:open        Rejecting: returns {:error, :circuit_open} immediately
:half_open   Probing: allows one message through; closes on success, reopens on failure

middleware [
  {PhoenixMicro.Middleware.CircuitBreaker,
   threshold:        10,       # failures within window before opening
   window_ms:        60_000,   # sliding failure window
   reset_timeout_ms: 30_000}   # how long to stay open before half-open probe
]
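
The three states and the options above can be read as a small state machine. The following is a pure-function sketch of those transitions, written only to illustrate how threshold, window_ms, and reset_timeout_ms interact. The module, struct, and function names are hypothetical, and the real middleware keeps its state in ETS rather than in a struct.

```elixir
defmodule BreakerSketch do
  # Illustrative state machine; not the library's implementation.
  defstruct state: :closed,
            failures: [],
            opened_at: nil,
            threshold: 10,
            window_ms: 60_000,
            reset_timeout_ms: 30_000

  # Decide whether a message may pass at time `now` (milliseconds).
  def allow(%__MODULE__{state: :open} = b, now) do
    if now - b.opened_at >= b.reset_timeout_ms do
      # Reset timeout elapsed: let a probe through.
      {:ok, %{b | state: :half_open}}
    else
      {{:error, :circuit_open}, b}
    end
  end

  # Simplification: this sketch does not limit :half_open to one in-flight probe.
  def allow(%__MODULE__{} = b, _now), do: {:ok, b}

  # Record the outcome (:ok or :error) of a message that was allowed through.
  def record(%__MODULE__{state: :half_open} = b, :ok, _now),
    do: %{b | state: :closed, failures: []}

  def record(%__MODULE__{state: :half_open} = b, :error, now),
    do: %{b | state: :open, opened_at: now}

  def record(%__MODULE__{state: :closed} = b, :ok, _now), do: b

  def record(%__MODULE__{state: :closed} = b, :error, now) do
    # Keep only failures that still fall inside the sliding window.
    recent = Enum.filter([now | b.failures], &(now - &1 < b.window_ms))

    if length(recent) >= b.threshold do
      %{b | state: :open, opened_at: now, failures: []}
    else
      %{b | failures: recent}
    end
  end
end
```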

Emits telemetry when the breaker trips, resets, or rejects a message:

  • [:phoenix_micro, :circuit_breaker, :tripped]
  • [:phoenix_micro, :circuit_breaker, :reset]
  • [:phoenix_micro, :circuit_breaker, :rejected]
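
These events can be consumed with a standard :telemetry handler. The sketch below attaches one handler to all three; the module name and handler id are placeholders, and because the measurement and metadata keys are not documented here, the handler simply logs whatever it receives.

```elixir
defmodule MyApp.CircuitBreakerAlerts do
  require Logger

  @events [
    [:phoenix_micro, :circuit_breaker, :tripped],
    [:phoenix_micro, :circuit_breaker, :reset],
    [:phoenix_micro, :circuit_breaker, :rejected]
  ]

  # Call once, for example from your application's start/2.
  def attach do
    :telemetry.attach_many(
      "my-app-circuit-breaker",
      @events,
      &__MODULE__.handle_event/4,
      nil
    )
  end

  # Assumption: the shape of measurements and metadata is unknown,
  # so both are logged as-is rather than pattern matched.
  def handle_event(event, measurements, metadata, _config) do
    Logger.warning("#{inspect(event)} #{inspect(measurements)} #{inspect(metadata)}")
  end
end
```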

Idempotency

Deduplicates messages by message.id. The store is pluggable: use ETS in dev/test, and Redis or PostgreSQL in production.

# Dev / test
middleware [
  {PhoenixMicro.Middleware.Idempotency,
   store: PhoenixMicro.Middleware.Idempotency.ETSStore}
]

# Production — custom Redis store
middleware [
  {PhoenixMicro.Middleware.Idempotency,
   store: MyApp.Middleware.RedisIdempotencyStore}
]

Custom store (implement the PhoenixMicro.IdempotencyStore behaviour):

defmodule MyApp.Middleware.RedisIdempotencyStore do
  @behaviour PhoenixMicro.IdempotencyStore

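  # Returns true when the id has already been recorded.
  # Redis errors are treated as "not seen", so an outage favours
  # reprocessing a duplicate over dropping a message.
  @impl PhoenixMicro.IdempotencyStore
  def seen?(id) do
    case MyApp.Redis.get("idem:#{id}") do
      {:ok, nil} -> false
      {:ok, _}   -> true
      _          -> false
    end
  end

  # Records the id with a 24-hour expiry (86_400 seconds).
  # The result of the write is ignored; a failed write leaves the id unmarked.
  @impl PhoenixMicro.IdempotencyStore
  def mark_seen(id) do
    MyApp.Redis.set("idem:#{id}", "1", ex: 86_400)
    :ok
  end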
end
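
A store split into seen?/1 and mark_seen/1 leaves a window between the check and the write in which two workers can both process the same message. If that matters for your workload, an atomic "claim" closes the gap. The sketch below shows the idea with :ets.insert_new/2; the module and its claim/1 function are hypothetical and not part of the PhoenixMicro.IdempotencyStore behaviour. In Redis, the equivalent is a single SET with the NX and EX options.

```elixir
defmodule AtomicClaimSketch do
  @table :idempotency_sketch

  # Create the table once, for example at application start.
  def init do
    :ets.new(@table, [:set, :public, :named_table])
    :ok
  end

  # Returns :new the first time an id is claimed and :duplicate afterwards.
  # insert_new/2 performs the existence check and the write as one atomic step.
  def claim(id) do
    if :ets.insert_new(@table, {id, System.system_time(:second)}) do
      :new
    else
      :duplicate
    end
  end
end
```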

Tracing

OpenTelemetry-compatible. Extracts W3C traceparent headers and creates spans. Degrades gracefully if :opentelemetry is not installed.

middleware [PhoenixMicro.Middleware.Tracing]

To create spans, add these optional dependencies to your app's deps:

{:opentelemetry, "~> 1.3"},
{:opentelemetry_api, "~> 1.3"}
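
For reference, a W3C traceparent header has four lowercase hex fields separated by hyphens: version, trace id (32 characters), parent id (16 characters), and flags. The sketch below parses that layout by hand to show what the Tracing middleware has to extract. It is illustrative only and says nothing about how PhoenixMicro or OpenTelemetry implement it; the module name is a placeholder.

```elixir
defmodule TraceparentSketch do
  # Accepts only the four-field layout used by version 00 of the spec.
  def parse(header) when is_binary(header) do
    regex = ~r/\A([0-9a-f]{2})-([0-9a-f]{32})-([0-9a-f]{16})-([0-9a-f]{2})\z/

    with [version, trace_id, parent_id, flags] <-
           Regex.run(regex, header, capture: :all_but_first),
         # Version "ff" and all-zero ids are invalid per the spec.
         true <- version != "ff",
         true <- trace_id != String.duplicate("0", 32),
         true <- parent_id != String.duplicate("0", 16) do
      {:ok, %{version: version, trace_id: trace_id, parent_id: parent_id, flags: flags}}
    else
      _ -> :error
    end
  end

  def parse(_other), do: :error
end
```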

Writing custom middleware

Implement the PhoenixMicro.Middleware behaviour with a single call/2 callback:

defmodule MyApp.Middleware.TenantContext do
  @behaviour PhoenixMicro.Middleware

  require Logger

  @impl PhoenixMicro.Middleware
  def call(%PhoenixMicro.Message{} = message, next) do
    tenant_id = Map.get(message.headers, "x-tenant-id", "default")

    # Set for the duration of this message
    Process.put(:current_tenant_id, tenant_id)
    Logger.metadata(tenant_id: tenant_id)

    try do
      next.(message)
    after
      # Runs even if a later layer raises, so tenant state never leaks
      # into the next message handled by this process
      Process.delete(:current_tenant_id)
      Logger.metadata(tenant_id: nil)
    end
  end
end

A middleware can also short-circuit the chain by returning a result without calling next:

defmodule MyApp.Middleware.RateLimiter do
  @behaviour PhoenixMicro.Middleware

  @impl PhoenixMicro.Middleware
  def call(%PhoenixMicro.Message{} = message, next) do
    topic = message.topic

    case MyApp.RateLimiter.check(topic) do
      :ok ->
        next.(message)

      {:error, :rate_exceeded} ->
        # Returning an error causes the consumer to retry / DLQ
        {:error, :rate_exceeded}
    end
  end
end
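
The MyApp.RateLimiter.check/1 helper used above is not shown. As one possible shape, here is a minimal fixed-window counter backed by ETS. Everything in it is an assumption made for illustration: the module name, the extra limit, window_ms, and now arguments, and the choice of a fixed window rather than a token bucket.

```elixir
defmodule RateLimiterSketch do
  @table :rate_limiter_sketch

  # Create the table once, for example at application start.
  def init do
    :ets.new(@table, [:set, :public, :named_table, write_concurrency: true])
    :ok
  end

  # Allows at most `limit` calls per topic in each `window_ms` window.
  # `now` is injectable so the function can be exercised deterministically.
  def check(topic, limit \\ 100, window_ms \\ 1_000, now \\ System.monotonic_time(:millisecond)) do
    window = Integer.floor_div(now, window_ms)
    key = {topic, window}

    # Atomically increments the counter, inserting {key, 0} first if absent.
    count = :ets.update_counter(@table, key, 1, {key, 0})

    if count <= limit, do: :ok, else: {:error, :rate_exceeded}
  end
end
```

This sketch never deletes counters for past windows, so a longer-lived version would need periodic cleanup.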

Middleware composition order

Given middleware [A, B, C], the chain expands to:

A.call(message, fn message ->
  B.call(message, fn message ->
    C.call(message, fn message ->
      handler.(message)   # your handle/2
    end)
  end)
end)

Errors propagate back outward through each layer, so outer middleware such as Logger and Metrics sees the final outcome, regardless of which inner middleware or handler caused the failure.
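
The nesting described in this section can be produced by folding over the middleware list from the innermost entry outward. The sketch below shows one way such a chain could be built and how a handler error travels back through each layer. It is an illustration under stated assumptions, not PhoenixMicro's internals: ChainSketch and its stand-in middleware A and B are hypothetical, and options passed as {Module, opts} are ignored.

```elixir
defmodule ChainSketch do
  # Returns a one-argument function that runs `middlewares`
  # outermost-first around `handler`.
  def build(middlewares, handler) when is_function(handler, 1) do
    middlewares
    |> Enum.reverse()
    |> Enum.reduce(handler, fn mod, next ->
      fn message -> mod.call(message, next) end
    end)
  end

  # Helper for the stand-in middleware below: reports entry and exit
  # to the calling process, then passes the result through unchanged.
  def trace(label, message, next) do
    send(self(), {:chain_sketch, {label, :in}})
    result = next.(message)
    send(self(), {:chain_sketch, {label, :out, result}})
    result
  end
end

defmodule ChainSketch.A do
  def call(message, next), do: ChainSketch.trace(:a, message, next)
end

defmodule ChainSketch.B do
  def call(message, next), do: ChainSketch.trace(:b, message, next)
end
```

Building a chain with ChainSketch.build([ChainSketch.A, ChainSketch.B], fn _message -> {:error, :boom} end) and calling it with any message returns {:error, :boom}. The calling process receives the events in the order A in, B in, B out, A out, and both exit events carry the error.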

Accessing middleware state

The full middleware chain result is available in handle_error/3:

@impl PhoenixMicro.Consumer
def handle_error(message, reason, _ctx) do
  Logger.error("Giving up on #{message.id}: #{inspect(reason)}")
  MyApp.Alerts.notify(:consumer_failure, message, reason)
  :ok
end