Compensation-based saga coordinator for distributed transactions.
What is a saga?
A saga is a sequence of steps where each step has a corresponding compensation (undo) action. If any step fails, all previously completed steps are compensated in reverse order:

    Step 1: Reserve inventory  ─── compensation: Release inventory
    Step 2: Charge payment     ─── compensation: Refund payment
    Step 3: Create shipment    ─── compensation: Cancel shipment
    Step 4: Send notification  ─── compensation: (none, fire-and-forget)

If Step 3 fails:

    Compensation 2: Refund payment
    Compensation 1: Release inventory

OTP model
Each saga execution runs as an independent GenServer process owned
by PhoenixMicro.Saga.Supervisor. This means:
- Sagas survive transport failures (they are local processes).
- Each saga has its own state and failure-isolation boundary.
- The supervisor can restart failed sagas if needed.
- You can query a saga's status by its ID at any time.
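
The process model above can be illustrated with a small, self-contained sketch. It is not PhoenixMicro's implementation: the module name `SagaProcSketch`, the use of `Registry` for ID lookups, the ID format, and the `:transient` restart policy are all assumptions chosen to show how one process per saga, owned by a supervisor and addressable by ID, might fit together.

```elixir
defmodule SagaProcSketch do
  @moduledoc "Hypothetical sketch of one process per saga; all names are illustrative."
  use GenServer

  # Start the registry (ID -> pid lookup) and the supervisor that owns saga processes.
  def start_infra do
    {:ok, _} = Registry.start_link(keys: :unique, name: __MODULE__.Registry)
    {:ok, _} = DynamicSupervisor.start_link(strategy: :one_for_one, name: __MODULE__.Supervisor)
    :ok
  end

  # Start one saga process under the supervisor and return its generated ID.
  def start(context) do
    saga_id = "saga_" <> Integer.to_string(System.unique_integer([:positive]))

    spec = %{
      id: saga_id,
      start: {GenServer, :start_link, [__MODULE__, context, [name: via(saga_id)]]},
      # Assumption: restart only after an abnormal exit.
      restart: :transient
    }

    {:ok, _pid} = DynamicSupervisor.start_child(__MODULE__.Supervisor, spec)
    {:ok, saga_id}
  end

  # Look the process up by ID and ask it for its current state.
  def status(saga_id) do
    case Registry.lookup(__MODULE__.Registry, saga_id) do
      [{pid, _value}] -> {:ok, GenServer.call(pid, :status)}
      [] -> {:error, :not_found}
    end
  end

  defp via(saga_id), do: {:via, Registry, {__MODULE__.Registry, saga_id}}

  @impl true
  def init(context), do: {:ok, %{status: :running, context: context}}

  @impl true
  def handle_call(:status, _from, state), do: {:reply, state, state}
end
```

After `SagaProcSketch.start_infra()`, calling `SagaProcSketch.start(%{order_id: "ord_123"})` returns `{:ok, saga_id}`, and `SagaProcSketch.status(saga_id)` returns the state held by that saga's own process. A crash in one saga process does not affect the others, which is the isolation boundary described above.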
Defining a saga

    defmodule MyApp.PlaceOrderSaga do
      use PhoenixMicro.Saga

      step :reserve_inventory,
        execute: &MyApp.Inventory.reserve/1,
        compensate: &MyApp.Inventory.release/1

      step :charge_payment,
        execute: &MyApp.Payments.charge/1,
        compensate: &MyApp.Payments.refund/1

      step :create_shipment,
        execute: &MyApp.Shipping.create/1,
        compensate: &MyApp.Shipping.cancel/1

      step :notify_customer,
        execute: &MyApp.Notifications.send_order_confirmation/1
        # no compensate: notifications are fire-and-forget
    end

Running a saga

    # Start and await completion (synchronous)
    case PhoenixMicro.Saga.run(MyApp.PlaceOrderSaga, %{order_id: "ord_123"}) do
      {:ok, final_context} -> handle_success(final_context)
      {:compensated, reason, context} -> handle_rollback(reason, context)
      {:error, :compensation_failed, details} -> handle_fatal(details)
    end

    # Start async: returns saga_id immediately
    {:ok, saga_id} = PhoenixMicro.Saga.start(MyApp.PlaceOrderSaga, %{order_id: "ord_123"})

    # Check status later
    {:ok, %{status: :completed}} = PhoenixMicro.Saga.status(saga_id)

Step function signatures
Execute functions receive the current saga context map:

    def my_execute(%{order_id: order_id} = ctx) do
      case do_work(order_id) do
        {:ok, result} ->
          # Merge result into context for downstream steps
          {:ok, Map.put(ctx, :my_result, result)}

        {:error, reason} ->
          {:error, reason}
      end
    end

Compensate functions receive the context map as it was when their step completed:

    def my_compensate(%{order_id: order_id, my_result: result}) do
      undo_work(order_id, result)
      :ok
    end

Telemetry
- [:phoenix_micro, :saga, :started]: saga execution started
- [:phoenix_micro, :saga, :step_started]: individual step starting
- [:phoenix_micro, :saga, :step_ok]: step completed successfully
- [:phoenix_micro, :saga, :step_failed]: step returned error
- [:phoenix_micro, :saga, :compensating]: rollback started
- [:phoenix_micro, :saga, :completed]: all steps succeeded
- [:phoenix_micro, :saga, :compensated]: saga rolled back cleanly
- [:phoenix_micro, :saga, :fatal]: compensation itself failed
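
To consume these events, a handler can be attached with `:telemetry.attach_many/4` from the `telemetry` library. The sketch below is only an illustration: the module name `MyApp.SagaTelemetry`, the handler ID, and the choice of log levels are assumptions, and because the keys inside the measurements and metadata maps are not listed here, the handler simply inspects whatever it receives.

```elixir
defmodule MyApp.SagaTelemetry do
  @moduledoc "Hypothetical handler module; only the event names come from the list above."
  require Logger

  @events [
    [:phoenix_micro, :saga, :started],
    [:phoenix_micro, :saga, :step_started],
    [:phoenix_micro, :saga, :step_ok],
    [:phoenix_micro, :saga, :step_failed],
    [:phoenix_micro, :saga, :compensating],
    [:phoenix_micro, :saga, :completed],
    [:phoenix_micro, :saga, :compensated],
    [:phoenix_micro, :saga, :fatal]
  ]

  def events, do: @events

  # Call once at application start. Needs the :telemetry dependency at runtime.
  def attach do
    :telemetry.attach_many("my-app-saga-logger", @events, &__MODULE__.handle_event/4, nil)
  end

  # Measurement and metadata keys are undocumented here, so they are only inspected.
  def handle_event([:phoenix_micro, :saga, name], measurements, metadata, _config) do
    # Assumption: failure-related events are worth a warning, the rest are informational.
    level = if name in [:step_failed, :compensating, :fatal], do: :warning, else: :info
    Logger.log(level, "saga #{name}: #{inspect(measurements)} #{inspect(metadata)}")
  end
end
```

Passing the handler as a captured module function (`&__MODULE__.handle_event/4`) rather than an anonymous function follows the recommendation in the `telemetry` documentation.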
Functions

    @spec run(module(), map(), keyword()) ::
            {:ok, map()} | {:compensated, term(), map()} | {:error, :compensation_failed, term()}

Runs a saga synchronously, blocking until completion or rollback. Returns:
- {:ok, final_context}: all steps completed successfully.
- {:compensated, reason, context}: a step failed; compensations ran.
- {:error, :compensation_failed, details}: compensation itself failed.
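
The three return shapes can be reproduced with a minimal pure-function runner. This is a sketch, not PhoenixMicro's code: `SagaSketch` and its `{name, execute, compensate}` step tuples are hypothetical, and it assumes the context returned in `{:compensated, reason, context}` is the one seen by the failing step.

```elixir
defmodule SagaSketch do
  @moduledoc "Hypothetical stand-in for a saga runner; not PhoenixMicro's implementation."

  # Each step is {name, execute_fun, compensate_fun_or_nil}.
  def run(steps, context), do: do_run(steps, context, [])

  # All steps finished: report the final context.
  defp do_run([], ctx, _done), do: {:ok, ctx}

  defp do_run([{_name, execute, compensate} | rest], ctx, done) do
    case execute.(ctx) do
      # Remember the compensation together with the context at completion time.
      {:ok, new_ctx} -> do_run(rest, new_ctx, [{compensate, new_ctx} | done])
      {:error, reason} -> rollback(done, reason, ctx)
    end
  end

  # `done` is newest-first, so walking it compensates in reverse order.
  defp rollback([], reason, ctx), do: {:compensated, reason, ctx}

  # Steps without a compensation are skipped.
  defp rollback([{nil, _step_ctx} | rest], reason, ctx), do: rollback(rest, reason, ctx)

  defp rollback([{compensate, step_ctx} | rest], reason, ctx) do
    case compensate.(step_ctx) do
      :ok -> rollback(rest, reason, ctx)
      {:error, details} -> {:error, :compensation_failed, details}
    end
  end
end

steps = [
  {:reserve, fn ctx -> {:ok, Map.put(ctx, :reserved, true)} end, fn _ctx -> :ok end},
  {:charge, fn ctx -> {:ok, Map.put(ctx, :charged, true)} end, fn _ctx -> :ok end},
  {:ship, fn _ctx -> {:error, :no_carrier} end, nil}
]

# The third step fails, so :charge and then :reserve are compensated.
{:compensated, :no_carrier, _ctx} = SagaSketch.run(steps, %{order_id: "ord_123"})
```

Prepending each completed step to the `done` list means rollback needs no explicit reversal, and pairing each compensation with the context at completion time matches the contract described under "Step function signatures".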
PhoenixMicro.Saga.start starts a saga asynchronously and returns {:ok, saga_id} immediately, without waiting for any step to finish.
Use status/1 to poll for completion.
status/1 returns the current status of a saga given its saga_id, for example {:ok, %{status: :completed}}.
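
Polling for completion could look roughly like the helper below. It is a hedged sketch: `SagaPoller` is hypothetical, the status function is injected (for example `&PhoenixMicro.Saga.status/1`) so the helper can be tried without the library, and every terminal status other than `:completed` is an assumption, since only `:completed` appears in this documentation.

```elixir
defmodule SagaPoller do
  @moduledoc "Hypothetical polling helper; status names other than :completed are assumptions."

  # Assumption: these statuses mean the saga will not change any further.
  @terminal [:completed, :compensated, :failed]

  # status_fun takes a saga ID and returns {:ok, %{status: ...}} or {:error, reason}.
  def await(saga_id, status_fun, opts \\ []) do
    interval = Keyword.get(opts, :interval, 100)
    attempts = Keyword.get(opts, :attempts, 50)
    poll(saga_id, status_fun, interval, attempts)
  end

  # Give up once the attempt budget is spent.
  defp poll(_saga_id, _status_fun, _interval, 0), do: {:error, :timeout}

  defp poll(saga_id, status_fun, interval, attempts) do
    case status_fun.(saga_id) do
      {:ok, %{status: status} = info} when status in @terminal ->
        {:ok, info}

      {:ok, _still_running} ->
        Process.sleep(interval)
        poll(saga_id, status_fun, interval, attempts - 1)

      {:error, reason} ->
        {:error, reason}
    end
  end
end
```

A call such as `SagaPoller.await(saga_id, &PhoenixMicro.Saga.status/1, interval: 200)` would then block the caller until the saga reaches one of the assumed terminal statuses or the attempt budget runs out.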
step/2 defines a saga step.
Options:
- :execute: required. fn(context) :: {:ok, context} | {:error, reason}
- :compensate: optional. fn(context) :: :ok | {:error, reason}
- :timeout: milliseconds for this step (default: 30_000)
- :retries: number of retries on transient failure (default: 0)
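
To make the :timeout and :retries options concrete, here is one way such semantics could be implemented with `Task`. It is a sketch under stated assumptions, not PhoenixMicro's implementation: `StepRunnerSketch` is hypothetical, a timed-out step is reported as `{:error, :timeout}`, and every `{:error, _}` result is treated as transient and retried.

```elixir
defmodule StepRunnerSketch do
  @moduledoc "Hypothetical illustration of :timeout and :retries; not PhoenixMicro's code."

  # Defaults mirror the documented ones: 30_000 ms timeout, 0 retries.
  def run_step(execute, context, opts \\ []) do
    timeout = Keyword.get(opts, :timeout, 30_000)
    retries = Keyword.get(opts, :retries, 0)
    attempt(execute, context, timeout, retries)
  end

  defp attempt(execute, context, timeout, retries_left) do
    # Run the step in a separate process so a slow step can be abandoned.
    task = Task.async(fn -> execute.(context) end)

    result =
      case Task.yield(task, timeout) || Task.shutdown(task) do
        {:ok, value} -> value
        {:exit, reason} -> {:error, {:exit, reason}}
        # No reply within the timeout: the task has been shut down.
        nil -> {:error, :timeout}
      end

    case result do
      {:ok, _new_context} = ok -> ok
      # Assumption: every error counts as transient while retries remain.
      {:error, _reason} when retries_left > 0 ->
        attempt(execute, context, timeout, retries_left - 1)

      {:error, _reason} = error -> error
    end
  end
end
```

With `retries: 2` a step is attempted at most three times. Because `Task.async/1` links the task to the caller, a step that raises would still crash the calling process in this sketch; a real coordinator would likely need to isolate that as well.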