SquidMesh.StepRunStore (squid_mesh v0.1.0-alpha.3)

Copy Markdown View Source

Durable store for per-step workflow execution state.

Step runs are used to detect stale or duplicate deliveries and to persist step input, output, and failure details separately from the parent run.

Summary

Functions

Marks a step as ready for execution if it has not already completed or been claimed by another delivery of the same workflow step.

Marks a paused manual step as completed, persists its output, and durably records the manual action metadata.

Marks a step run as completed and persists its output.

Lists the completed step outputs for one workflow run in completion order.

Lists the completed step identifiers for one workflow run.

Marks a step run as failed and persists the last error.

Fetches the persisted step run for one workflow run and step identifier.

Persists approval resume metadata for a running approval step without completing it.

Persists pause-resume metadata for a running pause step without completing it.

Persists that a step has been scheduled but not yet claimed by a worker.

Returns a map from step identifier to persisted step status for each declared step in a workflow run.

Types

approval_targets()

@type approval_targets() :: %{ok: pause_target(), error: pause_target()}

begin_result()

@type begin_result() :: {:ok, SquidMesh.Persistence.StepRun.t(), :execute | :skip}

manual_event()

@type manual_event() :: map()

pause_target()

@type pause_target() :: :complete | atom()

schedule_result()

@type schedule_result() ::
  {:ok, SquidMesh.Persistence.StepRun.t(), :schedule | :skip}
  | {:error, Ecto.Changeset.t()}

stale_error()

@type stale_error() :: {:stale_step_run, String.t()}

step_error()

@type step_error() :: map()

step_identifier()

@type step_identifier() :: atom() | String.t()

step_input()

@type step_input() :: map()

step_output()

@type step_output() :: map()

step_schedule_input()

@type step_schedule_input() :: {step_identifier(), step_input()}

step_status()

@type step_status() :: :pending | :running | :completed | :failed

Functions

begin_step(repo, run_id, step, input)

@spec begin_step(module(), Ecto.UUID.t(), step_identifier(), step_input()) ::
  begin_result()

Marks a step as ready for execution if it has not already completed or been claimed by another delivery of the same workflow step.

complete_manual_step(repo, step_run_id, output, manual)

@spec complete_manual_step(module(), Ecto.UUID.t(), step_output(), manual_event()) ::
  {:ok, SquidMesh.Persistence.StepRun.t()}
  | {:error, :not_found | stale_error()}

Marks a paused manual step as completed, persists its output, and durably records the manual action metadata.

complete_step(repo, step_run_id, output)

@spec complete_step(module(), Ecto.UUID.t(), step_output()) ::
  {:ok, SquidMesh.Persistence.StepRun.t()}
  | {:error, :not_found | stale_error()}

Marks a step run as completed and persists its output.

completed_outputs(repo, run_id)

@spec completed_outputs(module(), Ecto.UUID.t()) :: [step_output()]

Lists the completed step outputs for one workflow run in completion order.

completed_steps(repo, run_id)

@spec completed_steps(module(), Ecto.UUID.t()) :: [String.t()]

Lists the completed step identifiers for one workflow run.

fail_step(repo, step_run_id, error)

@spec fail_step(module(), Ecto.UUID.t(), step_error()) ::
  {:ok, SquidMesh.Persistence.StepRun.t()}
  | {:error, :not_found | stale_error()}

Marks a step run as failed and persists the last error.

get_step_run(repo, run_id, step)

@spec get_step_run(module(), Ecto.UUID.t(), step_identifier()) ::
  SquidMesh.Persistence.StepRun.t() | nil

Fetches the persisted step run for one workflow run and step identifier.

persist_approval_resume(repo, step_run_id, map, output_key)

@spec persist_approval_resume(
  module(),
  Ecto.UUID.t(),
  approval_targets(),
  atom() | nil
) ::
  {:ok, SquidMesh.Persistence.StepRun.t()}
  | {:error, :not_found | stale_error()}

Persists approval resume metadata for a running approval step without completing it.

persist_pause_resume(repo, step_run_id, output, target)

@spec persist_pause_resume(module(), Ecto.UUID.t(), step_output(), pause_target()) ::
  {:ok, SquidMesh.Persistence.StepRun.t()}
  | {:error, :not_found | stale_error()}

Persists pause-resume metadata for a running pause step without completing it.

schedule_step(repo, run_id, step, input)

@spec schedule_step(module(), Ecto.UUID.t(), step_identifier(), step_input()) ::
  schedule_result()

Persists that a step has been scheduled but not yet claimed by a worker.

step_statuses(repo, run_id)

@spec step_statuses(module(), Ecto.UUID.t()) :: %{
  optional(String.t()) => step_status()
}

Returns a map from step identifier (as a string) to persisted step status for each declared step in a workflow run.