Oban.Notifier behaviour (Oban v2.10.1)
The Notifier coordinates listening for and publishing notifications for events in predefined
channels.
Every Oban supervision tree contains a notifier process, registered as Oban.Notifier, which
can be any implementation of the Oban.Notifier behaviour. The default one is
Oban.PostgresNotifier, which relies on Postgres LISTEN/NOTIFY. All incoming notifications
are relayed through the notifier to other processes.
Channels
The notifier recognizes three predefined channels, each with a distinct responsibility:
gossip: arbitrary communication between nodes or jobs is sent on the gossip channel.
insert: as jobs are inserted into the database, an event is published on the insert channel. Processes such as queue producers use this as a signal to dispatch new jobs.
signal: instructions to take action, such as scaling a queue or killing a running job, are sent through the signal channel.
The insert and signal channels are primarily for internal use. Use the gossip channel to
send notifications between jobs or processes in your application.
Examples
Broadcasting after a job is completed:
defmodule MyApp.Worker do
  use Oban.Worker

  @impl Oban.Worker
  def perform(job) do
    :ok = MyApp.do_work(job.args)

    # Announce completion to any process listening on the gossip channel
    Oban.Notifier.notify(Oban, :gossip, %{complete: job.id})

    :ok
  end
end
Listening for job complete events from another process:
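def insert_and_listen(args) do
  # Subscribe before inserting so the completion message can't be missed
  :ok = Oban.Notifier.listen([:gossip])

  # Bind the id to a variable; the pin operator only works on variables
  {:ok, %{id: job_id}} =
    args
    |> MyApp.Worker.new()
    |> Oban.insert()

  receive do
    {:notification, :gossip, %{"complete" => ^job_id}} ->
      IO.puts("Other job complete!")
  after
    30_000 ->
      IO.puts("Other job didn't finish in 30 seconds!")
  end
end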
Summary
Callbacks
listen: Register current process to receive messages from some channels
notify: Broadcast a notification in a channel
start_link: Starts a notifier
unlisten: Unregister current process from channels
Functions
listen: Register the current process to receive relayed messages for the provided channels.
notify: Broadcast a notification to listeners on all nodes.
unlisten: Unregister the current process from receiving relayed messages on provided channels.
Types
Specs
channel() :: :gossip | :insert | :signal
Specs
option() :: {:name, module()} | {:conf, Oban.Config.t()}
Specs
server() :: GenServer.server()
Callbacks
Specs
Register current process to receive messages from some channels
Specs
Broadcast a notification in a channel
Specs
start_link([option()]) :: GenServer.on_start()
Starts a notifier
Specs
Unregister current process from channels
Functions
Specs
Register the current process to receive relayed messages for the provided channels.
All messages are received as JSON and decoded before they are relayed to registered
processes. Each registered process receives a three-element notification tuple in the following
format:
{:notification, channel :: channel(), decoded :: map()}
Example
Register to listen for all :gossip channel messages:
Oban.Notifier.listen([:gossip])
Listen for messages on all channels:
Oban.Notifier.listen([:gossip, :insert, :signal])
Listen for messages when using a custom Oban name:
Oban.Notifier.listen(MyApp.MyOban, [:gossip, :insert, :signal])
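As a sketch of how a long-lived process might consume the notification tuple described above, the following GenServer registers itself on the :gossip channel and records completed job ids. The module name MyApp.GossipListener and the state shape are hypothetical, and the sketch assumes the default Oban instance is already running when the listener starts.

```elixir
defmodule MyApp.GossipListener do
  # Hypothetical module: subscribes to :gossip and tracks completed job ids.
  use GenServer

  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @impl GenServer
  def init(_opts) do
    # Registers this process with the notifier of the default Oban instance.
    :ok = Oban.Notifier.listen([:gossip])

    {:ok, %{completed: []}}
  end

  @impl GenServer
  def handle_info({:notification, :gossip, %{"complete" => job_id}}, state) do
    # Payloads arrive decoded from JSON, so keys are strings rather than atoms.
    {:noreply, %{state | completed: [job_id | state.completed]}}
  end

  def handle_info(_message, state) do
    # Ignore notifications with other shapes and any unrelated messages.
    {:noreply, state}
  end
end
```

Matching on string keys in handle_info/2 mirrors the receive clause in the module-level example, where a payload sent as %{complete: job.id} is matched as %{"complete" => job_id}.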
Specs
notify( Oban.Config.t() | server(), channel :: channel(), payload :: map() | [map()] ) :: :ok
Broadcast a notification to listeners on all nodes.
Notifications are scoped to the configured prefix. For example, if there are instances running
with the public and private prefixes, a notification published in the public prefix won't
be picked up by processes listening with the private prefix.
Using notify/3 with a config is soft deprecated. Use a server as the first argument instead.
Example
Broadcast a gossip message:
Oban.Notifier.notify(:gossip, %{message: "hi!"})
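The spec above also accepts a list of maps as the payload and a server as the first argument. A minimal sketch of both follows, using a hypothetical helper module MyApp.Gossip. It assumes that, as with listen/2, a custom Oban instance name can be passed as the server.

```elixir
defmodule MyApp.Gossip do
  # Hypothetical helper; the module and function names are illustrative only.

  # Build one payload map per completed job id.
  def complete_payloads(job_ids) do
    Enum.map(job_ids, &%{complete: &1})
  end

  # Broadcast all payloads with a single notify/3 call. The first argument
  # defaults to the standard Oban instance name.
  def announce_complete(oban \\ Oban, job_ids) do
    Oban.Notifier.notify(oban, :gossip, complete_payloads(job_ids))
  end
end
```

With a custom instance this could be called as MyApp.Gossip.announce_complete(MyApp.MyOban, [1, 2]).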
Specs
Unregister the current process from receiving relayed messages on provided channels.
Example
Stop listening for messages on the :gossip channel:
Oban.Notifier.unlisten([:gossip])
Stop listening for messages when using a custom Oban name:
Oban.Notifier.unlisten(MyApp.MyOban, [:gossip])