SquidMesh.Workflow.Definition (squid_mesh v0.1.0-alpha.7)

Copy Markdown View Source

Runtime-facing representation of a compiled workflow definition.

`SquidMesh.Workflow` builds the declarative DSL at compile time. This module (`SquidMesh.Workflow.Definition`) loads the compiled definition and applies the runtime operations needed for run creation, payload resolution, and persistence serialization.

Summary

Functions

Applies the declared output mapping for one step result.

Resolves the success and rejection targets for an approval step.

Returns the default trigger for the workflow definition.

Returns true when the workflow uses dependency-based step progression.

Resolves dependency-mode progress from persisted per-step state.

Deserializes persisted payload keys back to declared workflow field names.

Deserializes a persisted step name back to the declared workflow step.

Deserializes a persisted trigger name back to the declared workflow trigger.

Returns the workflow entry step.

Returns the workflow entry steps in semantic execution order.

Returns the explicit failure recovery route for a step when one was declared.

Returns the first step scheduled when a run starts.

Builds the public per-step inspection view from declared steps and persisted step statuses.

Loads a compiled workflow definition from a workflow module.

Loads a workflow definition from its persisted module name.

Resolves the next step after a successful execution.

Resolves payload defaults and validates the final payload for a new run.

Resolves one named trigger from the workflow definition.

Serializes a step identifier for persistence.

Serializes a trigger identifier for persistence.

Serializes a workflow module name for persistence.

Fetches one declared workflow step by name.

Returns the compensation callback for one declared step, if any.

Returns the explicit input mapping for one declared step, if any.

Returns the explicit output mapping key for one declared step, if any.

Returns the recovery policy for one declared step.

Returns the local transaction boundary for one declared step, if any.

Resolves the full transition metadata for one step outcome.

Resolves the transition target for a step outcome.

Fetches one declared workflow trigger by name.

Returns completed steps whose recovery policy makes replay unsafe by default.

Validates a payload map against the workflow payload contract.

Types

built_in_step_kind()

@type built_in_step_kind() :: :wait | :log | :pause | :approval

dependency_progress()

@type dependency_progress() ::
  :complete
  | {:dispatch, [atom()]}
  | {:wait, [atom()]}
  | {:error, {:no_runnable_step, [atom()]}}

dependency_step_status()

@type dependency_step_status() :: :pending | :running | :completed | :failed

failure_recovery()

@type failure_recovery() :: %{
  strategy: failure_recovery_strategy(),
  target: transition_target() | String.t()
}

failure_recovery_strategy()

@type failure_recovery_strategy() :: :compensation | :undo

inspect_step()

@type inspect_step() :: %{
  step: atom(),
  depends_on: [atom()],
  status: inspect_step_status(),
  recovery: recovery_policy()
}

inspect_step_status()

@type inspect_step_status() :: dependency_step_status() | :waiting

load_error()

@type load_error() :: {:invalid_workflow, module() | String.t()}

payload_error_details()

@type payload_error_details() :: %{
  optional(:missing_fields) => [atom()],
  optional(:unknown_fields) => [atom() | String.t()],
  optional(:invalid_types) => %{optional(atom()) => atom()}
}

payload_field()

@type payload_field() :: %{name: atom(), type: atom(), opts: keyword()}

recovery_policy()

@type recovery_policy() :: %{
  optional(:compensation) => map(),
  optional(:failure) => failure_recovery(),
  irreversible?: boolean(),
  compensatable?: boolean(),
  recovery: :automatic | :manual_intervention,
  replay: :allowed | :manual_review_required
}

retry()

@type retry() :: %{step: atom(), opts: keyword()}

step()

@type step() :: %{
  name: atom(),
  module: module() | built_in_step_kind(),
  opts: keyword()
}

step_input_mapping()

@type step_input_mapping() :: [atom()]

step_output_mapping()

@type step_output_mapping() :: atom()

step_transaction_boundary()

@type step_transaction_boundary() :: :repo

t()

@type t() :: %{
  triggers: [trigger()],
  payload: [payload_field()],
  steps: [step()],
  transitions: [transition()],
  retries: [retry()],
  entry_steps: [atom()],
  initial_step: atom(),
  entry_step: atom() | nil
}

transition()

@type transition() :: %{
  :from => atom(),
  :on => transition_outcome(),
  :to => transition_target(),
  optional(:recovery) => failure_recovery_strategy()
}

transition_outcome()

@type transition_outcome() :: :ok | :error

transition_target()

@type transition_target() :: atom() | :complete

trigger()

@type trigger() :: %{
  name: atom(),
  type: trigger_type(),
  config: map(),
  payload: [payload_field()]
}

trigger_error()

@type trigger_error() :: {:invalid_trigger, atom() | String.t()}

trigger_type()

@type trigger_type() :: :manual | :cron

Functions

apply_output_mapping(definition, step_name, output)

@spec apply_output_mapping(t(), atom(), map()) ::
  {:ok, map()} | {:error, {:unknown_step, atom()}}

Applies the declared output mapping for one step result.

approval_transition_targets(definition, step_name)

@spec approval_transition_targets(t(), atom()) ::
  {:ok, %{ok: transition_target(), error: transition_target()}}
  | {:error, {:unknown_transition, atom(), atom()}}

Resolves the success and rejection targets for an approval step.

default_trigger(definition)

@spec default_trigger(t()) :: atom()

Returns the default trigger for the workflow definition.

dependency_mode?(definition)

@spec dependency_mode?(t()) :: boolean()

Returns true when the workflow uses dependency-based step progression.

dependency_progress(definition, step_statuses)

@spec dependency_progress(
  t(),
  %{optional(atom() | String.t()) => dependency_step_status() | String.t()}
) :: dependency_progress()

Resolves dependency-mode progress from persisted per-step state.

Ready steps are scheduled breadth-first by dependency phase so newly unlocked descendants do not bypass incomplete root or sibling steps.

deserialize_payload(definition, payload)

@spec deserialize_payload(t() | nil, map()) :: map()

Deserializes persisted payload keys back to declared workflow field names.

deserialize_step(definition, step_name)

@spec deserialize_step(t(), String.t() | nil) :: atom() | String.t() | nil

Deserializes a persisted step name back to the declared workflow step.

deserialize_trigger(definition, trigger_name)

@spec deserialize_trigger(t() | nil, String.t() | nil) :: atom() | String.t() | nil

Deserializes a persisted trigger name back to the declared workflow trigger.

entry_step(definition)

@spec entry_step(t()) :: atom() | nil

Returns the workflow entry step. As the spec indicates, the result is `nil` when the definition's `entry_step` field is `nil`.

entry_steps(definition)

@spec entry_steps(t()) :: [atom()]

Returns the workflow entry steps in semantic execution order.

failure_recovery(definition, from_step)

@spec failure_recovery(t(), atom()) ::
  {:ok, failure_recovery() | nil}
  | {:error, {:unknown_transition, atom(), atom()}}

Returns the explicit failure recovery route for a step when one was declared.

initial_step(definition)

@spec initial_step(t()) :: atom()

Returns the first step scheduled when a run starts.

inspect_steps(definition, step_statuses \\ %{})

@spec inspect_steps(
  t(),
  %{
    optional(atom() | String.t()) =>
      dependency_step_status() | inspect_step_status()
  }
) :: [inspect_step()]

Builds the public per-step inspection view from declared steps and persisted step statuses.

load(workflow)

@spec load(module()) :: {:ok, t()} | {:error, load_error()}

Loads a compiled workflow definition from a workflow module.

load_serialized(workflow_name)

@spec load_serialized(String.t()) :: {:ok, module(), t()} | {:error, load_error()}

Loads a workflow definition from its persisted module name.

next_step_after_success(definition, from_step, completed_steps)

@spec next_step_after_success(t(), atom(), [atom() | String.t()]) ::
  {:ok, transition_target()} | {:error, {:no_runnable_step, [atom()]}}

Resolves the next step after a successful execution.

resolve_payload(definition, payload)

@spec resolve_payload(t(), map()) ::
  {:ok, map()} | {:error, {:invalid_payload, payload_error_details()}}

Resolves payload defaults and validates the final payload for a new run.

resolve_trigger(definition, trigger_name)

@spec resolve_trigger(t(), atom()) :: {:ok, atom()} | {:error, trigger_error()}

Resolves one named trigger from the workflow definition.

serialize_step(step)

@spec serialize_step(atom() | String.t() | nil) :: String.t() | nil

Serializes a step identifier for persistence.

serialize_trigger(trigger)

@spec serialize_trigger(atom() | String.t() | nil) :: String.t() | nil

Serializes a trigger identifier for persistence.

serialize_workflow(workflow)

@spec serialize_workflow(module()) :: String.t()

Serializes a workflow module name for persistence.

step(definition, step_name)

@spec step(t(), atom()) :: {:ok, step()} | {:error, {:unknown_step, atom()}}

Fetches one declared workflow step by name.

step_compensation_callback(definition, step_name)

@spec step_compensation_callback(t(), atom()) ::
  {:ok, module() | nil} | {:error, {:unknown_step, atom()}}

Returns the compensation callback for one declared step, if any.

A callback means the step's completed side effect is reversible by a host application action. The runtime uses it only during saga rollback after a downstream terminal failure, never as a same-step fallback.

step_input_mapping(definition, step_name)

@spec step_input_mapping(t(), atom()) ::
  {:ok, step_input_mapping() | nil} | {:error, {:unknown_step, atom()}}

Returns the explicit input mapping for one declared step, if any.

step_output_mapping(definition, step_name)

@spec step_output_mapping(t(), atom()) ::
  {:ok, step_output_mapping() | nil} | {:error, {:unknown_step, atom()}}

Returns the explicit output mapping key for one declared step, if any.

step_recovery_policy(definition, step_name)

@spec step_recovery_policy(t(), atom()) ::
  {:ok, recovery_policy()} | {:error, {:unknown_step, atom()}}

Returns the recovery policy for one declared step.

Irreversible steps are always treated as non-compensatable. Steps marked `compensatable: false` keep their reversibility marker but still require explicit operator review before replay.

step_transaction_boundary(definition, step_name)

@spec step_transaction_boundary(t(), atom()) ::
  {:ok, step_transaction_boundary() | nil} | {:error, {:unknown_step, atom()}}

Returns the local transaction boundary for one declared step, if any.

The `:repo` boundary wraps only the host action execution in a transaction on the configured Ecto repo. Squid Mesh persists attempt, step, and run progression in its normal durable phase after the action returns.

transition(definition, from_step, outcome)

@spec transition(t(), atom(), transition_outcome()) ::
  {:ok, transition()} | {:error, {:unknown_transition, atom(), atom()}}

Resolves the full transition metadata for one step outcome.

transition_target(definition, from_step, outcome)

@spec transition_target(t(), atom(), transition_outcome()) ::
  {:ok, transition_target()} | {:error, {:unknown_transition, atom(), atom()}}

Resolves the transition target for a step outcome.

trigger(definition, trigger_name)

@spec trigger(t(), atom()) :: {:ok, trigger()} | {:error, trigger_error()}

Fetches one declared workflow trigger by name.

unsafe_replay_steps(definition, completed_steps)

@spec unsafe_replay_steps(t(), [
  atom() | String.t() | {atom() | String.t(), map() | nil}
]) :: [map()]

Returns completed steps whose recovery policy makes replay unsafe by default.

validate_payload(definition, payload)

@spec validate_payload(t(), map()) ::
  :ok | {:error, {:invalid_payload, payload_error_details()}}

Validates a payload map against the workflow payload contract.