View Source Oban.Notifiers.Postgres (Oban v2.16.1)

A Postgres Listen/Notify based Notifier.

Usage

Specify the Postgres notifier in your Oban configuration:

config :my_app, Oban,
  notifier: Oban.Notifiers.Postgres

Transactions and Testing

The notifications system is built on PostgreSQL's LISTEN/NOTIFY functionality. Notifications are only delivered after a transaction completes and are de-duplicated before publishing. Typically, applications run Ecto in sandbox mode while testing, but sandbox mode wraps each test in a separate transaction that's rolled back after the test completes. That means the transaction is never committed, which prevents delivering any notifications.

To test using notifications you must run Ecto without sandbox mode enabled, or use Oban.Notifiers.PG instead.

Triggers and Scaling

A database trigger is used to dispatch notifications to relevant queues as jobs are inserted into the database. The trigger uses pg_notify to send a notification for each distinct queue, and pg_notify relies on a de-duplication mechanism with O(N^2) complexity. At high load, e.g. thousands or more job inserts per second, notification de-duplication may become a bottleneck.

The trigger mechanism is designed to make jobs execute immediately after insert, rather than up to 1 second afterwards, and it can safely be disabled to improve insert throughput. Use the following migration to drop the trigger:

defmodule MyApp.Repo.Migrations.DropObanJobsNotifyTrigger do
  use Ecto.Migration

  def change do
    execute(
      "DROP TRIGGER IF EXISTS oban_notify ON public.oban_jobs",
      "CREATE TRIGGER oban_notify AFTER INSERT ON public.oban_jobs FOR EACH ROW EXECUTE PROCEDURE public.oban_jobs_notify()"
    )
  end
end

Summary

Functions

Register current process to receive messages from some channels

Start the notifier.

Unregister current process from channels

Functions

Link to this function

listen(server, channels)

View Source
@spec listen(GenServer.server(), channels :: [Oban.Notifier.channel()]) :: :ok

Register current process to receive messages from some channels

@spec start_link(Keyword.t()) :: GenServer.on_start()

Start the notifier.

Link to this function

unlisten(server, channels)

View Source
@spec unlisten(GenServer.server(), channels :: [Oban.Notifier.channel()]) :: :ok

Unregister current process from channels