PgFlow.Worker.Server (PgFlow v0.1.0)


GenServer that polls pgmq and executes flow tasks.

Each worker is responsible for a single flow/queue and can execute multiple tasks concurrently, up to max_concurrency.

Implements the two-phase protocol:

  1. pgmq.read() - Reserve messages from pgmq (non-blocking)
  2. start_tasks() - Create step_tasks records and get task details
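The two phases can be sketched as one poll function. This is a minimal illustration, not PgFlow's code. The hypothetical `read_fun` and `start_fun` callbacks stand in for the pgmq.read() and start_tasks() database calls so the control flow runs without a database. The `msg_id` key on reserved messages is also an assumption.

```elixir
defmodule TwoPhaseSketch do
  @moduledoc """
  Hypothetical sketch of one poll cycle. `read_fun` stands in for
  pgmq.read() and `start_fun` stands in for start_tasks().
  """

  # Phase 1: reserve up to `qty` messages. They stay invisible to other
  # workers for `vt` seconds.
  # Phase 2: pass the reserved msg_ids to start_tasks, which creates the
  # step_tasks records and returns the task details to execute.
  @spec poll(
          (pos_integer(), pos_integer() -> [map()]),
          ([pos_integer()] -> [map()]),
          pos_integer(),
          pos_integer()
        ) :: [map()]
  def poll(read_fun, start_fun, vt, qty) do
    case read_fun.(vt, qty) do
      # Nothing was reserved, so phase 2 is skipped entirely.
      [] ->
        []

      messages ->
        messages
        |> Enum.map(& &1.msg_id)
        |> start_fun.()
    end
  end
end
```

Passing the phases in as functions keeps the sketch testable. A real worker would issue the SQL calls through its Ecto repo instead.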

Signal Strategies

The worker supports two signal strategies for detecting new messages:

  • :polling - Adaptive jittered exponential backoff (1s → 5s). Polls fast when busy, backs off when idle.
  • :notify - LISTEN/NOTIFY via pgmq's enable_notify_insert. Near-instant wake-ups with a 30s fallback poll.
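The following minimal sketch shows the :polling strategy's adaptive backoff. It operates on a map shaped like the signal_state() type documented below. Three details are assumptions chosen for illustration, and PgFlow's exact formula may differ:

  • the doubling factor
  • the reset to the minimum interval after a busy poll
  • the 10% jitter

```elixir
defmodule BackoffSketch do
  @moduledoc """
  Hypothetical jittered exponential backoff over a subset of the
  signal_state() fields. Factor and jitter size are assumptions.
  """

  @type signal_state :: %{
          current_interval: pos_integer(),
          min_interval: pos_integer(),
          max_interval: pos_integer()
        }

  # Busy poll (messages were found): snap back to the minimum interval.
  @spec next(signal_state(), :busy | :idle) :: signal_state()
  def next(state, :busy), do: %{state | current_interval: state.min_interval}

  # Idle poll: double the interval, capped at max_interval.
  def next(state, :idle) do
    %{state | current_interval: min(state.current_interval * 2, state.max_interval)}
  end

  # Delay used for the next timer: the current interval plus up to 10%
  # random jitter, still capped at max_interval. The jitter keeps many
  # idle workers from polling in lockstep.
  @spec delay(signal_state()) :: pos_integer()
  def delay(%{current_interval: current, max_interval: cap}) do
    jitter = :rand.uniform(max(div(current, 10), 1)) - 1
    min(current + jitter, cap)
  end
end
```

With the defaults (1_000 to 5_000 ms), consecutive idle polls would step through 1_000 → 2_000 → 4_000 → 5_000 ms. Any poll that finds work snaps the interval back to 1_000 ms.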

Configuration

Workers are configured via the config parameter passed to start_link/1:

config = %{
  flow_module: MyApp.Flows.ProcessOrder,
  repo: MyApp.Repo,
  worker_id: "550e8400-e29b-41d4-a716-446655440000",
  max_concurrency: 10,
  batch_size: 10,
  signal_strategy: :polling,
  min_poll_interval: 1_000,
  max_poll_interval: 5_000,
  notify_fallback_interval: 30_000
}

State Structure

The worker maintains the following state (the state() type below lists every field):

  • flow_module - The flow module being processed
  • flow_slug - String slug for the queue name
  • worker_id - UUID for this worker (string format)
  • repo - Ecto repo module
  • task_supervisor - PID of Task.Supervisor for async execution
  • active_tasks - Map of task_ref => task_metadata (includes timeout_timer_ref)
  • max_concurrency - Max parallel tasks (default: 10)
  • batch_size - Messages per poll (default: 10)
  • visibility_timeout - Visibility timeout, in seconds, applied to reserved messages (derived from the flow's opt_timeout)
  • signal_strategy - :polling or :notify
  • signal_state - Adaptive backoff state for :polling strategy
  • lifecycle - Worker lifecycle state machine (see PgFlow.Worker.Lifecycle)
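The max_concurrency, batch_size, and active_tasks fields could combine to size each read. A plausible rule is to request at most batch_size messages and never more than the free concurrency slots. The hypothetical helper below is a sketch of that rule under this assumption; it is not taken from PgFlow's source.

```elixir
defmodule CapacitySketch do
  @moduledoc """
  Hypothetical helper: how many messages the next pgmq.read() should
  request, given the worker's current state.
  """

  # Never more than batch_size, and never more than the slots still
  # free under max_concurrency. A result of 0 means the worker is full.
  @spec read_quantity(map()) :: non_neg_integer()
  def read_quantity(%{active_tasks: active, max_concurrency: max_c, batch_size: batch}) do
    free_slots = max(max_c - map_size(active), 0)
    min(batch, free_slots)
  end
end
```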

Lifecycle

  1. Initialization - Worker registers itself in the database, starts polling loop
  2. Polling - Continuously polls the queue, dispatches tasks to Task.Supervisor
  3. Task Execution - Tasks run concurrently, worker tracks completion/failure
  4. Graceful Shutdown - Worker stops accepting tasks, waits for active tasks
  5. Cleanup - Marks worker as stopped in database
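The phases above map naturally onto a small state machine. The sketch below is hypothetical and does not reproduce the PgFlow.Worker.Lifecycle API. It invents its own state names and collapses Polling and Task Execution into a single :running state.

```elixir
defmodule LifecycleSketch do
  @moduledoc """
  Hypothetical lifecycle state machine. Not the PgFlow.Worker.Lifecycle API.
  """

  # :starting = Initialization, :running = Polling + Task Execution,
  # :stopping = Graceful Shutdown, :stopped = after Cleanup.
  @transitions %{
    starting: [:running],
    running: [:stopping],
    stopping: [:stopped],
    stopped: []
  }

  @spec transition(atom(), atom()) ::
          {:ok, atom()} | {:error, {:invalid_transition, atom(), atom()}}
  def transition(from, to) do
    if to in Map.get(@transitions, from, []) do
      {:ok, to}
    else
      {:error, {:invalid_transition, from, to}}
    end
  end

  # Only a running worker dispatches new tasks. A stopping worker only
  # drains the tasks it already has.
  @spec accepting_tasks?(atom()) :: boolean()
  def accepting_tasks?(state), do: state == :running
end
```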

Telemetry Events

The worker emits the following telemetry events:

  • [:pgflow, :worker, :start] - Worker started
  • [:pgflow, :worker, :stop] - Worker stopped
  • [:pgflow, :worker, :poll, :start] - Poll cycle started
  • [:pgflow, :worker, :poll, :stop] - Poll cycle completed
  • [:pgflow, :worker, :task, :start] - Task execution started
  • [:pgflow, :worker, :task, :stop] - Task execution completed
  • [:pgflow, :worker, :task, :exception] - Task execution failed
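The following is a sketch of a handler for the task events, assuming they are emitted through the :telemetry library. Two payload keys are assumptions: the `:duration` measurement (in native time units) and the `:step_slug` metadata key. Verify the actual payloads before relying on this handler.

```elixir
defmodule TaskTelemetryLogger do
  @moduledoc """
  Hypothetical :telemetry handler for PgFlow task events. The :duration
  and :step_slug keys are assumptions about the event payloads.
  """
  require Logger

  @events [
    [:pgflow, :worker, :task, :stop],
    [:pgflow, :worker, :task, :exception]
  ]

  def events, do: @events

  # Handler with the 4-arity shape :telemetry expects:
  # (event, measurements, metadata, config).
  def handle_event(event, measurements, metadata, _config) do
    Logger.info(describe(event, measurements, metadata))
  end

  # Pure formatting helper, kept separate so it is easy to test.
  def describe([:pgflow, :worker, :task, outcome], measurements, metadata) do
    ms =
      measurements
      |> Map.get(:duration, 0)
      |> System.convert_time_unit(:native, :millisecond)

    "task #{Map.get(metadata, :step_slug, "?")} #{outcome} in #{ms}ms"
  end
end
```

Attaching the handler would look like `:telemetry.attach_many("pgflow-task-logger", TaskTelemetryLogger.events(), &TaskTelemetryLogger.handle_event/4, nil)`.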

Summary

Functions

  • child_spec(init_arg) - Returns a specification to start this module under a supervisor.
  • get_state(pid) - Returns the current state of the worker for debugging.
  • start_link(config) - Starts a worker GenServer.
  • stop(pid) - Gracefully stops the worker.

Types

signal_state()

@type signal_state() :: %{
  current_interval: pos_integer(),
  min_interval: pos_integer(),
  max_interval: pos_integer(),
  poll_timer_ref: reference() | nil
}

state()

@type state() :: %{
  flow_module: module(),
  flow_slug: String.t(),
  worker_id: String.t(),
  worker_name: String.t(),
  repo: module(),
  task_supervisor: pid(),
  active_tasks: %{required(reference()) => task_metadata()},
  max_concurrency: pos_integer(),
  batch_size: pos_integer(),
  visibility_timeout: pos_integer(),
  signal_strategy: :polling | :notify,
  signal_state: signal_state(),
  notify_fallback_interval: pos_integer(),
  fallback_timer_ref: reference() | nil,
  flow_def: term(),
  lifecycle: PgFlow.Worker.Lifecycle.t()
}

task_metadata()

@type task_metadata() :: %{
  run_id: String.t(),
  step_slug: String.t(),
  task_index: non_neg_integer(),
  msg_id: pos_integer(),
  timeout_timer_ref: reference() | nil,
  task_pid: pid() | nil
}
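The timeout_timer_ref field suggests that each task gets its own timer. One common way to manage such timers in Elixir uses Process.send_after/3 and Process.cancel_timer/1. The helpers below sketch that approach. They are hypothetical, and the `{:task_timeout, ref}` message shape is an assumption.

```elixir
defmodule TaskTimeoutSketch do
  @moduledoc """
  Hypothetical helpers for managing timeout_timer_ref in task_metadata().
  """

  # Register a dispatched task and arm a timer that delivers
  # {:task_timeout, ref} to the calling process (the worker) after timeout_ms.
  @spec track(map(), reference(), map(), non_neg_integer()) :: map()
  def track(active_tasks, ref, metadata, timeout_ms) do
    timer = Process.send_after(self(), {:task_timeout, ref}, timeout_ms)
    Map.put(active_tasks, ref, Map.put(metadata, :timeout_timer_ref, timer))
  end

  # On completion or failure: remove the entry and cancel its timer.
  # If the timer already fired, {:task_timeout, ref} may still be in the
  # mailbox, so the worker should ignore timeouts for refs it no longer tracks.
  @spec finish(map(), reference()) :: {map() | nil, map()}
  def finish(active_tasks, ref) do
    {metadata, rest} = Map.pop(active_tasks, ref)

    case metadata do
      %{timeout_timer_ref: timer} when is_reference(timer) -> Process.cancel_timer(timer)
      _ -> :ok
    end

    {metadata, rest}
  end
end
```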

Functions

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

get_state(pid)

@spec get_state(pid()) :: map()

Returns the current state of the worker for debugging.

start_link(config)

@spec start_link(map()) :: GenServer.on_start()

Starts a worker GenServer.

Options

  • :flow_module - (required) The flow module to process
  • :repo - (required) The Ecto repository module
  • :worker_id - (optional) UUID string for worker identification (generated if not provided)
  • :task_supervisor - (optional) PID of Task.Supervisor (uses PgFlow.TaskSupervisor if not provided)
  • :max_concurrency - (optional) Maximum concurrent tasks (default: 10)
  • :batch_size - (optional) Messages to fetch per poll (default: 10)
  • :signal_strategy - (optional) Signal strategy, :polling or :notify (default: :polling)
  • :min_poll_interval - (optional) Minimum ms between polls (default: 1000)
  • :max_poll_interval - (optional) Maximum ms between polls (default: 5000)
  • :notify_fallback_interval - (optional) Fallback poll interval in ms for the :notify strategy (default: 30000)
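The documented defaults can be applied with a plain map merge. The sketch below also checks the two required keys. It is hypothetical, not PgFlow's code. A random 32-character hex string stands in for the real UUID generation of worker_id. In an Ecto project, Ecto.UUID.generate/0 would be the natural choice.

```elixir
defmodule WorkerConfigSketch do
  @moduledoc """
  Hypothetical normalization of a start_link/1 config map using the
  defaults listed in the options above.
  """

  @defaults %{
    task_supervisor: PgFlow.TaskSupervisor,
    max_concurrency: 10,
    batch_size: 10,
    signal_strategy: :polling,
    min_poll_interval: 1_000,
    max_poll_interval: 5_000,
    notify_fallback_interval: 30_000
  }

  @required [:flow_module, :repo]

  @spec normalize(map()) :: {:ok, map()} | {:error, {:missing, [atom()]}}
  def normalize(config) when is_map(config) do
    case Enum.reject(@required, &Map.has_key?(config, &1)) do
      [] ->
        config =
          @defaults
          # Caller-supplied values override the defaults.
          |> Map.merge(config)
          # Placeholder ID (32 hex chars), not a formatted UUID.
          |> Map.put_new_lazy(:worker_id, fn ->
            Base.encode16(:crypto.strong_rand_bytes(16), case: :lower)
          end)

        {:ok, config}

      missing ->
        {:error, {:missing, missing}}
    end
  end
end
```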

stop(pid)

@spec stop(pid()) :: :ok

Gracefully stops the worker.

The worker will stop accepting new tasks and wait for active tasks to complete.
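The stop/1 behavior (refuse new tasks, wait for active ones) and get_state/1 can be illustrated with a minimal GenServer. This is a hypothetical sketch, not PgFlow.Worker.Server itself. Polling is replaced by an explicit `run/2` call, and the worker stores the caller of `:drain` and replies only after the last task finishes.

```elixir
defmodule DrainingWorkerSketch do
  @moduledoc """
  Hypothetical GenServer illustrating get_state/1 and a draining stop/1.
  """
  use GenServer

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, opts)

  # Debugging helper: returns the whole state map.
  def get_state(pid), do: GenServer.call(pid, :get_state)

  # Submits a zero-arity function as a "task" (stands in for a polled message).
  def run(pid, fun), do: GenServer.call(pid, {:run, fun})

  # Graceful stop: refuse new work, wait for in-flight tasks, then exit.
  def stop(pid, timeout \\ 5_000) do
    :ok = GenServer.call(pid, :drain, timeout)
    GenServer.stop(pid)
  end

  @impl true
  def init(_opts) do
    {:ok, sup} = Task.Supervisor.start_link()
    {:ok, %{task_supervisor: sup, active_tasks: %{}, draining: false, drain_from: nil}}
  end

  @impl true
  def handle_call(:get_state, _from, state), do: {:reply, state, state}

  def handle_call({:run, _fun}, _from, %{draining: true} = state),
    do: {:reply, {:error, :draining}, state}

  def handle_call({:run, fun}, _from, state) do
    task = Task.Supervisor.async_nolink(state.task_supervisor, fun)
    {:reply, :ok, put_in(state.active_tasks[task.ref], task.pid)}
  end

  def handle_call(:drain, from, state) do
    state = %{state | draining: true}

    if map_size(state.active_tasks) == 0 do
      {:reply, :ok, state}
    else
      # Reply later, once the last active task finishes.
      {:noreply, %{state | drain_from: from}}
    end
  end

  # A task returned normally: drop its monitor and flush any :DOWN message.
  @impl true
  def handle_info({ref, _result}, state) when is_reference(ref) do
    Process.demonitor(ref, [:flush])
    {:noreply, task_done(state, ref)}
  end

  # A task crashed: the :DOWN message carries the same ref.
  def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do
    {:noreply, task_done(state, ref)}
  end

  defp task_done(state, ref) do
    state = %{state | active_tasks: Map.delete(state.active_tasks, ref)}

    if state.drain_from && map_size(state.active_tasks) == 0 do
      GenServer.reply(state.drain_from, :ok)
      %{state | drain_from: nil}
    else
      state
    end
  end
end
```

Deferring the reply with GenServer.reply/2 keeps the worker responsive to task results while the stop/1 caller blocks.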