Ecto/PostgreSQL storage adapter implementing Hephaestus.Runtime.Storage.
Uses a single workflow_instances table with JSONB state. The Repo module
is stored in persistent_term at startup — no GenServer process overhead.
Workflow versioning is persisted alongside each instance. Query filters support both exact workflow version matches and workflow-family prefix matching for versioned workflow modules.
Usage
Configure as the storage adapter in your Hephaestus entry module:
defmodule MyApp.Hephaestus do
  use Hephaestus,
    storage: {HephaestusEcto.Storage, repo: MyApp.Repo}
end
Named instances
Multiple storage instances can coexist by passing a :name option:
HephaestusEcto.Storage.start_link(repo: MyApp.Repo, name: :tenant_a)
HephaestusEcto.Storage.get(:tenant_a, instance_id)
The arity-1 callbacks (behaviour interface) use __MODULE__ as the default name.
The arity-2 versions accept an explicit name as the first argument.
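The relationship between the two arities can be pictured with a small sketch. `NameDefaultSketch` is a hypothetical module, not the adapter's code, and its `get/2` only echoes its arguments instead of touching a database; it shows one common way an arity-1 function can delegate to an arity-2 function with `__MODULE__` as the name.

```elixir
defmodule NameDefaultSketch do
  # Hypothetical module showing only the delegation pattern.

  # Arity-1: no name is given, so the module itself is used as the name.
  def get(id), do: get(__MODULE__, id)

  # Arity-2: the caller picks the storage instance explicitly.
  # A real adapter would look up the Repo for `name` here; this sketch
  # just returns what it was asked to look up.
  def get(name, id), do: {:lookup, name, id}
end

IO.inspect(NameDefaultSketch.get("invoiceid::abc123"))
# prints {:lookup, NameDefaultSketch, "invoiceid::abc123"}

IO.inspect(NameDefaultSketch.get(:tenant_a, "invoiceid::abc123"))
# prints {:lookup, :tenant_a, "invoiceid::abc123"}
```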
Query filters
query/1 and query/2 support these filters:
:id - match an exact workflow instance ID
:status - match a specific runtime status
:status_in - match any runtime status in a list
:workflow - match an exact workflow module
:workflow_version - match an exact integer workflow version
:workflow_family - prefix match on the stored workflow module name
Examples:
HephaestusEcto.Storage.query(id: "invoiceid::abc123")
HephaestusEcto.Storage.query(status: :running)
HephaestusEcto.Storage.query(status_in: [:pending, :running, :waiting])
HephaestusEcto.Storage.query(workflow: MyApp.Workflows.Invoice.V2)
HephaestusEcto.Storage.query(workflow_version: 2)
HephaestusEcto.Storage.query(workflow_family: "Elixir.MyApp.Workflows.Invoice.")
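The `"Elixir."` prefix and the trailing dot in the `:workflow_family` example are easier to see with a short sketch. It assumes the stored module name is the string produced by `Atom.to_string/1`, which the example filter suggests but the text does not state outright. `FamilySketch` is a hypothetical helper that mimics the prefix match in memory.

```elixir
defmodule FamilySketch do
  # Hypothetical helper: checks whether a module belongs to a workflow family
  # by comparing the module's full string name against a prefix.
  def in_family?(module, prefix) when is_atom(module) and is_binary(prefix) do
    String.starts_with?(Atom.to_string(module), prefix)
  end
end

prefix = "Elixir.MyApp.Workflows.Invoice."

IO.inspect(Atom.to_string(MyApp.Workflows.Invoice.V2))
# prints "Elixir.MyApp.Workflows.Invoice.V2"

IO.inspect(FamilySketch.in_family?(MyApp.Workflows.Invoice.V1, prefix))
# prints true

IO.inspect(FamilySketch.in_family?(MyApp.Workflows.InvoiceArchive.V1, prefix))
# prints false
```

Ending the prefix with a dot keeps sibling modules such as the hypothetical `InvoiceArchive` out of the family.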
Functions
@spec child_spec(keyword()) :: Supervisor.child_spec()
Returns a child spec for supervision tree integration.
Options
:repo (required) - the Ecto.Repo module to use
:name - a name for this storage instance (defaults to HephaestusEcto.Storage)
@spec delete(String.t()) :: :ok
Deletes a workflow instance by ID.
Idempotent — returns :ok even if the instance does not exist.
Deletes a workflow instance by ID using a named storage.
@spec get(String.t()) :: {:ok, Hephaestus.Core.Instance.t()} | {:error, :not_found}
Retrieves a workflow instance by ID.
Returns {:ok, instance} if found, {:error, :not_found} otherwise.
@spec get(name(), String.t()) :: {:ok, Hephaestus.Core.Instance.t()} | {:error, :not_found}
Retrieves a workflow instance by ID using a named storage.
@spec put(Hephaestus.Core.Instance.t()) :: :ok
Persists a workflow instance.
Uses upsert semantics — inserts a new record or replaces workflow, status,
state, and updated_at on conflict. workflow_version is inserted on first
write and left unchanged on conflict so historical rows keep their original version.
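The conflict rule can be modelled in memory to make the behaviour concrete. This is a sketch over plain maps, not the adapter's code: `UpsertSketch` and the row shape are hypothetical, and a database-backed version would more likely use the `:on_conflict` and `:conflict_target` options of `Ecto.Repo.insert/2`.

```elixir
defmodule UpsertSketch do
  # In-memory model of the described conflict rule; rows are maps keyed by id.
  @replaced_on_conflict [:workflow, :status, :state, :updated_at]

  def put(table, %{id: id} = row) do
    # First write: store the whole row. Later writes: overwrite only the
    # listed fields, so workflow_version keeps its original value.
    Map.update(table, id, row, fn existing ->
      Map.merge(existing, Map.take(row, @replaced_on_conflict))
    end)
  end
end

first = %{
  id: "orderid::123",
  workflow: MyApp.Workflows.Order.V1,
  workflow_version: 1,
  status: :pending,
  state: %{},
  updated_at: 1
}

second = %{first | workflow_version: 2, status: :running, state: %{step: :charge}, updated_at: 2}

table = %{} |> UpsertSketch.put(first) |> UpsertSketch.put(second)
row = table["orderid::123"]

IO.inspect({row.status, row.workflow_version})
# prints {:running, 1}
```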
@spec put(name(), Hephaestus.Core.Instance.t()) :: :ok
Persists a workflow instance using a named storage.
workflow_version is serialized from the %Hephaestus.Core.Instance{} and stored in the
workflow_instances.workflow_version column.
@spec query(filters()) :: [Hephaestus.Core.Instance.t()]
Queries workflow instances by filters.
Supported filters
:id - filter by exact instance ID
:status - filter by instance status (e.g., :running, :waiting)
:status_in - filter by multiple instance statuses
:workflow - filter by workflow module
:workflow_version - filter by exact workflow version (integer)
:workflow_family - filter by workflow module prefix (LIKE match)
Examples
HephaestusEcto.Storage.query(status: :running)
HephaestusEcto.Storage.query(id: "orderid::123")
HephaestusEcto.Storage.query(status_in: [:running, :waiting])
HephaestusEcto.Storage.query(status: :waiting, workflow: MyApp.OrderWorkflow)
HephaestusEcto.Storage.query(workflow_version: 2)
HephaestusEcto.Storage.query(workflow_family: "Elixir.MyApp.Workflows.Order.")
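The following sketch shows how several filters narrow a result set. It assumes that filters passed together are combined with AND, which the combined `status:` and `workflow:` example suggests but the text does not state. `FilterSketch` is a hypothetical in-memory matcher over plain maps; the adapter itself runs a database query.

```elixir
defmodule FilterSketch do
  # Hypothetical in-memory matcher; keeps rows that satisfy every filter.
  def query(rows, filters) do
    Enum.filter(rows, fn row -> Enum.all?(filters, &matches?(row, &1)) end)
  end

  defp matches?(row, {:id, id}), do: row.id == id
  defp matches?(row, {:status, status}), do: row.status == status
  defp matches?(row, {:status_in, statuses}), do: row.status in statuses
  defp matches?(row, {:workflow, workflow}), do: row.workflow == workflow
  defp matches?(row, {:workflow_version, version}), do: row.workflow_version == version

  defp matches?(row, {:workflow_family, prefix}),
    do: String.starts_with?(Atom.to_string(row.workflow), prefix)
end

rows = [
  %{id: "a", status: :running, workflow: MyApp.Workflows.Order.V1, workflow_version: 1},
  %{id: "b", status: :waiting, workflow: MyApp.Workflows.Order.V2, workflow_version: 2},
  %{id: "c", status: :completed, workflow: MyApp.Workflows.Order.V2, workflow_version: 2}
]

matched = FilterSketch.query(rows, status_in: [:running, :waiting], workflow_version: 2)

IO.inspect(Enum.map(matched, & &1.id))
# prints ["b"]
```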
@spec query(name(), filters()) :: [Hephaestus.Core.Instance.t()]
Queries workflow instances by filters using a named storage.
@spec start_link(keyword()) :: :ignore
Stores the Repo reference in persistent_term and returns :ignore.
No process is started — this only registers the Repo for later lookup.
Options
:repo (required) - the Ecto.Repo module
:name - storage instance name (defaults to HephaestusEcto.Storage)
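A minimal sketch of the pattern described here, under stated assumptions: `IgnoreSketch` is a hypothetical stand-in for the adapter, and the `{module, name}` key used with `:persistent_term` is an illustrative choice, not the library's actual key. It shows that a supervisor accepts a child whose start function returns `:ignore`, and that the registered Repo is still available afterwards.

```elixir
defmodule IgnoreSketch do
  # Hypothetical stand-in for a storage module that needs no process.

  def child_spec(opts) do
    %{
      id: Keyword.get(opts, :name, __MODULE__),
      start: {__MODULE__, :start_link, [opts]}
    }
  end

  def start_link(opts) do
    repo = Keyword.fetch!(opts, :repo)
    name = Keyword.get(opts, :name, __MODULE__)

    # Register the repo for later lookup, then tell the supervisor that
    # there is no process to monitor.
    :persistent_term.put({__MODULE__, name}, repo)
    :ignore
  end

  def repo(name \\ __MODULE__), do: :persistent_term.get({__MODULE__, name})
end

{:ok, sup} =
  Supervisor.start_link([{IgnoreSketch, repo: MyApp.Repo}], strategy: :one_for_one)

IO.inspect(IgnoreSketch.repo())
# prints MyApp.Repo

IO.inspect(Supervisor.count_children(sup).active)
# prints 0
```

The supervisor keeps the child specification but counts no active process for it, which matches the "no process is started" behaviour described above.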