PhoenixMicro.Outbox (PhoenixMicro v1.0.0)


Transactional outbox pattern for guaranteed message delivery.

The problem this solves

A naive Repo.insert(order) followed by PhoenixMicro.publish(event) is a dual write: the two steps are not atomic, so a failure between them loses the event:

1. Repo.insert(order)    → succeeds
2. publish(event)        → crashes / network error
                         → event is LOST

The outbox pattern closes this gap by writing the event inside the same database transaction as the business record, then relaying it to the broker from a separate background process:

Transaction:
  1. Repo.insert(order)
  2. Outbox.enqueue(event)    → writes to outbox_messages table
  COMMIT                      → both succeed or both roll back

Relay (background):
  3. Poll outbox_messages WHERE relayed_at IS NULL
  4. publish(event) to broker
  5. UPDATE outbox_messages SET relayed_at = now()
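
To make steps 3 to 5 concrete, here is a minimal in-memory sketch of one relay pass. It is not PhoenixMicro's relay: `OutboxRelaySketch`, `relay_batch/3`, the `publish` callback, and the `"orders.failed"` topic are hypothetical names used only for illustration, plain maps stand in for table rows, and bumping `attempt` on failure is an assumption about how the retry columns might be used.

```elixir
defmodule OutboxRelaySketch do
  @moduledoc "In-memory model of one relay pass. Not PhoenixMicro's implementation."

  # `rows` stands in for the outbox_messages table and `publish` for the
  # broker client; `publish` must return :ok or {:error, reason}.
  def relay_batch(rows, publish, now \\ DateTime.utc_now()) do
    Enum.map(rows, fn
      # Step 3: only rows with relayed_at == nil are candidates.
      %{relayed_at: nil} = row ->
        case publish.(row.topic, row.payload) do
          # Steps 4-5: mark the row only after the broker accepted it.
          :ok ->
            %{row | relayed_at: now}

          # Assumption: record the failure and leave relayed_at as nil,
          # so the next pass picks the row up again.
          {:error, reason} ->
            %{row | attempt: row.attempt + 1, last_error: inspect(reason)}
        end

      # Already relayed: leave untouched.
      row ->
        row
    end)
  end
end

rows = [
  %{id: 1, topic: "orders.placed", payload: %{order_id: 10},
    attempt: 1, relayed_at: nil, last_error: nil},
  %{id: 2, topic: "orders.failed", payload: %{order_id: 11},
    attempt: 1, relayed_at: nil, last_error: nil}
]

# Fake broker: accepts one topic, rejects everything else.
publish = fn
  "orders.placed", _payload -> :ok
  _topic, _payload -> {:error, :broker_down}
end

result = OutboxRelaySketch.relay_batch(rows, publish, ~U[2024-01-01 00:00:00Z])
```

Because the row is marked only after `publish` returns `:ok`, a crash between the publish and the update leaves `relayed_at` empty, and the message is sent again on the next pass. That is the source of the at-least-once behaviour described under Guarantees.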

Setup

1. Generate and run the migration

mix ecto.gen.migration create_outbox_messages

Add the following to the generated migration, then run mix ecto.migrate:

def change do
  create table(:outbox_messages, primary_key: false) do
    # gen_random_uuid() is built into PostgreSQL 13+; older versions need the pgcrypto extension
    add :id,           :uuid,    primary_key: true, default: fragment("gen_random_uuid()")
    add :topic,        :string,  null: false
    add :payload,      :map,     null: false
    add :headers,      :map,     default: %{}
    add :attempt,      :integer, default: 1
    add :relayed_at,   :utc_datetime_usec
    add :failed_at,    :utc_datetime_usec
    add :last_error,   :string
    timestamps(type: :utc_datetime_usec)
  end

  create index(:outbox_messages, [:relayed_at, :inserted_at])
  create index(:outbox_messages, [:failed_at])
end

2. Configure

config :phoenix_micro,
  outbox: [
    repo: MyApp.Repo,
    poll_interval_ms: 1_000,
    batch_size: 100,
    max_attempts: 5
  ]

3. Add the relay to your supervision tree

children = [
  MyApp.Repo,
  PhoenixMicro.Outbox.Relay
]
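
For intuition about what a supervised polling process does, the sketch below shows a generic GenServer that fires a callback every `poll_interval_ms`. It is an assumption-laden illustration, not the source of `PhoenixMicro.Outbox.Relay`: `PollerSketch` and the `:on_tick` option are hypothetical names, and a real relay would query and publish a batch where this sketch only calls the callback.

```elixir
defmodule PollerSketch do
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  @impl true
  def init(opts) do
    state = %{
      # Same key as in the config above; falls back to 1 second.
      interval: Keyword.get(opts, :poll_interval_ms, 1_000),
      # Hypothetical zero-arity callback that does the actual work.
      on_tick: Keyword.fetch!(opts, :on_tick)
    }

    schedule(state.interval)
    {:ok, state}
  end

  @impl true
  def handle_info(:poll, state) do
    state.on_tick.()
    # Re-arm the timer only after the work finished, so ticks never overlap.
    schedule(state.interval)
    {:noreply, state}
  end

  defp schedule(interval), do: Process.send_after(self(), :poll, interval)
end

parent = self()

{:ok, _pid} =
  PollerSketch.start_link(poll_interval_ms: 10, on_tick: fn -> send(parent, :tick) end)
```

Since `use GenServer` defines a default `child_spec/1`, a module like this can be listed in a supervisor's children as `{PollerSketch, opts}`. Listing it after the repo, as in the tree above, means the database is started before the first poll.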

4. Use inside transactions

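alias PhoenixMicro.Outbox

Repo.transaction(fn ->
  order = Repo.insert!(Order.changeset(%Order{}, params))
  # enqueue!/3 raises on failure, so a failed enqueue rolls the order back too
  Outbox.enqueue!("orders.placed", %{order_id: order.id})
end)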

Guarantees

  • At-least-once delivery: if the relay crashes mid-flight, the message is still in the database and will be retried.
  • No phantom events: if the outer transaction rolls back, the outbox row rolls back too.
  • Bounded duplicates: the relay sets relayed_at only after a successful broker publish, so a message is redelivered only when the relay fails between publishing and recording relayed_at.

Deduplication

Consumers should handle duplicates using PhoenixMicro.Middleware.Idempotency or their own deduplication logic. The message id is stable across retries.
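
As a rough illustration of the "own deduplication logic" option, the sketch below skips any message whose id has already been processed. `DedupSketch`, `handle/3`, and the `"msg-1"` id are hypothetical and say nothing about how PhoenixMicro.Middleware.Idempotency works. The in-memory MapSet is for demonstration only; a real consumer would keep seen ids in durable storage.

```elixir
defmodule DedupSketch do
  # `seen` is a MapSet of message ids that were already processed.
  # Returns {:processed | :duplicate, updated_seen}.
  def handle(%{id: id} = message, seen, handler) do
    if MapSet.member?(seen, id) do
      # Same id as an earlier delivery: skip the side effect.
      {:duplicate, seen}
    else
      handler.(message)
      {:processed, MapSet.put(seen, id)}
    end
  end
end

message = %{id: "msg-1", topic: "orders.placed", payload: %{order_id: 10}}

# The handler reports each invocation to the current process.
handler = fn msg -> send(self(), {:handled, msg.id}) end

{:processed, seen} = DedupSketch.handle(message, MapSet.new(), handler)
# A redelivery of the same message is detected through its stable id.
{:duplicate, _seen} = DedupSketch.handle(message, seen, handler)
```

Keying on the message id works because the id does not change between relay attempts. Keying on the payload would wrongly merge two distinct events that happen to carry the same data.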

Summary

Functions

enqueue(topic, payload, opts \\ [])

Enqueues a message in the outbox table. Must be called inside an Ecto Repo.transaction/1 block.

enqueue!(topic, payload, opts \\ [])

Enqueues a message, raising on failure.

Functions

enqueue(topic, payload, opts \\ [])

@spec enqueue(String.t(), term(), keyword()) :: {:ok, map()} | {:error, term()}

Enqueues a message in the outbox table. Must be called inside an Ecto Repo.transaction/1 block.

Returns {:ok, outbox_record} or {:error, changeset}.

enqueue!(topic, payload, opts \\ [])

@spec enqueue!(String.t(), term(), keyword()) :: map()

Enqueues a message, raising on failure. Returns the outbox record directly on success.