GenServer that polls pgmq and executes flow tasks.
Each worker is responsible for a single flow/queue and can execute multiple tasks concurrently up to the configured limit.
Implements the two-phase protocol:
- pgmq.read() - Reserve messages from pgmq (non-blocking)
- start_tasks() - Create step_tasks records and get task details
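The two phases above can be sketched as a pair of queries run back to back. This is only an illustration: the module name `TwoPhaseSketch`, the injected `query_fun`, and the exact SQL text and argument order are assumptions, not the worker's actual code.

```elixir
defmodule TwoPhaseSketch do
  # query_fun is a hypothetical 2-arity function (sql, params) -> %{rows: [...]}.
  # In a real application it could wrap Ecto.Adapters.SQL.query!/3 for the repo.
  def poll(query_fun, flow_slug, worker_id, visibility_timeout, batch_size) do
    # Phase 1: reserve up to batch_size messages; returns immediately if the queue is empty.
    %{rows: rows} =
      query_fun.(
        "SELECT msg_id FROM pgmq.read($1, $2, $3)",
        [flow_slug, visibility_timeout, batch_size]
      )

    msg_ids = Enum.map(rows, fn [msg_id | _] -> msg_id end)

    case msg_ids do
      # Nothing reserved, so there is nothing to start.
      [] ->
        []

      _ ->
        # Phase 2: create the step_tasks records and fetch the task details.
        # The SQL function name and parameters here are assumed.
        %{rows: tasks} =
          query_fun.(
            "SELECT * FROM pgflow.start_tasks($1, $2, $3)",
            [flow_slug, msg_ids, worker_id]
          )

        tasks
    end
  end
end
```

Passing the query function in keeps the sketch independent of any particular repo and makes the control flow easy to exercise with a stub.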
Signal Strategies
The worker supports two signal strategies for detecting new messages:
- :polling - Adaptive jittered exponential backoff (1s → 5s). Polls fast when busy, backs off when idle.
- :notify - LISTEN/NOTIFY via pgmq's enable_notify_insert. Near-instant wake-ups with a 30s fallback poll.
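As a rough illustration of the :polling strategy, the sketch below doubles the interval after an empty poll, resets it when messages arrive, and adds random jitter. The module name `BackoffSketch`, the doubling factor, and the 10% jitter are assumptions chosen for the example; the worker's real backoff parameters are not documented here.

```elixir
defmodule BackoffSketch do
  # Start at the minimum interval (milliseconds).
  def new(min_ms \\ 1_000, max_ms \\ 5_000) do
    %{current_interval: min_ms, min_interval: min_ms, max_interval: max_ms}
  end

  # Idle poll (no messages): back off, capped at the maximum interval.
  def next(state, 0) do
    %{state | current_interval: min(state.current_interval * 2, state.max_interval)}
  end

  # Busy poll (some messages): drop back to the minimum interval.
  def next(state, count) when count > 0 do
    %{state | current_interval: state.min_interval}
  end

  # Spread the delay uniformly within +/- jitter of the current interval
  # so that many workers do not poll in lockstep.
  def jittered(%{current_interval: interval}, jitter \\ 0.1) do
    spread = trunc(interval * jitter)
    interval - spread + :rand.uniform(2 * spread + 1) - 1
  end
end
```

A worker could pass the result of `jittered/2` to `Process.send_after/3` to schedule its next poll.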
Configuration
Workers are configured via the config parameter passed to start_link/1:
config = %{
  flow_module: MyApp.Flows.ProcessOrder,
  repo: MyApp.Repo,
  worker_id: "550e8400-e29b-41d4-a716-446655440000",
  max_concurrency: 10,
  batch_size: 10,
  signal_strategy: :polling,
  min_poll_interval: 1_000,
  max_poll_interval: 5_000,
  notify_fallback_interval: 30_000
}
State Structure
The worker maintains the following state:
- flow_module - The flow module being processed
- flow_slug - String slug for the queue name
- worker_id - UUID for this worker (string format)
- repo - Ecto repo module
- task_supervisor - PID of Task.Supervisor for async execution
- active_tasks - Map of task_ref => task_metadata (includes timeout_timer_ref)
- max_concurrency - Max parallel tasks (default: 10)
- batch_size - Messages per poll (default: 10)
- visibility_timeout - Seconds for message visibility (derived from flow's opt_timeout)
- signal_strategy - :polling or :notify
- signal_state - Adaptive backoff state for the :polling strategy
- lifecycle - Worker lifecycle state machine (see PgFlow.Worker.Lifecycle)
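One way the max_concurrency, batch_size, and active_tasks fields could interact is shown below: the number of messages requested per poll is limited by the free task slots. This is an assumed policy for illustration only, and `CapacitySketch` and its functions are hypothetical names.

```elixir
defmodule CapacitySketch do
  # Free slots = configured limit minus tasks currently running (never negative).
  def free_slots(%{max_concurrency: limit, active_tasks: active}) do
    max(limit - map_size(active), 0)
  end

  # Ask the queue for no more messages than the worker can start right away.
  def fetch_quantity(state) do
    min(state.batch_size, free_slots(state))
  end
end
```

With 7 of 10 slots busy and a batch size of 10, `fetch_quantity/1` would request 3 messages.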
Lifecycle
- Initialization - Worker registers itself in the database, starts polling loop
- Polling - Continuously polls the queue, dispatches tasks to Task.Supervisor
- Task Execution - Tasks run concurrently, worker tracks completion/failure
- Graceful Shutdown - Worker stops accepting tasks, waits for active tasks
- Cleanup - Marks worker as stopped in database
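The polling and task-execution stages rely on a GenServer tracking tasks it has handed to a Task.Supervisor. The sketch below shows that general pattern with `Task.Supervisor.async_nolink/2`; `DispatchSketch` and its functions are hypothetical, and the real worker's bookkeeping (timeouts, database updates) is omitted.

```elixir
defmodule DispatchSketch do
  use GenServer

  def start_link(task_supervisor), do: GenServer.start_link(__MODULE__, task_supervisor)
  def run(pid, fun), do: GenServer.call(pid, {:run, fun})
  def active_count(pid), do: GenServer.call(pid, :active_count)

  @impl true
  def init(task_supervisor) do
    {:ok, %{task_supervisor: task_supervisor, active_tasks: %{}}}
  end

  @impl true
  def handle_call({:run, fun}, _from, state) do
    # async_nolink monitors the task without linking, so a crashing task
    # does not take the worker down with it.
    task = Task.Supervisor.async_nolink(state.task_supervisor, fun)
    active = Map.put(state.active_tasks, task.ref, %{task_pid: task.pid})
    {:reply, :ok, %{state | active_tasks: active}}
  end

  def handle_call(:active_count, _from, state) do
    {:reply, map_size(state.active_tasks), state}
  end

  @impl true
  def handle_info({ref, _result}, state) when is_reference(ref) do
    # Task finished normally: drop the monitor and flush its :DOWN message.
    Process.demonitor(ref, [:flush])
    {:noreply, %{state | active_tasks: Map.delete(state.active_tasks, ref)}}
  end

  def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do
    # Task crashed: a real worker would record the failure here.
    {:noreply, %{state | active_tasks: Map.delete(state.active_tasks, ref)}}
  end
end
```

During a graceful shutdown, a worker built this way could stop calling `run/2` and wait until `active_count/1` reaches zero before marking itself stopped.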
Telemetry Events
The worker emits the following telemetry events:
- [:pgflow, :worker, :start] - Worker started
- [:pgflow, :worker, :stop] - Worker stopped
- [:pgflow, :worker, :poll, :start] - Poll cycle started
- [:pgflow, :worker, :poll, :stop] - Poll cycle completed
- [:pgflow, :worker, :task, :start] - Task execution started
- [:pgflow, :worker, :task, :stop] - Task execution completed
- [:pgflow, :worker, :task, :exception] - Task execution failed
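These events can be consumed with the `:telemetry` library's `attach_many/4`. The sketch below logs a subset of them; the module name `TelemetryLogSketch` and the handler id are hypothetical, and because the measurement and metadata keys are not listed here, the handler simply inspects whatever it receives.

```elixir
defmodule TelemetryLogSketch do
  require Logger

  # A subset of the events listed above.
  @events [
    [:pgflow, :worker, :poll, :stop],
    [:pgflow, :worker, :task, :stop],
    [:pgflow, :worker, :task, :exception]
  ]

  # Requires the :telemetry dependency to be available at runtime.
  def attach do
    :telemetry.attach_many("pgflow-log-sketch", @events, &__MODULE__.handle_event/4, nil)
  end

  # Telemetry handlers receive the event name, measurements, metadata, and config.
  def handle_event(event, measurements, metadata, _config) do
    Logger.info(format(event, measurements, metadata))
  end

  # Build a single log line without assuming any particular keys.
  def format(event, measurements, metadata) do
    Enum.join(event, ".") <>
      " measurements=" <> inspect(measurements) <> " metadata=" <> inspect(metadata)
  end
end
```

Calling `TelemetryLogSketch.attach()` once at application start would be enough to begin logging.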
Types
@type signal_state() :: %{
  current_interval: pos_integer(),
  min_interval: pos_integer(),
  max_interval: pos_integer(),
  poll_timer_ref: reference() | nil
}
@type state() :: %{
  flow_module: module(),
  flow_slug: String.t(),
  worker_id: String.t(),
  worker_name: String.t(),
  repo: module(),
  task_supervisor: pid(),
  active_tasks: %{required(reference()) => task_metadata()},
  max_concurrency: pos_integer(),
  batch_size: pos_integer(),
  visibility_timeout: pos_integer(),
  signal_strategy: :polling | :notify,
  signal_state: signal_state(),
  notify_fallback_interval: pos_integer(),
  fallback_timer_ref: reference() | nil,
  flow_def: term(),
  lifecycle: PgFlow.Worker.Lifecycle.t()
}
@type task_metadata() :: %{
  run_id: String.t(),
  step_slug: String.t(),
  task_index: non_neg_integer(),
  msg_id: pos_integer(),
  timeout_timer_ref: reference() | nil,
  task_pid: pid() | nil
}
Functions
Returns a specification to start this module under a supervisor.
See Supervisor.
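Because the module provides a child specification, a worker can presumably be listed in a supervision tree as a `{module, arg}` tuple. The fragment below is a sketch that assumes the config map is forwarded unchanged to start_link/1 (the usual `use GenServer` behavior) and that MyApp.Repo is already running.

```elixir
# Hypothetical supervision tree entry; only the required keys are given,
# so the documented defaults apply to everything else.
children = [
  {PgFlow.Worker, %{flow_module: MyApp.Flows.ProcessOrder, repo: MyApp.Repo}}
]

Supervisor.start_link(children, strategy: :one_for_one)
```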
Returns the current state of the worker for debugging.
@spec start_link(map()) :: GenServer.on_start()
Starts a worker GenServer.
Options
- :flow_module - (required) The flow module to process
- :repo - (required) The Ecto repository module
- :worker_id - (optional) UUID string for worker identification (generated if not provided)
- :task_supervisor - (optional) PID of Task.Supervisor (uses PgFlow.TaskSupervisor if not provided)
- :max_concurrency - (optional) Maximum concurrent tasks (default: 10)
- :batch_size - (optional) Messages to fetch per poll (default: 10)
- :signal_strategy - (optional) Signal strategy, :polling or :notify (default: :polling)
- :min_poll_interval - (optional) Minimum ms between polls (default: 1000)
- :max_poll_interval - (optional) Maximum ms between polls (default: 5000)
- :notify_fallback_interval - (optional) Fallback poll interval in ms for the :notify strategy (default: 30000)
@spec stop(pid()) :: :ok
Gracefully stops the worker.
The worker will stop accepting new tasks and wait for active tasks to complete.
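A start-and-stop round trip might look like the fragment below. It uses only the functions and options documented above; the flow and repo modules are the placeholder names from the configuration example, and whether stop/1 blocks until active tasks finish is not stated here, so the sketch does not rely on it.

```elixir
# Start a worker that wakes on LISTEN/NOTIFY, with the documented 30s fallback poll.
{:ok, pid} =
  PgFlow.Worker.start_link(%{
    flow_module: MyApp.Flows.ProcessOrder,
    repo: MyApp.Repo,
    signal_strategy: :notify,
    notify_fallback_interval: 30_000
  })

# Later, for example during a deploy: stop accepting new tasks and shut down.
:ok = PgFlow.Worker.stop(pid)
```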