# `PgFlow.Worker.Server`
[🔗](https://github.com/agoodway/pgflow/blob/v0.1.0/lib/pgflow/worker/server.ex#L1)

GenServer that polls pgmq and executes flow tasks.

Each worker is responsible for a single flow/queue and can execute
multiple tasks concurrently up to the configured limit.

Implements the two-phase protocol:
1. pgmq.read() - Reserve messages from pgmq (non-blocking)
2. start_tasks() - Create step_tasks records and get task details

## Signal Strategies

The worker supports two signal strategies for detecting new messages:

  * `:polling` - Adaptive jittered exponential backoff (1s → 5s).
    Polls fast when busy, backs off when idle.
  * `:notify` - LISTEN/NOTIFY via pgmq's `enable_notify_insert`.
    Near-instant wake-ups with a 30s fallback poll.

## Configuration

Workers are configured via the `config` parameter passed to `start_link/1`:

    config = %{
      flow_module: MyApp.Flows.ProcessOrder,
      repo: MyApp.Repo,
      worker_id: "550e8400-e29b-41d4-a716-446655440000",
      max_concurrency: 10,
      batch_size: 10,
      signal_strategy: :polling,
      min_poll_interval: 1_000,
      max_poll_interval: 5_000,
      notify_fallback_interval: 30_000
    }

## State Structure

The worker maintains the following state:

  * `flow_module` - The flow module being processed
  * `flow_slug` - String slug for the queue name
  * `worker_id` - UUID for this worker (string format)
  * `repo` - Ecto repo module
  * `task_supervisor` - PID of Task.Supervisor for async execution
  * `active_tasks` - Map of task_ref => task_metadata (includes timeout_timer_ref)
  * `max_concurrency` - Max parallel tasks (default: 10)
  * `batch_size` - Messages per poll (default: 10)
  * `visibility_timeout` - Seconds for message visibility (derived from flow's opt_timeout)
  * `signal_strategy` - `:polling` or `:notify`
  * `signal_state` - Adaptive backoff state for `:polling` strategy
  * `lifecycle` - Worker lifecycle state machine (see `PgFlow.Worker.Lifecycle`)

## Lifecycle

1. **Initialization** - Worker registers itself in the database, starts polling loop
2. **Polling** - Continuously polls the queue, dispatches tasks to Task.Supervisor
3. **Task Execution** - Tasks run concurrently, worker tracks completion/failure
4. **Graceful Shutdown** - Worker stops accepting tasks, waits for active tasks
5. **Cleanup** - Marks worker as stopped in database

## Telemetry Events

The worker emits the following telemetry events:

  * `[:pgflow, :worker, :start]` - Worker started
  * `[:pgflow, :worker, :stop]` - Worker stopped
  * `[:pgflow, :worker, :poll, :start]` - Poll cycle started
  * `[:pgflow, :worker, :poll, :stop]` - Poll cycle completed
  * `[:pgflow, :worker, :task, :start]` - Task execution started
  * `[:pgflow, :worker, :task, :stop]` - Task execution completed
  * `[:pgflow, :worker, :task, :exception]` - Task execution failed

# `signal_state`

```elixir
@type signal_state() :: %{
  current_interval: pos_integer(),
  min_interval: pos_integer(),
  max_interval: pos_integer(),
  poll_timer_ref: reference() | nil
}
```

# `state`

```elixir
@type state() :: %{
  flow_module: module(),
  flow_slug: String.t(),
  worker_id: String.t(),
  worker_name: String.t(),
  repo: module(),
  task_supervisor: pid(),
  active_tasks: %{required(reference()) =&gt; task_metadata()},
  max_concurrency: pos_integer(),
  batch_size: pos_integer(),
  visibility_timeout: pos_integer(),
  signal_strategy: :polling | :notify,
  signal_state: signal_state(),
  notify_fallback_interval: pos_integer(),
  fallback_timer_ref: reference() | nil,
  flow_def: term(),
  lifecycle: PgFlow.Worker.Lifecycle.t()
}
```

# `task_metadata`

```elixir
@type task_metadata() :: %{
  run_id: String.t(),
  step_slug: String.t(),
  task_index: non_neg_integer(),
  msg_id: pos_integer(),
  timeout_timer_ref: reference() | nil,
  task_pid: pid() | nil
}
```

# `child_spec`

Returns a specification to start this module under a supervisor.

See `Supervisor`.

# `get_state`

```elixir
@spec get_state(pid()) :: map()
```

Returns the current state of the worker for debugging.

# `start_link`

```elixir
@spec start_link(map()) :: GenServer.on_start()
```

Starts a worker GenServer.

## Options

  * `:flow_module` - (required) The flow module to process
  * `:repo` - (required) The Ecto repository module
  * `:worker_id` - (optional) UUID string for worker identification (generated if not provided)
  * `:task_supervisor` - (optional) PID of Task.Supervisor (uses PgFlow.TaskSupervisor if not provided)
  * `:max_concurrency` - (optional) Maximum concurrent tasks (default: 10)
  * `:batch_size` - (optional) Messages to fetch per poll (default: 10)
  * `:signal_strategy` - (optional) Signal strategy, `:polling` or `:notify` (default: `:polling`)
  * `:min_poll_interval` - (optional) Minimum ms between polls (default: 1000)
  * `:max_poll_interval` - (optional) Maximum ms between polls (default: 5000)
  * `:notify_fallback_interval` - (optional) Fallback poll interval for `:notify` strategy (default: 30000)

# `stop`

```elixir
@spec stop(pid()) :: :ok
```

Gracefully stops the worker.

The worker will stop accepting new tasks and wait for active tasks to complete.

---

*Consult [api-reference.md](api-reference.md) for complete listing*
