Hephaestus.Runtime.Runner.Local (hephaestus v0.3.1)

Local OTP runner that executes one workflow instance per GenServer process.

Crash recovery restores the latest persisted instance state from storage, but schedule_resume/3 timers are process-local and do not survive a runner crash.

Types

state()

@type state() :: %{
  instance: Hephaestus.Core.Instance.t(),
  registry: term(),
  storage: {module(), term()},
  task_supervisor: GenServer.server(),
  advance_count: non_neg_integer(),
  step_count: non_neg_integer(),
  waiting_since: integer() | nil
}

Functions

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See the Supervisor module documentation for the child specification format.

resume(instance_id, event)

@spec resume(String.t(), atom() | String.t()) :: :ok | {:error, :instance_not_found}

Resumes a waiting instance by sending an asynchronous cast to its runner process.

The event is normalized to an atom and delivered via GenServer.cast/2, so the call returns immediately; :ok confirms only that the message was sent, not that the instance has processed it. Returns {:error, :instance_not_found} if no runner process is registered for the given instance_id.
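The lookup-then-cast pattern described above can be sketched with plain OTP. This is a minimal illustration, not the Hephaestus implementation: the ResumeSketch module, the {:resume, event} message shape, the events/2 helper, and the use of String.to_existing_atom/1 for normalization are all assumptions made for the example.

```elixir
defmodule ResumeSketch do
  # Hypothetical stand-in for a runner process; not the library's code.
  use GenServer

  def start_link(opts) do
    registry = Keyword.fetch!(opts, :registry)
    id = Keyword.fetch!(opts, :instance_id)
    # Register under the instance ID so the process can be found later.
    GenServer.start_link(__MODULE__, [], name: {:via, Registry, {registry, id}})
  end

  # Normalizes the event, looks up the process, and casts without waiting.
  def resume(registry, instance_id, event) do
    event = if is_binary(event), do: String.to_existing_atom(event), else: event

    case Registry.lookup(registry, instance_id) do
      [{pid, _value}] -> GenServer.cast(pid, {:resume, event})
      [] -> {:error, :instance_not_found}
    end
  end

  # Test helper: returns the events received so far, oldest first.
  def events(registry, instance_id) do
    GenServer.call({:via, Registry, {registry, instance_id}}, :events)
  end

  @impl true
  def init(events), do: {:ok, events}

  @impl true
  def handle_cast({:resume, event}, events), do: {:noreply, [event | events]}

  @impl true
  def handle_call(:events, _from, events), do: {:reply, Enum.reverse(events), events}
end

{:ok, _} = Registry.start_link(keys: :unique, name: SketchRegistry)
{:ok, _} = ResumeSketch.start_link(registry: SketchRegistry, instance_id: "inst-1")

found = ResumeSketch.resume(SketchRegistry, "inst-1", :approved)
missing = ResumeSketch.resume(SketchRegistry, "no-such-instance", :approved)
```

Because the delivery is a cast, a caller that needs to know the outcome has to observe it some other way, for example by reading the persisted instance state.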

schedule_resume(instance_id, step_ref, delay_ms)

@spec schedule_resume(String.t(), atom(), pos_integer()) ::
  {:ok, reference()} | {:error, :instance_not_found}

Schedules a delayed :timeout resume for a specific step using Process.send_after/3.

Because the timer is process-local, it does not survive a runner crash. Returns {:ok, timer_ref} on success, where timer_ref is the Erlang timer reference that can be cancelled with Process.cancel_timer/1.
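The timer mechanics can be tried in isolation with Process.send_after/3 and Process.cancel_timer/1. The sketch below runs in the current process rather than in a runner; the {:scheduled_resume, step_ref, :timeout} message shape and the :wait_for_payment step name are hypothetical and may differ from what the runner sends internally.

```elixir
# Hypothetical step name, used only for this illustration.
step_ref = :wait_for_payment

# Schedule a message to the current process 50 ms from now.
timer_ref = Process.send_after(self(), {:scheduled_resume, step_ref, :timeout}, 50)

fired =
  receive do
    {:scheduled_resume, ^step_ref, event} -> event
  after
    1_000 -> :not_fired
  end

# Cancelling a timer that is still pending returns the remaining
# milliseconds; cancelling one that has already fired returns false.
pending = Process.send_after(self(), :never_delivered, 60_000)
remaining = Process.cancel_timer(pending)
already_fired = Process.cancel_timer(timer_ref)
```

Since the timer exists only in the VM and is not part of the persisted instance state, a delayed resume that was pending when the runner crashed has to be scheduled again after recovery.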

start_instance(workflow, context, opts)

@spec start_instance(module(), map(), keyword()) ::
  {:ok, String.t()} | {:error, term()}

Creates a new workflow instance and spawns a local GenServer to execute it.

The instance is persisted to storage immediately, then a transient child is started under the given DynamicSupervisor. The runner process registers itself in the provided Registry so it can be located by instance ID.

Options

  • :id — explicit instance ID to assign to the workflow instance.
  • :storage — a {module, name} tuple for the storage adapter.
  • :registry — the Registry used for process name registration.
  • :dynamic_supervisor — the DynamicSupervisor that will own the runner process.
  • :task_supervisor — the Task.Supervisor used to run step executions concurrently.
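As a rough sketch of how these options fit together when the supporting processes are wired by hand, the snippet below starts a Registry, a DynamicSupervisor, and a Task.Supervisor, then builds the option list. All MyApp.* names, the "order-1001" ID, and the unique-keys Registry are assumptions for illustration; the start_instance/3 call is left commented out because it needs the hephaestus dependency and a real workflow module.

```elixir
# Supporting processes referenced by the options (names are hypothetical).
children = [
  {Registry, keys: :unique, name: MyApp.WorkflowRegistry},
  {DynamicSupervisor, name: MyApp.RunnerSupervisor, strategy: :one_for_one},
  {Task.Supervisor, name: MyApp.StepTasks}
]

{:ok, _sup} = Supervisor.start_link(children, strategy: :one_for_one)

opts = [
  id: "order-1001",
  # {module, name} tuple; both values are placeholders for a real adapter.
  storage: {MyApp.Storage, MyApp.Storage.Default},
  registry: MyApp.WorkflowRegistry,
  dynamic_supervisor: MyApp.RunnerSupervisor,
  task_supervisor: MyApp.StepTasks
]

# With hephaestus available and MyApp.OrderWorkflow defined as a workflow:
#
#   {:ok, instance_id} =
#     Hephaestus.Runtime.Runner.Local.start_instance(
#       MyApp.OrderWorkflow,
#       %{order_id: 1001},
#       opts
#     )
```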

start_link(opts)

@spec start_link(keyword()) :: GenServer.on_start()

Starts and links a local runner GenServer for the given workflow instance.

Options

  • :instance (required) — the Hephaestus.Core.Instance struct to execute.
  • :instance_id — overrides the instance's own ID for registry lookup (defaults to instance.id).
  • :registry — the Registry used for process name registration.
  • :storage — a {module, name} tuple for the storage adapter.
  • :task_supervisor — the Task.Supervisor used to run step executions concurrently.