Supervised GenServer that registers flow and job modules, retrying failures with exponential backoff and jitter.
It replaces the earlier fire-and-forget Task approach, whose
restart: :temporary strategy turned any transient registration failure
into a permanent, silent failure to execute jobs.
Each module goes through up to four phases:
- :registry (PgFlow.FlowRegistry.register/1)
- :worker (PgFlow.WorkerSupervisor.start_worker/2)
- :notify_db (PgFlow.Queries.Pgmq.enable_notify_insert/3; :notify mode only)
- :notify_register (PgFlow.Signal.Notify.register_worker/2; :notify mode only)
All four operations are idempotent, so retries are safe.
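The phase ordering above can be modeled as follows. This is a minimal, hypothetical sketch, not PgFlow's implementation: the PhaseSketch module, the :poll mode atom (standing in for any non-:notify mode), and the injected run_phase function are all assumptions for illustration. Because every phase is idempotent, the sketch simply reruns the whole list on each attempt instead of resuming from the failed phase; the real starter may track per-phase progress instead.

```elixir
defmodule PhaseSketch do
  # Hypothetical model of the per-module phase list; not PgFlow's code.
  @base_phases [:registry, :worker]
  @notify_phases [:notify_db, :notify_register]

  # Ordered phases for a signal mode; only :notify adds the notify phases.
  def phases(:notify), do: @base_phases ++ @notify_phases
  def phases(_other_mode), do: @base_phases

  # Runs each phase in order via run_phase (an assumed callback), stopping
  # at the first error and reporting which phase failed.
  def run(mode, run_phase) when is_function(run_phase, 1) do
    Enum.reduce_while(phases(mode), :ok, fn phase, :ok ->
      case run_phase.(phase) do
        :ok -> {:cont, :ok}
        {:error, reason} -> {:halt, {:error, phase, reason}}
      end
    end)
  end
end
```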
Retry model
- Transient errors (DB connectivity, supervisor not yet running) are retried indefinitely with exponential backoff and uniform jitter.
- Permanent errors (invalid flow module, missing
__pgflow_definition__/0, missing pgmq SQL) stop retries immediately, leaving the module in :failed_permanent.
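A minimal sketch of the retry decision described above. It assumes a 500 ms base delay, a 30 s cap, and jitter drawn uniformly from [0, delay / 2]; the real base, cap, jitter range, and error reason terms are not documented here, so BackoffSketch and the reason atoms are hypothetical.

```elixir
defmodule BackoffSketch do
  # Assumed parameters for illustration only.
  @base_ms 500
  @max_ms 30_000

  # Assumed reason atoms standing in for the documented permanent errors.
  @permanent_reasons [:invalid_flow_module, :missing_pgflow_definition, :pgmq_sql_missing]

  # Capped exponential delay for a 1-based attempt number.
  def base_delay_ms(attempt) when is_integer(attempt) and attempt >= 1 do
    min(@max_ms, @base_ms * Integer.pow(2, attempt - 1))
  end

  # Adds uniform jitter in [0, base / 2] so modules don't retry in lockstep.
  def delay_ms(attempt) do
    base = base_delay_ms(attempt)
    base + :rand.uniform(div(base, 2) + 1) - 1
  end

  # Permanent errors stop retrying; anything else is treated as transient.
  def next_step({:error, reason}, _attempt) when reason in @permanent_reasons,
    do: :failed_permanent

  def next_step({:error, _transient_reason}, attempt), do: {:retry_in, delay_ms(attempt)}
end
```

Applying the cap before adding jitter keeps the spread proportional even at the ceiling, so many modules failing at once on a DB outage do not all retry at exactly 30 s.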
API
- status/0: snapshot of all modules.
- module_status/1: single-module snapshot.
- ready?/0: every module has reached a terminal state (converged). True even if all modules are :failed_permanent; use healthy?/0 to check whether any succeeded.
- healthy?/0: at least one module succeeded (an empty config counts as healthy).
- await_ready/1: blocks until ready?/0 is true or the timeout elapses. Accepts :infinity.
- retry_now/1: forces an immediate attempt (operational poke).
Telemetry
- [:pgflow, :starter, :module, :attempt]
- [:pgflow, :starter, :module, :success]
- [:pgflow, :starter, :module, :retry_scheduled]
- [:pgflow, :starter, :module, :failed_permanent]
- [:pgflow, :starter, :ready]
Functions
@spec await_ready(timeout()) :: :ok | {:error, :timeout}
Blocks until ready?/0 returns true or timeout milliseconds elapse.
Pass :infinity to block indefinitely.
Returns :ok when ready, {:error, :timeout} on timeout.
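A minimal sketch of the await_ready/1 contract, assuming a simple polling loop. The real function takes only a timeout and may instead wait on a message from the starter; here the readiness check is passed in as ready_fun so the example is self-contained, and AwaitSketch and the 50 ms poll interval are hypothetical.

```elixir
defmodule AwaitSketch do
  # Assumed poll interval; the real implementation may not poll at all.
  @poll_ms 50

  # :infinity: keep checking until ready.
  def await_ready(ready_fun, :infinity) do
    if ready_fun.() do
      :ok
    else
      Process.sleep(@poll_ms)
      await_ready(ready_fun, :infinity)
    end
  end

  # Finite timeout in milliseconds, measured on the monotonic clock.
  def await_ready(ready_fun, timeout) when is_integer(timeout) and timeout >= 0 do
    poll_until(ready_fun, System.monotonic_time(:millisecond) + timeout)
  end

  defp poll_until(ready_fun, deadline) do
    remaining = deadline - System.monotonic_time(:millisecond)

    cond do
      ready_fun.() -> :ok
      remaining <= 0 -> {:error, :timeout}
      true ->
        # Never sleep past the deadline.
        Process.sleep(min(@poll_ms, remaining))
        poll_until(ready_fun, deadline)
    end
  end
end
```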
child_spec/1
Returns a specification to start this module under a supervisor.
See Supervisor.
@spec healthy?() :: boolean()
Returns true when at least one module succeeded (or the config is empty).
@spec module_status(module()) :: PgFlow.FlowStarter.ModuleState.t() | nil
Snapshot of a single module's state, or nil if not registered.
@spec ready?() :: boolean()
Returns true when every module has reached a terminal state
(:succeeded or :failed_permanent). Empty config is trivially ready.
This is the "starter has converged" signal, suitable for K8s readiness
probes. For "pgflow is actually functional" checks, use healthy?/0.
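The difference between ready?/0 and healthy?/0 can be sketched as two predicates over module statuses. This assumes the state reduces to a map of module => status atom; :succeeded and :failed_permanent are the documented terminal states, while :retrying is an illustrative non-terminal state and ReadinessSketch is hypothetical.

```elixir
defmodule ReadinessSketch do
  # Documented terminal states.
  @terminal [:succeeded, :failed_permanent]

  # Converged: every module is terminal (vacuously true for an empty map).
  def ready?(statuses) when is_map(statuses) do
    Enum.all?(statuses, fn {_module, status} -> status in @terminal end)
  end

  # Functional: an empty config counts as healthy...
  def healthy?(statuses) when map_size(statuses) == 0, do: true

  # ...otherwise at least one module must have succeeded.
  def healthy?(statuses) when is_map(statuses) do
    Enum.any?(statuses, fn {_module, status} -> status == :succeeded end)
  end
end
```

A starter where every module failed permanently is ready (it will not change on its own) but not healthy, which is why a readiness probe and an alerting check want different predicates.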
@spec retry_now(module()) :: :ok
Force an immediate retry for a module (bypasses current backoff timer).
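One way a GenServer can bypass a pending backoff timer is sketched below, assuming one Process.send_after/3 timer per module. RetryNowSketch, its schedule/3 and attempts/1 helpers, and the token scheme are hypothetical; the token lets the server ignore a timer message that fired just before it was cancelled, so a forced retry never runs twice.

```elixir
defmodule RetryNowSketch do
  # Hypothetical sketch: one pending backoff timer per module.
  use GenServer

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, opts)
  def schedule(pid, module, delay_ms), do: GenServer.call(pid, {:schedule, module, delay_ms})
  def retry_now(pid, module), do: GenServer.call(pid, {:retry_now, module})
  def attempts(pid), do: GenServer.call(pid, :attempts)

  @impl true
  def init(_opts), do: {:ok, %{timers: %{}, attempts: []}}

  @impl true
  def handle_call({:schedule, module, delay_ms}, _from, state) do
    # The token identifies this timer so stale messages can be recognized.
    token = make_ref()
    timer = Process.send_after(self(), {:attempt, module, token}, delay_ms)
    {:reply, :ok, put_in(state.timers[module], {timer, token})}
  end

  def handle_call({:retry_now, module}, _from, state) do
    {pending, timers} = Map.pop(state.timers, module)

    # Cancel the backoff timer; if it already fired, its message carries a
    # token that no longer matches and handle_info/2 drops it.
    with {timer, _token} <- pending, do: Process.cancel_timer(timer)

    {:reply, :ok, attempt(%{state | timers: timers}, module)}
  end

  def handle_call(:attempts, _from, state), do: {:reply, Enum.reverse(state.attempts), state}

  @impl true
  def handle_info({:attempt, module, token}, state) do
    case Map.get(state.timers, module) do
      {_timer, ^token} ->
        {:noreply, attempt(%{state | timers: Map.delete(state.timers, module)}, module)}

      _stale ->
        {:noreply, state}
    end
  end

  # Placeholder for running the module's phases; here it only records the attempt.
  defp attempt(state, module), do: %{state | attempts: [module | state.attempts]}
end
```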
@spec start_link(keyword()) :: GenServer.on_start()
@spec status() :: map()
Snapshot of the starter state.