Mobus.Stepwise.Engine (mobus_stepwise v0.2.0)

Copy Markdown View Source

ALF-backed engine for stepwise workflows.

Stepwise workflows are intended for wizards and import pipelines:

  • linear(ish) progression
  • resumable via checkpoints
  • lighter semantics than full FSM (events are typically :next / :back)

Usage

spec = %{
  profile: :stepwise,
  initial_state: :step_one,
  steps: [:step_one, :step_two, :step_three],
  states: %{
    step_one: %{step_number: 1, ui: %{key: :step_one}},
    step_two: %{step_number: 2, ui: %{key: :step_two}},
    step_three: %{step_number: 3, ui: %{key: :step_three}}
  }
}

{:ok, runtime} = Engine.init(spec, %{tenant_id: "t1", execution_id: "e1", sync: true})
{:ok, runtime} = Engine.handle_event(runtime, :next, %{name: "Alice"})
projection = Engine.get_state(runtime)

Summary

Functions

Extracts a serializable checkpoint from the runtime for persistence.

Returns the current Mobus.Stepwise.Projection for the runtime.

Processes an event against the current runtime, advancing the workflow.

Initializes a new stepwise workflow runtime from a spec and runtime context.

Restores a runtime from a previously saved checkpoint.

Types

runtime()

@type runtime() :: %{
  :execution_id => String.t(),
  :tenant_id => String.t(),
  :spec => map(),
  :current_state => atom() | String.t(),
  :pipeline_mod => module(),
  optional(:context) => map(),
  optional(:history) => list(),
  optional(:trace) => list(),
  optional(:blocked_reasons) => map(),
  optional(:breakpoint_hits) => list(),
  optional(:meta) => map(),
  optional(:errors) => [map()],
  optional(:projection) => Mobus.Stepwise.Projection.t()
}

Functions

checkpoint(runtime)

@spec checkpoint(runtime()) :: map()

Extracts a serializable checkpoint from the runtime for persistence.

Strips the projection (non-serializable) and retains only the core runtime fields needed to resume the workflow later via restore/3.

Parameters

  • runtime — current runtime map

Returns

  • A plain map suitable for JSON serialization or database storage.

Examples

checkpoint = Engine.checkpoint(runtime)
# => %{execution_id: "e1", current_state: :step_two, context: %{name: "Alice"}, ...}

get_state(runtime)

@spec get_state(runtime()) :: Mobus.Stepwise.Projection.t() | map()

Returns the current Mobus.Stepwise.Projection for the runtime.

If the runtime already contains a computed projection, returns it directly. Otherwise, recomputes the projection by running the pipeline in projection-only mode (no state transition).

Parameters

  • runtime — current runtime map

Returns

  • %Mobus.Stepwise.Projection{} — the canonical UI projection struct

Examples

projection = Engine.get_state(runtime)
projection.current_state  #=> :step_two
projection.available_events  #=> [:back, :next]

handle_event(runtime, event, payload)

@spec handle_event(runtime(), atom() | String.t(), map()) ::
  {:ok, runtime()} | {:wait, runtime(), map()} | {:error, term(), runtime()}

Processes an event against the current runtime, advancing the workflow.

Sends the event through the ALF pipeline which, in order:

  1. Merges the payload into runtime.context
  2. Executes any step action (capability) defined for the current state
  3. Advances or reverses current_state based on the event
  4. Fires entry actions if the state changed
  5. Records breakpoint hits
  6. Computes the updated projection

Parameters

  • runtime — current runtime map (as returned by init/2 or a prior handle_event/3)
  • event — event atom or string (:next, :back, "next", "back", or custom)
  • payload — map of user input to merge into context

Returns

  • {:ok, runtime} — successful state transition with updated projection
  • {:wait, runtime, wait_cfg} — transition requires async resolution
  • {:error, reason, runtime} — action or pipeline failure

Examples

{:ok, runtime} = Engine.handle_event(runtime, :next, %{name: "Alice"})
{:ok, runtime} = Engine.handle_event(runtime, :back, %{})

init(spec, runtime_context)

@spec init(map(), map()) ::
  {:ok, runtime()}
  | {:wait, runtime(), map()}
  | {:error, {:initial_entry_action_failed, term(), runtime()} | term()}

Initializes a new stepwise workflow runtime from a spec and runtime context.

Normalizes the spec into internal representation (IR), starts the ALF pipeline (if not already running), builds the initial runtime map, fires entry actions for the initial state, and computes the first projection.

If the initial state defines a capability action whose trigger matches :enter, that action runs during init/2. When that action returns {:error, reason}, the error is propagated to the caller as {:error, {:initial_entry_action_failed, reason, runtime}} with blocked_reasons populated on the returned runtime. Previously such errors were silently discarded, causing workflows whose first step errored to advance as if the step had succeeded.

Parameters

  • spec — workflow specification map containing :profile, :initial_state, :steps, :states, and optionally :transitions, :breakpoints, :subscriptions
  • runtime_context — context map requiring :tenant_id and optionally :execution_id, :sync, and :initial_context

Returns

  • {:ok, runtime} — runtime map with computed projection
  • {:error, reason} — if tenant_id is missing or pipeline fails to start
  • {:error, {:initial_entry_action_failed, reason, runtime}} — if the initial state's entry capability returned {:error, reason}

Examples

spec = %{profile: :stepwise, initial_state: :step_one, steps: [:step_one, :step_two],
         states: %{step_one: %{step_number: 1, ui: %{key: :step_one}},
                   step_two: %{step_number: 2, ui: %{key: :step_two}}}}

{:ok, runtime} = Engine.init(spec, %{tenant_id: "t1", execution_id: "e1", sync: true})

restore(spec, checkpoint, runtime_context)

@spec restore(map(), map(), map()) :: {:ok, runtime()} | {:error, term()}

Restores a runtime from a previously saved checkpoint.

Re-normalizes the spec into IR, reconstitutes the runtime from checkpoint data, starts the pipeline if needed, and recomputes the projection. The restored runtime is fully functional for subsequent handle_event/3 calls.

Parameters

  • spec — the original workflow specification map
  • checkpoint — map previously returned by checkpoint/1
  • runtime_context — context map requiring :tenant_id and optionally :sync

Returns

  • {:ok, runtime} — restored runtime with recomputed projection
  • {:error, reason} — if tenant_id is missing or pipeline fails to start

Examples

checkpoint = Engine.checkpoint(runtime)
{:ok, restored} = Engine.restore(spec, checkpoint, %{tenant_id: "t1", sync: true})