PgFlow.Signal.Notify (PgFlow v0.1.0)

Copy Markdown View Source

Manages PostgreSQL LISTEN/NOTIFY for pgmq queue notifications.

Uses pgmq's built-in enable_notify_insert (pgmq 1.8.0+) to receive instant wake-up signals when messages are inserted into queue tables. Dispatches :poll_now messages to registered worker processes.

Built on Postgrex.Notifications which is the purpose-built solution for PostgreSQL LISTEN/NOTIFY. Notifications are delivered asynchronously via messages to the process that called listen/2.

Summary

Functions

Returns a specification to start this module under a supervisor.

Registers a worker for notifications on a queue.

Starts the Signal.Notify process.

Unregisters a worker from notifications.

Types

state()

@type state() :: %{
  repo: module(),
  notify_throttle_ms: non_neg_integer(),
  conn: pid() | nil,
  workers: %{required(String.t()) => worker_entry()},
  channels: %{required(String.t()) => String.t()}
}

worker_entry()

@type worker_entry() :: %{
  worker_pid: pid(),
  monitor_ref: reference(),
  listen_ref: reference() | nil
}

Functions

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

register_worker(server \\ __MODULE__, flow_slug, worker_pid)

@spec register_worker(GenServer.server(), String.t(), pid()) :: :ok | {:error, term()}

Registers a worker for notifications on a queue.

Starts listening on the queue's channel. Note that pgmq.enable_notify_insert must be called separately (done by PgFlow.Supervisor) before registration.

start_link(opts)

@spec start_link(keyword()) :: GenServer.on_start()

Starts the Signal.Notify process.

Options

  • :repo - (required) The Ecto repository module
  • :notify_throttle_ms - (optional) Throttle interval for pgmq notifications (default: 250)

unregister_worker(server \\ __MODULE__, flow_slug)

@spec unregister_worker(GenServer.server(), String.t()) :: :ok

Unregisters a worker from notifications.