SquidMesh.RunStore (squid_mesh v0.1.0-alpha.3)

Copy Markdown View Source

Durable run persistence and lifecycle operations.

This module translates between the public SquidMesh.Run struct and the underlying persistence schema while applying workflow-level rules such as payload validation, trigger resolution, replay lineage, and legal run-state transitions.

Summary

Functions

Requests cancellation for a run if its current status allows it.

Creates a new run for a workflow using the workflow's default trigger.

Creates a new run for a workflow through an explicit trigger.

Fetches one persisted run and returns the public run representation.

Lists runs using the supported filter set.

Creates a new pending run from a prior run while preserving replay lineage.

Returns whether a run in the given state should schedule additional step work.

Applies a validated run-state transition and persists the updated run.

Updates durable run fields without changing the run state machine directly.

Types

attrs_fun()

@type attrs_fun() :: (SquidMesh.Run.t() -> transition_attrs())

create_error()

@type create_error() ::
  {:invalid_payload, :expected_map}
  | {:invalid_payload, SquidMesh.Workflow.Definition.payload_error_details()}
  | {:invalid_trigger, atom() | String.t()}
  | {:invalid_workflow, module() | String.t()}
  | {:invalid_run, Ecto.Changeset.t()}

dispatch_fun()

@type dispatch_fun() :: (SquidMesh.Run.t() -> {:ok, term()} | {:error, term()})

failure_attrs_fun()

@type failure_attrs_fun() :: (SquidMesh.Run.t(), term() -> transition_attrs())

get_error()

@type get_error() :: :not_found

get_option()

@type get_option() :: {:include_history, boolean()}

list_filter()

@type list_filter() ::
  {:workflow, module()}
  | {:status, SquidMesh.Run.status()}
  | {:limit, pos_integer()}

list_filters()

@type list_filters() :: [list_filter()]

pause_result()

@type pause_result() ::
  %{
    run: SquidMesh.Run.t(),
    from_status: SquidMesh.Run.status(),
    to_status: SquidMesh.Run.status(),
    terminal_noop?: true,
    finalized_step?: boolean(),
    error: map()
  }
  | %{
      run: SquidMesh.Run.t(),
      from_status: SquidMesh.Run.status(),
      to_status: SquidMesh.Run.status()
    }

progress_event()

@type progress_event() :: run_transition_event() | term()

progress_operation()

@type progress_operation() ::
  :update
  | {:transition, SquidMesh.Run.status()}
  | {:dispatch, dispatch_fun()}
  | {:dispatch_or_fail, dispatch_fun(), failure_attrs_fun()}
  | {:transition_or_dispatch, SquidMesh.Run.status(), dispatch_fun()}
  | {:transition_or_dispatch_or_fail, SquidMesh.Run.status(), dispatch_fun(),
     failure_attrs_fun()}

progress_result()

@type progress_result() :: SquidMesh.Run.t() | :noop

replay_error()

@type replay_error() :: get_error() | create_error()

run_transition_event()

@type run_transition_event() ::
  {:run_transition, SquidMesh.Run.t(), SquidMesh.Run.status(),
   SquidMesh.Run.status()}

transition_attrs()

@type transition_attrs() :: %{
  optional(:context) => map(),
  optional(:current_step) => String.t() | atom() | nil,
  optional(:last_error) => map() | nil
}

transition_error()

@type transition_error() ::
  get_error()
  | SquidMesh.Runtime.StateMachine.transition_error()
  | {:invalid_run, Ecto.Changeset.t()}

update_error()

@type update_error() :: get_error() | {:invalid_run, Ecto.Changeset.t()}

Functions

cancel_run(repo, run_id)

@spec cancel_run(module(), Ecto.UUID.t()) ::
  {:ok, SquidMesh.Run.t()} | {:error, transition_error()}

Requests cancellation for a run if its current status allows it.

create_run(repo, workflow, payload)

@spec create_run(module(), module(), map()) ::
  {:ok, SquidMesh.Run.t()} | {:error, create_error()}

Creates a new run for a workflow using the workflow's default trigger.

create_run(repo, workflow, trigger_name, payload)

@spec create_run(module(), module(), atom(), map()) ::
  {:ok, SquidMesh.Run.t()} | {:error, create_error()}

Creates a new run for a workflow through an explicit trigger.

get_run(repo, run_id, opts \\ [])

@spec get_run(module(), Ecto.UUID.t(), [get_option()]) ::
  {:ok, SquidMesh.Run.t()} | {:error, get_error()}

Fetches one persisted run and returns the public run representation.

list_runs(repo, filters \\ [])

@spec list_runs(module(), list_filters()) :: {:ok, [SquidMesh.Run.t()]}

Lists runs using the supported filter set.

replay_run(repo, run_id)

@spec replay_run(module(), Ecto.UUID.t()) ::
  {:ok, SquidMesh.Run.t()} | {:error, replay_error()}

Creates a new pending run from a prior run while preserving replay lineage.

schedule_next_step?(status)

@spec schedule_next_step?(SquidMesh.Run.t() | SquidMesh.Run.status()) :: boolean()

Returns whether a run in the given state should schedule additional step work.

transition_run(repo, run_id, to_status, attrs \\ %{})

@spec transition_run(
  module(),
  Ecto.UUID.t(),
  SquidMesh.Run.status(),
  transition_attrs()
) ::
  {:ok, SquidMesh.Run.t()} | {:error, transition_error()}

Applies a validated run-state transition and persists the updated run.

update_run(repo, run_id, attrs)

@spec update_run(module(), Ecto.UUID.t(), transition_attrs()) ::
  {:ok, SquidMesh.Run.t()} | {:error, update_error()}

Updates durable run fields without changing the run state machine directly.