Oban.Notifier (Oban v2.4.0) View Source
The Notifier
coordinates listening for and publishing notifications for events in predefined
channels.
Every Oban supervision tree contains a notifier process, registered as Oban.Notifier
, which
itself maintains a single connection with an app's database. All incoming notifications are
relayed through that connection to other processes.
Channels
The notifier recognizes three predefined channels, each with a distinct responsibility:
gossip
— arbitrary communication between nodes or jobs are sent on thegossip
channelinsert
— as jobs are inserted into the database an event is published on theinsert
channel. Processes such as queue producers use this as a signal to dispatch new jobs.signal
— instructions to take action, such as scale a queue or kill a running job, are sent through thesignal
channel.
The insert
and signal
channels are primarily for internal use. Use the gossip
channel to
send notifications between jobs or processes in your application.
Caveats
The notifications system is built on PostgreSQL's LISTEN/NOTIFY
functionality. Notifications
are only delivered after a transaction completes and are de-duplicated before publishing.
Most applications run Ecto in sandbox mode while testing. Sandbox mode wraps each test in a separate transaction which is rolled back after the test completes. That means the transaction is never committed, which prevents delivering any notifications.
To test using notifications you must run Ecto without sandbox mode enabled.
Examples
Broadcasting after a job is completed:
defmodule MyApp.Worker do
use Oban.Worker
@impl Oban.Worker
def perform(job) do
:ok = MyApp.do_work(job.args)
Oban.Notifier.notify(Oban.config(), :gossip, %{complete: job.id})
:ok
end
end
Listening for job complete events from another process:
def insert_and_listen(args) do
{:ok, job} =
args
|> MyApp.Worker.new()
|> Oban.insert()
receive do
{:notification, :gossip, %{"complete" => ^job.id}} ->
IO.puts("Other job complete!")
after
30_000 ->
IO.puts("Other job didn't finish in 30 seconds!")
end
end
Link to this section Summary
Functions
Returns a specification to start this module under a supervisor.
Register the current process to receive relayed messages for the provided channels.
Broadcast a notification to listeners on all nodes.
Link to this section Types
Specs
channel() :: :gossip | :insert | :signal
Specs
option() :: {:name, module()} | {:conf, Oban.Config.t()}
Link to this section Functions
Returns a specification to start this module under a supervisor.
See Supervisor
.
Specs
listen(GenServer.server(), channels :: [channel()]) :: :ok
Register the current process to receive relayed messages for the provided channels.
All messages are received as JSON
and decoded before they are relayed to registered
processes. Each registered process receives a three element notification tuple in the following
format:
{:notification, channel :: channel(), decoded :: map()}
Example
Register to listen for all :gossip
channel messages:
Oban.Notifier.listen([:gossip])
Listen for messages on all channels:
Oban.Notifier.listen([:gossip, :insert, :signal])
Listen for messages when using a custom Oban name:
Oban.Notifier.listen(MyApp.Oban, [:gossip, :insert, :signal])
Specs
notify(Oban.Config.t(), channel :: channel(), payload :: map()) :: :ok
Broadcast a notification to listeners on all nodes.
Notifications are scoped to the configured prefix
. For example, if there are instances running
with the public
and private
prefixes, a notification published in the public
prefix won't
be picked up by processes listening with the private
prefix.
Example
Broadcast a gossip message:
Oban.Notifier.notify(Oban.config(), :gossip, %{message: "hi!"})