View Source Oban.Notifiers.Postgres (Oban v2.16.1)
A Postgres Listen/Notify based Notifier.
Usage
Specify the Postgres
notifier in your Oban configuration:
config :my_app, Oban,
notifier: Oban.Notifiers.Postgres
Transactions and Testing
The notifications system is built on PostgreSQL's LISTEN/NOTIFY
functionality. Notifications
are only delivered after a transaction completes and are de-duplicated before publishing.
Typically, applications run Ecto in sandbox mode while testing, but sandbox mode wraps each test
in a separate transaction that's rolled back after the test completes. That means the
transaction is never committed, which prevents delivering any notifications.
To test using notifications you must run Ecto without sandbox mode enabled, or use
Oban.Notifiers.PG
instead.
Triggers and Scaling
A database trigger is used to dispatch notifications to relevant queues as jobs are inserted
into the database. The trigger uses pg_notify
to send a notification for each distinct
queue, and pg_notify
relies on a de-duplication mechanism with O(N^2)
complexity. At high
load, e.g. thousands or more job inserts per second, notification de-duplication may become a
bottleneck.
The trigger mechanism is designed to make jobs execute immediately after insert, rather than up to 1 second afterwards, and it can safely be disabled to improve insert throughput. Use the following migration to drop the trigger:
defmodule MyApp.Repo.Migrations.DropObanJobsNotifyTrigger do
use Ecto.Migration
def change do
execute(
"DROP TRIGGER IF EXISTS oban_notify ON public.oban_jobs",
"CREATE TRIGGER oban_notify AFTER INSERT ON public.oban_jobs FOR EACH ROW EXECUTE PROCEDURE public.oban_jobs_notify()"
)
end
end
Summary
Functions
Register current process to receive messages from some channels
Start the notifier.
Unregister current process from channels
Functions
@spec listen(GenServer.server(), channels :: [Oban.Notifier.channel()]) :: :ok
Register current process to receive messages from some channels
@spec start_link(Keyword.t()) :: GenServer.on_start()
Start the notifier.
@spec unlisten(GenServer.server(), channels :: [Oban.Notifier.channel()]) :: :ok
Unregister current process from channels