HephaestusEcto.Storage (hephaestus_ecto v0.3.0)


Ecto/PostgreSQL storage adapter implementing Hephaestus.Runtime.Storage.

Uses a single workflow_instances table with JSONB state. The Repo module is stored in persistent_term at startup, so no GenServer process is needed and lookups carry no process overhead.

Workflow versioning is persisted alongside each instance. Query filters support both exact workflow version matches and workflow-family prefix matching for versioned workflow modules.

Usage

Configure as the storage adapter in your Hephaestus entry module:

defmodule MyApp.Hephaestus do
  use Hephaestus,
    storage: {HephaestusEcto.Storage, repo: MyApp.Repo}
end

Named instances

Multiple storage instances can coexist by passing a :name option:

HephaestusEcto.Storage.start_link(repo: MyApp.Repo, name: :tenant_a)
HephaestusEcto.Storage.get(:tenant_a, instance_id)

The arity-1 callbacks (the behaviour interface) use __MODULE__, that is HephaestusEcto.Storage, as the default name. The arity-2 versions accept an explicit name as the first argument.
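The name-based lookup described above can be pictured with a small stand-in module. This is only a sketch: NamedStorageSketch, the {module, name} key shape, and the repo/0 and repo/1 helpers are hypothetical and are not taken from the HephaestusEcto.Storage source. Only :persistent_term.put/2 and :persistent_term.get/1 are real OTP calls.

```elixir
defmodule NamedStorageSketch do
  # Hypothetical stand-in, not the HephaestusEcto.Storage implementation.

  # Registers a repo under a name. The name defaults to this module,
  # mirroring the documented default. No process is started.
  def start_link(opts) do
    repo = Keyword.fetch!(opts, :repo)
    name = Keyword.get(opts, :name, __MODULE__)
    # Assumed key shape: {module, name}.
    :persistent_term.put({__MODULE__, name}, repo)
    :ignore
  end

  # Default-name form, analogous to the arity-1 behaviour callbacks.
  def repo, do: repo(__MODULE__)

  # Explicit-name form, analogous to the arity-2 functions.
  def repo(name), do: :persistent_term.get({__MODULE__, name})
end

:ignore = NamedStorageSketch.start_link(repo: MyApp.Repo)
:ignore = NamedStorageSketch.start_link(repo: MyApp.TenantARepo, name: :tenant_a)

NamedStorageSketch.repo()           # MyApp.Repo
NamedStorageSketch.repo(:tenant_a)  # MyApp.TenantARepo
```

Because each name maps to its own persistent_term entry, the two registrations do not interfere with each other.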

Query filters

query/1 and query/2 support these filters:

  • :id — match an exact workflow instance ID
  • :status — match a specific runtime status
  • :status_in — match any runtime status in a list
  • :workflow — match an exact workflow module
  • :workflow_version — match an exact integer workflow version
  • :workflow_family — prefix match on the stored workflow module name

Examples:

HephaestusEcto.Storage.query(id: "invoiceid::abc123")
HephaestusEcto.Storage.query(status: :running)
HephaestusEcto.Storage.query(status_in: [:pending, :running, :waiting])
HephaestusEcto.Storage.query(workflow: MyApp.Workflows.Invoice.V2)
HephaestusEcto.Storage.query(workflow_version: 2)
HephaestusEcto.Storage.query(workflow_family: "Elixir.MyApp.Workflows.Invoice.")
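The "Elixir." prefix in the :workflow_family example follows from how module names look as strings. The sketch below assumes the stored workflow module name is the Atom.to_string/1 form of the module, and it models the prefix match in memory with String.starts_with?/2 rather than in SQL. The module names are hypothetical and do not need to exist, since an alias is just an atom.

```elixir
# Module aliases are atoms whose string form starts with "Elixir.".
v2 = Atom.to_string(MyApp.Workflows.Invoice.V2)
# v2 is "Elixir.MyApp.Workflows.Invoice.V2"

family = "Elixir.MyApp.Workflows.Invoice."

# Every version module under the family shares the prefix.
in_family = String.starts_with?(v2, family)
# in_family is true

# The trailing dot keeps a sibling with a longer name out of the family.
sibling = Atom.to_string(MyApp.Workflows.InvoiceArchive.V1)
sibling_in_family = String.starts_with?(sibling, family)
# sibling_in_family is false
```

Ending the family string with a dot is what separates Invoice.V1 and Invoice.V2 from an unrelated module such as InvoiceArchive.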

Summary

Functions

child_spec(opts)

Returns a child spec for supervision tree integration.

delete(id)

Deletes a workflow instance by ID.

delete(name, id)

Deletes a workflow instance by ID using a named storage.

get(id)

Retrieves a workflow instance by ID.

get(name, id)

Retrieves a workflow instance by ID using a named storage.

put(instance)

Persists a workflow instance.

put(name, instance)

Persists a workflow instance using a named storage.

query(filters)

Queries workflow instances by filters.

query(name, filters)

Queries workflow instances by filters using a named storage.

start_link(opts \\ [])

Stores the Repo reference in persistent_term and returns :ignore.

Types

filters()

@type filters() :: keyword()

name()

@type name() :: term()

Functions

child_spec(opts)

@spec child_spec(keyword()) :: Supervisor.child_spec()

Returns a child spec for supervision tree integration.

Options

  • :repo (required) — the Ecto.Repo module to use
  • :name — a name for this storage instance (defaults to HephaestusEcto.Storage)
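Since start_link returns :ignore, a supervisor that includes this child starts normally but holds no process for it. The sketch below shows that behaviour with a hypothetical stand-in module. IgnoreChildSketch, its child spec id, and its persistent_term key are assumptions for illustration, not the adapter's actual child spec.

```elixir
defmodule IgnoreChildSketch do
  # Hypothetical stand-in for a storage module that starts no process.

  def child_spec(opts) do
    # Assumption: the child id follows the :name option, else the module.
    %{
      id: Keyword.get(opts, :name, __MODULE__),
      start: {__MODULE__, :start_link, [opts]}
    }
  end

  def start_link(opts) do
    # Register the repo, then tell the supervisor there is nothing to monitor.
    :persistent_term.put({__MODULE__, :repo}, Keyword.fetch!(opts, :repo))
    :ignore
  end
end

children = [{IgnoreChildSketch, repo: MyApp.Repo}]
{:ok, sup} = Supervisor.start_link(children, strategy: :one_for_one)

# The child spec is kept, but its pid slot is :undefined.
[{IgnoreChildSketch, :undefined, _type, _modules}] = Supervisor.which_children(sup)
```

The registration is a side effect of supervisor startup, so children listed after this one can rely on the repo already being registered.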

delete(id)

@spec delete(String.t()) :: :ok

Deletes a workflow instance by ID.

Idempotent — returns :ok even if the instance does not exist.

delete(name, id)

@spec delete(name(), String.t()) :: :ok

Deletes a workflow instance by ID using a named storage.

get(id)

@spec get(String.t()) :: {:ok, Hephaestus.Core.Instance.t()} | {:error, :not_found}

Retrieves a workflow instance by ID.

Returns {:ok, instance} if found, {:error, :not_found} otherwise.

get(name, id)

@spec get(name(), String.t()) ::
  {:ok, Hephaestus.Core.Instance.t()} | {:error, :not_found}

Retrieves a workflow instance by ID using a named storage.

put(instance)

@spec put(Hephaestus.Core.Instance.t()) :: :ok

Persists a workflow instance.

Uses upsert semantics — inserts a new record or replaces workflow, status, state, and updated_at on conflict. workflow_version is inserted on first write and left unchanged on conflict so historical rows keep their original version.
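The conflict rule can be modelled in memory to make it concrete. The sketch below uses a plain map as a stand-in for the workflow_instances table. UpsertSketch, the row shape, and the integer timestamps are assumptions for illustration. In Ecto terms the rule resembles Repo.insert/2 with on_conflict: {:replace, fields}, though this page does not say that the adapter is written that way.

```elixir
defmodule UpsertSketch do
  # Fields the documentation lists as replaced on conflict.
  @replaced [:workflow, :status, :state, :updated_at]

  # `table` is a map keyed by instance id (a stand-in for the real table).
  def put(table, %{id: id} = row) do
    Map.update(table, id, row, fn existing ->
      # On conflict, overwrite only the replaced fields and keep the rest,
      # including the original workflow_version.
      Map.merge(existing, Map.take(row, @replaced))
    end)
  end
end

first = %{
  id: "invoiceid::abc123",
  workflow: MyApp.Workflows.Invoice.V1,
  workflow_version: 1,
  status: :pending,
  state: %{},
  updated_at: 1
}

# A later write for the same id that carries a different version.
second = %{first | workflow_version: 2, status: :running, updated_at: 2}

table = %{} |> UpsertSketch.put(first) |> UpsertSketch.put(second)
stored = Map.fetch!(table, first.id)

stored.status            # :running
stored.workflow_version  # 1
```

The second write updates the status and timestamp while the row keeps the version it was first written with.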

put(name, instance)

@spec put(name(), Hephaestus.Core.Instance.t()) :: :ok

Persists a workflow instance using a named storage.

workflow_version is serialized from the %Hephaestus.Core.Instance{} and stored in the workflow_instances.workflow_version column.

query(filters)

@spec query(filters()) :: [Hephaestus.Core.Instance.t()]

Queries workflow instances by filters.

Supported filters

  • :id — filter by exact instance ID
  • :status — filter by instance status (e.g., :running, :waiting)
  • :status_in — filter by multiple instance statuses
  • :workflow — filter by workflow module
  • :workflow_version — filter by exact workflow version (integer)
  • :workflow_family — filter by workflow module prefix (LIKE match)

Examples

HephaestusEcto.Storage.query(status: :running)
HephaestusEcto.Storage.query(id: "orderid::123")
HephaestusEcto.Storage.query(status_in: [:running, :waiting])
HephaestusEcto.Storage.query(status: :waiting, workflow: MyApp.OrderWorkflow)
HephaestusEcto.Storage.query(workflow_version: 2)
HephaestusEcto.Storage.query(workflow_family: "Elixir.MyApp.Workflows.Order.")

query(name, filters)

@spec query(name(), filters()) :: [Hephaestus.Core.Instance.t()]

Queries workflow instances by filters using a named storage.

start_link(opts \\ [])

@spec start_link(keyword()) :: :ignore

Stores the Repo reference in persistent_term and returns :ignore.

No process is started — this only registers the Repo for later lookup.

Options