PgFlow.FlowStarter (PgFlow v0.1.0)

Copy Markdown View Source

Supervised GenServer that registers flow and job modules with exponential backoff and jitter.

Replaces the earlier fire-and-forget Task approach whose restart: :temporary strategy turned any transient registration failure into permanent silent job non-execution.

Each module goes through up to four phases:

  1. :registryPgFlow.FlowRegistry.register/1
  2. :workerPgFlow.WorkerSupervisor.start_worker/2
  3. :notify_dbPgFlow.Queries.Pgmq.enable_notify_insert/3 (:notify mode only)
  4. :notify_registerPgFlow.Signal.Notify.register_worker/2 (:notify mode only)

All four operations are idempotent, so retries are safe.

Retry model

  • Transient errors (DB connectivity, supervisor not yet running) retry indefinitely with exponential backoff + uniform jitter.
  • Permanent errors (invalid flow module, missing __pgflow_definition__/0, pgmq SQL missing) stop retries immediately.

API

  • status/0 — snapshot of all modules
  • module_status/1 — single-module snapshot
  • ready?/0 — every module has reached a terminal state (converged). True even if all modules are :failed_permanent — use healthy?/0 to check whether any succeeded.
  • healthy?/0 — at least one module succeeded (empty config counts as healthy).
  • await_ready/1 — blocks until ready?/0 is true or timeout. Accepts :infinity.
  • retry_now/1 — force an immediate attempt (operational poke)

Telemetry

  • [:pgflow, :starter, :module, :attempt]
  • [:pgflow, :starter, :module, :success]
  • [:pgflow, :starter, :module, :retry_scheduled]
  • [:pgflow, :starter, :module, :failed_permanent]
  • [:pgflow, :starter, :ready]

Summary

Functions

Blocks until ready?/0 is true or timeout ms elapse.

Returns a specification to start this module under a supervisor.

Returns true when at least one module succeeded (or the config is empty).

Snapshot of a single module's state, or nil if not registered.

Returns true when every module has reached a terminal state (:succeeded or :failed_permanent). Empty config is trivially ready.

Force an immediate retry for a module (bypasses current backoff timer).

Snapshot of the starter state.

Functions

await_ready(timeout \\ 5000)

@spec await_ready(timeout()) :: :ok | {:error, :timeout}

Blocks until ready?/0 is true or timeout ms elapse.

Accepts :infinity to block indefinitely.

Returns :ok when ready, {:error, :timeout} on timeout.

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

healthy?()

@spec healthy?() :: boolean()

Returns true when at least one module succeeded (or the config is empty).

module_status(module)

@spec module_status(module()) :: PgFlow.FlowStarter.ModuleState.t() | nil

Snapshot of a single module's state, or nil if not registered.

ready?()

@spec ready?() :: boolean()

Returns true when every module has reached a terminal state (:succeeded or :failed_permanent). Empty config is trivially ready.

This is the "starter has converged" signal, suitable for K8s readiness probes. For "pgflow is actually functional" checks, use healthy?/0.

retry_now(module)

@spec retry_now(module()) :: :ok

Force an immediate retry for a module (bypasses current backoff timer).

start_link(opts)

@spec start_link(keyword()) :: GenServer.on_start()

status()

@spec status() :: map()

Snapshot of the starter state.