EctoPGMQ.Notifications (ecto_pgmq v1.0.0)

Copy Markdown View Source

The entrypoint for managing PGMQ notification subscriptions.

For general information about notification subscriptions, see Postgrex.Notifications.

When to Use Notifications

In general, notifications are most useful for queues with sporadic message bursts. If a queue is expected to be empty for long periods of time, subscribing to notifications can reduce polling overhead for consumers. If a queue has a fairly steady flow of messages, then notifications may not be as useful.

Enabling Notifications

In order to receive insert notifications for a queue, they must be explicitly enabled. This can be done during queue creation with EctoPGMQ.create_queue/4. Alternatively, notifications can be enabled for an existing queue with EctoPGMQ.update_queue/4 or EctoPGMQ.PGMQ.enable_notify_insert/4.

Throttling

PGMQ supports per-queue notification throttling with millisecond granularity (see EctoPGMQ.PGMQ.throttle_interval/0) in order to avoid flooding notification subscribers during periods of high insert volume.

For more information about configuring notification throttling, see EctoPGMQ.update_queue/4.

For more information about per-queue notification throttling metrics, see EctoPGMQ.Throttle.

Disabling Notifications

Notifications can be disabled for an existing queue with EctoPGMQ.update_queue/4 or EctoPGMQ.PGMQ.disable_notify_insert/3

Summary

Types

A notification channel name.

A listener process.

A subscription reference.

Functions

Starts a PGMQ notification listener linked to the current process.

Subscribes the current process to notifications for the given queue.

Unsubscribes the current process from notifications associated with the given listener reference.

Types

channel()

@type channel() :: String.t()

A notification channel name.

listener()

@type listener() :: GenServer.server()

A listener process.

subscription()

@type subscription() :: reference()

A subscription reference.

Functions

start_link(opts)

@spec start_link(keyword()) :: {:ok, pid()} | {:error, Postgrex.Error.t() | term()}

Starts a PGMQ notification listener linked to the current process.

Warning

Each notification listener uses its own Postgres connection outside of any Ecto.Repo connection pools. Therefore, in most cases, it's preferable to start a single listener that subscribes to multiple channels instead of starting a single listener per channel.

For information about supported options, see Postgrex.Notifications.start_link/1.

Examples

start_link([name: MyApp.Notifications | Repo.config()])

subscribe(listener, queue, opts \\ [])

@spec subscribe(listener(), EctoPGMQ.Queue.name(), keyword()) ::
  {:ok | :eventually, subscription(), channel()}

Subscribes the current process to notifications for the given queue.

Notifications will manifest as messages with the following shape where listener_pid is the pid/0 of the listener/0:

{:notification, listener_pid, subscription, channel, ""}

For information about supported options, see Postgrex.Notifications.listen/3.

Examples

subscribe(MyApp.Notifications, "my_queue")

unsubscribe(listener, subscription, opts \\ [])

@spec unsubscribe(listener(), subscription(), keyword()) :: :ok | :error

Unsubscribes the current process from notifications associated with the given listener reference.

For information about supported options, see Postgrex.Notifications.unlisten/3.

Examples

unsubscribe(MyApp.Notifications, my_subscription)