PhoenixMicro.Saga (PhoenixMicro v1.0.0)

Copy Markdown View Source

Compensation-based saga coordinator for distributed transactions.

What is a saga?

A saga is a sequence of steps where each step has a corresponding compensation (undo) action. If any step fails, all previously completed steps are compensated in reverse order:

Step 1: Reserve inventory    compensation: Release inventory
Step 2: Charge payment       compensation: Refund payment
Step 3: Create shipment      compensation: Cancel shipment
Step 4: Send notification    compensation: (none  fire-and-forget)

If Step 3 fails:

Compensation 2: Refund payment
Compensation 1: Release inventory

OTP model

Each saga execution runs as an independent GenServer process owned by PhoenixMicro.Saga.Supervisor. This means:

  • Sagas survive transport failures (they are local processes).
  • Each saga has its own state and failure-isolation boundary.
  • The supervisor can restart failed sagas if needed.
  • You can query a saga's status by its ID at any time.

Defining a saga

defmodule MyApp.PlaceOrderSaga do
  use PhoenixMicro.Saga

  step :reserve_inventory,
    execute: &MyApp.Inventory.reserve/1,
    compensate: &MyApp.Inventory.release/1

  step :charge_payment,
    execute: &MyApp.Payments.charge/1,
    compensate: &MyApp.Payments.refund/1

  step :create_shipment,
    execute: &MyApp.Shipping.create/1,
    compensate: &MyApp.Shipping.cancel/1

  step :notify_customer,
    execute: &MyApp.Notifications.send_order_confirmation/1
    # no compensate — notifications are fire-and-forget
end

Running a saga

# Start and await completion (synchronous)
case PhoenixMicro.Saga.run(PlaceOrderSaga, %{order_id: "ord_123"}) do
  {:ok, final_context} -> handle_success(final_context)
  {:compensated, reason, context} -> handle_rollback(reason, context)
  {:error, reason} -> handle_fatal(reason)
end

# Start async — returns saga_id immediately
{:ok, saga_id} = PhoenixMicro.Saga.start(PlaceOrderSaga, %{order_id: "ord_123"})

# Check status later
{:ok, %{status: :completed}} = PhoenixMicro.Saga.status(saga_id)

Step function signatures

Execute functions receive the current saga context map:

def my_execute(%{order_id: order_id} = ctx) do
  case do_work(order_id) do
    {:ok, result} ->
      # Merge result into context for downstream steps
      {:ok, Map.put(ctx, :my_result, result)}

    {:error, reason} ->
      {:error, reason}
  end
end

Compensate functions receive the same context map at the time the step completed:

def my_compensate(%{order_id: order_id, my_result: result}) do
  undo_work(order_id, result)
  :ok
end

Telemetry

  • [:phoenix_micro, :saga, :started] — saga execution started
  • [:phoenix_micro, :saga, :step_started] — individual step starting
  • [:phoenix_micro, :saga, :step_ok] — step completed successfully
  • [:phoenix_micro, :saga, :step_failed] — step returned error
  • [:phoenix_micro, :saga, :compensating] — rollback started
  • [:phoenix_micro, :saga, :completed] — all steps succeeded
  • [:phoenix_micro, :saga, :compensated] — saga rolled back cleanly
  • [:phoenix_micro, :saga, :fatal] — compensation itself failed

Summary

Functions

Runs a saga synchronously, blocking until completion or rollback. Returns

Starts a saga asynchronously. Returns {:ok, saga_id}. Use status/1 to poll for completion.

Returns the current status of a saga.

Defines a saga step.

Functions

run(saga_module, initial_context, opts \\ [])

@spec run(module(), map(), keyword()) ::
  {:ok, map()}
  | {:compensated, term(), map()}
  | {:error, :compensation_failed, term()}

Runs a saga synchronously, blocking until completion or rollback. Returns:

  • {:ok, final_context} — all steps completed successfully.
  • {:compensated, reason, context} — a step failed; compensations ran.
  • {:error, :compensation_failed, details} — compensation itself failed.

start(saga_module, initial_context, opts \\ [])

@spec start(module(), map(), keyword()) :: {:ok, String.t()} | {:error, term()}

Starts a saga asynchronously. Returns {:ok, saga_id}. Use status/1 to poll for completion.

status(saga_id)

@spec status(String.t()) :: {:ok, map()} | {:error, :not_found}

Returns the current status of a saga.

step(name, opts)

(macro)

Defines a saga step.

Options:

  • :execute — required. fn(context) :: {:ok, context} | {:error, reason}

  • :compensate — optional. fn(context) :: :ok | {:error, reason}

  • :timeout — milliseconds for this step (default: 30_000)
  • :retries — number of retries on transient failure (default: 0)