View Source Oban.Notifier behaviour (Oban v2.15.2)
The Notifier
coordinates listening for and publishing notifications for events in predefined
channels.
Every Oban supervision tree contains a notifier process, registered as Oban.Notifier
, which
must be an implementation of the Oban.Notifier
behaviour. The default implementation is uses
the LISTEN/NOTIFY
operations built into Postgres.
All incoming notifications are relayed through the notifier to other processes.
Channels
Internally, Oban uses a variety of predefined channels with distinct responsibilities:
insert
— as jobs are inserted into the database an event is published on theinsert
channel. Processes such as queue producers use this as a signal to dispatch new jobs.leader
— messages regarding node leadership exchanged between peerssignal
— instructions to take action, such as scale a queue or kill a running job, are sent through thesignal
channelgossip
— arbitrary communication for coordination between nodesstager
— messages regarding job staging, e.g. notifying queues that jobs are ready for execution
Examples
Broadcasting after a job is completed:
defmodule MyApp.Worker do
use Oban.Worker
@impl Oban.Worker
def perform(job) do
:ok = MyApp.do_work(job.args)
Oban.Notifier.notify(Oban, :my_app_jobs, %{complete: job.id})
:ok
end
end
Listening for job complete events from another process:
def insert_and_listen(args) do
:ok = Oban.Notifier.listen([:my_app_jobs])
{:ok, %{id: job_id} = job} =
args
|> MyApp.Worker.new()
|> Oban.insert()
receive do
{:notification, :my_app_jobs, %{"complete" => ^job_id}} ->
IO.puts("Other job complete!")
after
30_000 ->
IO.puts("Other job didn't finish in 30 seconds!")
end
end
Summary
Callbacks
Register the current process to receive messages from one or more channels.
Broadcast a notification to all subscribers of a channel.
Starts a notifier instance.
Unregister current process from channels.
Functions
Register the current process to receive relayed messages for the provided channels.
Broadcast a notification to listeners on all nodes.
Unregister the current process from receiving relayed messages on provided channels.
Types
@type channel() :: atom()
@type option() :: {:name, module()} | {:conf, Oban.Config.t()}
@type server() :: GenServer.server()
Callbacks
Register the current process to receive messages from one or more channels.
Broadcast a notification to all subscribers of a channel.
@callback start_link([option()]) :: GenServer.on_start()
Starts a notifier instance.
Unregister current process from channels.
Functions
Register the current process to receive relayed messages for the provided channels.
All messages are received as JSON
and decoded before they are relayed to registered
processes. Each registered process receives a three element notification tuple in the following
format:
{:notification, channel :: channel(), decoded :: map()}
Example
Register to listen for all :gossip
channel messages:
Oban.Notifier.listen(:gossip)
Listen for messages on multiple channels:
Oban.Notifier.listen([:gossip, :insert, :leader, :signal, :stager])
Listen for messages when using a custom Oban name:
Oban.Notifier.listen(MyApp.MyOban, [:gossip, :signal])
@spec notify( Oban.Config.t() | server(), channel :: channel(), payload :: map() | [map()] ) :: :ok
Broadcast a notification to listeners on all nodes.
Notifications are scoped to the configured prefix
. For example, if there are instances running
with the public
and private
prefixes, a notification published in the public
prefix won't
be picked up by processes listening with the private
prefix.
Using notify/3 with a config is soft deprecated. Use a server as the first argument instead
Example
Broadcast a gossip message:
Oban.Notifier.notify(:gossip, %{message: "hi!"})
Unregister the current process from receiving relayed messages on provided channels.
Example
Stop listening for messages on the :gossip
channel:
Oban.Notifier.unlisten([:gossip])
Stop listening for messages when using a custom Oban name:
Oban.Notifier.unlisten(MyApp.MyOban, [:gossip])