Persistence helpers for durable workflow definitions and ordered step rows.
Summary
Functions
Appends one workflow_transition row using the supplied repo (so callers can
participate in the progression transaction). Required keys: workflow_run_id,
to_state, and reason. Optional keys: workflow_step_id, delivery_id,
from_state, context, inserted_at.
Returns the workflow run state for the given tenant_id and execution_id
(the workflow run's primary key). The returned map includes the authoritative
State Spine fields plus the current step key for operator-friendly inspection.
Looks up a workflow_step by step_key inside the same workflow definition.
Returns nil if the step does not exist — callers treat this as a noop
rather than crashing the progression transaction.
Returns the active workflow_step row for a workflow_run, raising if none.
Returns the canonical workflow_run row by id, raising if not found. Used by the progression service after locking the row inside its transaction.
Returns the structural WorkflowTransition records for a given execution,
strictly scoped to the supplied tenant_id.
Locks a workflow run for update inside the given repo and returns it. Returns
{:error, :workflow_run_not_found} if the row no longer exists. Must be
invoked inside a transaction.
Routes an incoming signal to all waiting workflow runs for the same tenant
whose pending_signals list contains the signal's event_name.
Updates a workflow run row with the supplied fields. Used by the progression service to record waiting state and reason/context, advance the current step cursor, or reactivate a previously waiting run.
Functions
@spec active_step_linkage(Ecto.UUID.t() | map()) :: {:ok, %{ workflow_run_id: Ecto.UUID.t(), workflow_step_id: Ecto.UUID.t(), channel: String.t() } | nil} | {:error, term()}
@spec append_transition(Ecto.Repo.t(), map()) :: {:ok, Chimeway.Workflows.WorkflowTransition.t()} | {:error, Ecto.Changeset.t()}
Appends one workflow_transition row using the supplied repo (so callers can
participate in the progression transaction). Required keys: workflow_run_id,
to_state, and reason. Optional keys: workflow_step_id, delivery_id,
from_state, context, inserted_at.
@spec create_initial_run( Ecto.Repo.t(), Ecto.UUID.t(), Chimeway.Workflows.WorkflowDefinition.t(), DateTime.t(), String.t() ) :: {:ok, Chimeway.Workflows.WorkflowRun.t()} | {:error, term()}
@spec ensure_definition( Ecto.Repo.t(), String.t(), Chimeway.Notifier.workflow_resolution() ) :: {:ok, Chimeway.Workflows.WorkflowDefinition.t()} | {:error, term()}
@spec explain(String.t(), Ecto.UUID.t()) :: {:ok, %{ id: Ecto.UUID.t(), tenant_id: String.t(), state: atom(), status_reason: String.t() | nil, current_step_name: String.t() | nil, suspended_until: DateTime.t() | nil, pending_signals: [String.t()], terminal_reason: String.t() | nil }} | {:error, :not_found}
Returns the workflow run state for the given tenant_id and execution_id
(the workflow run's primary key). The returned map includes the authoritative
State Spine fields plus the current step key for operator-friendly inspection.
Returns {:error, :not_found} if the run does not exist or belongs to a
different tenant — preventing cross-tenant information disclosure (T-27-05).
@spec fetch_definition(String.t(), pos_integer()) :: {:ok, Chimeway.Workflows.WorkflowDefinition.t() | nil} | {:error, term()}
@spec fetch_step_by_key(Ecto.UUID.t(), String.t()) :: Chimeway.Workflows.WorkflowStep.t() | nil
Looks up a workflow_step by step_key inside the same workflow definition.
Returns nil if the step does not exist — callers treat this as a noop
rather than crashing the progression transaction.
@spec get_current_step!(Chimeway.Workflows.WorkflowRun.t()) :: Chimeway.Workflows.WorkflowStep.t()
Returns the active workflow_step row for a workflow_run, raising if none.
@spec get_run!(Ecto.UUID.t()) :: Chimeway.Workflows.WorkflowRun.t()
Returns the canonical workflow_run row by id, raising if not found. Used by the progression service after locking the row inside its transaction.
@spec list_traces(String.t(), Ecto.UUID.t(), keyword()) :: {:ok, [Chimeway.Workflows.WorkflowTransition.t()]} | {:error, :not_found}
Returns the structural WorkflowTransition records for a given execution,
strictly scoped to the supplied tenant_id.
Trace context intentionally contains only structural progression metadata
(e.g., event_name, step_key). Raw signal payloads are never written to
transition context, making this surface payload-safe by construction (T-27-04).
Returns {:error, :not_found} if the workflow run does not exist or belongs
to a different tenant.
Opts:
:limit— max number of traces to return (default: all)
@spec lock_run(Ecto.Repo.t(), Ecto.UUID.t()) :: {:ok, Chimeway.Workflows.WorkflowRun.t()} | {:error, :workflow_run_not_found}
Locks a workflow run for update inside the given repo and returns it. Returns
{:error, :workflow_run_not_found} if the row no longer exists. Must be
invoked inside a transaction.
@spec persisted_workflow(Ecto.UUID.t() | map()) :: {:ok, Chimeway.Notifier.workflow_resolution() | nil} | {:error, term()}
@spec route_signal(Chimeway.Signals.Signal.t()) :: {:ok, map()} | {:error, term()}
Routes an incoming signal to all waiting workflow runs for the same tenant
whose pending_signals list contains the signal's event_name.
For each matched run the function:
- Transitions the run from
:waitingto:activeand clearspending_signals. - Appends an immutable
WorkflowTransitionrecording theevent_name(but not the raw payload — payload safety is enforced here per the threat model requirement T-27-03).
All mutations per run are wrapped in one Ecto.Multi transaction so the state
update and the trace record are always atomically consistent.
Cross-tenant isolation is enforced structurally: the query always filters by
tenant_id = ^signal.tenant_id, making it structurally impossible for a signal
from one tenant to resume a run belonging to another.
Returns {:ok, results_map} where results_map contains per-run outcomes keyed
by {:run_updated, run.id} and {:transition_inserted, run.id}.
@spec update_run(Ecto.Repo.t(), Chimeway.Workflows.WorkflowRun.t(), map()) :: {:ok, Chimeway.Workflows.WorkflowRun.t()} | {:error, Ecto.Changeset.t()}
Updates a workflow run row with the supplied fields. Used by the progression service to record waiting state and reason/context, advance the current step cursor, or reactivate a previously waiting run.
@spec upsert_definition(String.t(), Chimeway.Notifier.workflow_resolution()) :: {:ok, Chimeway.Workflows.WorkflowDefinition.t()} | {:error, term()}