Oban.Notifier behaviour (Oban v2.10.1)

The Notifier coordinates listening for and publishing notifications for events in predefined channels.

Every Oban supervision tree contains a notifier process, registered as Oban.Notifier, which can be any implementation of the Oban.Notifier behaviour. The default one is Oban.PostgresNotifier, which relies on Postgres LISTEN/NOTIFY. All incoming notifications are relayed through the notifier to other processes.

Channels

The notifier recognizes three predefined channels, each with a distinct responsibility:

  • gossip - arbitrary communication between nodes or jobs is sent on the gossip channel.
  • insert - as jobs are inserted into the database, an event is published on the insert channel. Processes such as queue producers use this as a signal to dispatch new jobs.
  • signal - instructions to take action, such as scaling a queue or killing a running job, are sent through the signal channel.

The insert and signal channels are primarily for internal use. Use the gossip channel to send notifications between jobs or processes in your application.

Examples

Broadcasting after a job is completed:

defmodule MyApp.Worker do
  use Oban.Worker

  @impl Oban.Worker
  def perform(job) do
    :ok = MyApp.do_work(job.args)

    Oban.Notifier.notify(Oban, :gossip, %{complete: job.id})

    :ok
  end
end

Listening for job complete events from another process:

def insert_and_listen(args) do
  :ok = Oban.Notifier.listen([:gossip])

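  # Bind the id to a plain variable: the pin operator only accepts variables,
  # so `^job.id` would not compile inside the receive pattern.
  {:ok, %{id: job_id}} =
    args
    |> MyApp.Worker.new()
    |> Oban.insert()

  receive do
    {:notification, :gossip, %{"complete" => ^job_id}} ->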
      IO.puts("Other job complete!")
  after
    30_000 ->
      IO.puts("Other job didn't finish in 30 seconds!")
  end
end

Summary

Callbacks

listen/2
Register current process to receive messages from some channels

notify/3
Broadcast a notification in a channel

start_link/1
Starts a notifier

unlisten/2
Unregister current process from channels

Functions

listen/2
Register the current process to receive relayed messages for the provided channels.

notify/3
Broadcast a notification to listeners on all nodes.

unlisten/2
Unregister the current process from receiving relayed messages on provided channels.

Types

Specs

channel() :: :gossip | :insert | :signal

Specs

option() :: {:name, module()} | {:conf, Oban.Config.t()}

Specs

server() :: GenServer.server()

Callbacks

listen(server, channels)

Specs

listen(server(), channels :: [channel()]) :: :ok

Register current process to receive messages from some channels

notify(server, channel, payload)

Specs

notify(server(), channel :: channel(), payload :: [map()]) :: :ok

Broadcast a notification in a channel

Specs

start_link([option()]) :: GenServer.on_start()

Starts a notifier

unlisten(server, channels)

Specs

unlisten(server(), channels :: [channel()]) :: :ok

Unregister current process from channels
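
The four callbacks above are enough to build a notifier that never touches Postgres. What follows is a minimal sketch, not Oban's own implementation: MyApp.LocalNotifier is a hypothetical module that relays notifications only within the current node, and it assumes the server argument passed to each callback is the notifier's pid or registered name. Because it skips the JSON round trip, it converts top-level payload keys to strings by hand so listeners see the same shape they would get from the default notifier.

```elixir
defmodule MyApp.LocalNotifier do
  # Hypothetical single-node notifier; an illustration of the behaviour only.
  @behaviour Oban.Notifier

  use GenServer

  @impl Oban.Notifier
  def start_link(opts) do
    # Only the :name option is used here; :conf is accepted and ignored.
    GenServer.start_link(__MODULE__, opts, Keyword.take(opts, [:name]))
  end

  @impl Oban.Notifier
  def listen(server, channels), do: GenServer.call(server, {:listen, self(), channels})

  @impl Oban.Notifier
  def unlisten(server, channels), do: GenServer.call(server, {:unlisten, self(), channels})

  @impl Oban.Notifier
  def notify(server, channel, payloads), do: GenServer.call(server, {:notify, channel, payloads})

  # State is a map of listener pid => MapSet of subscribed channels.
  @impl GenServer
  def init(_opts), do: {:ok, %{}}

  @impl GenServer
  def handle_call({:listen, pid, channels}, _from, listeners) do
    # Monitor new listeners so they can be dropped when they exit.
    if not Map.has_key?(listeners, pid), do: Process.monitor(pid)

    added = MapSet.new(channels)
    {:reply, :ok, Map.update(listeners, pid, added, &MapSet.union(&1, added))}
  end

  def handle_call({:unlisten, pid, channels}, _from, listeners) do
    removed = MapSet.new(channels)

    listeners =
      case listeners do
        %{^pid => subscribed} -> Map.put(listeners, pid, MapSet.difference(subscribed, removed))
        _ -> listeners
      end

    {:reply, :ok, listeners}
  end

  def handle_call({:notify, channel, payloads}, _from, listeners) do
    # Relay every payload to every process subscribed to this channel.
    for {pid, subscribed} <- listeners,
        MapSet.member?(subscribed, channel),
        payload <- payloads do
      send(pid, {:notification, channel, stringify_keys(payload)})
    end

    {:reply, :ok, listeners}
  end

  @impl GenServer
  def handle_info({:DOWN, _ref, :process, pid, _reason}, listeners) do
    {:noreply, Map.delete(listeners, pid)}
  end

  # Shallow conversion only; nested maps keep their original keys.
  defp stringify_keys(map), do: Map.new(map, fn {key, value} -> {to_string(key), value} end)
end
```

Assuming your Oban version accepts a notifier option in its configuration, a module like this would be selected with notifier: MyApp.LocalNotifier. Notifications sent through it would not reach other nodes, so it is only suitable for single-node setups or tests.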

Functions

listen(server \\ Oban, channels)

Specs

listen(server(), [channel()]) :: :ok

Register the current process to receive relayed messages for the provided channels.

All messages are received as JSON and decoded before they are relayed to registered processes, so payload keys arrive as strings rather than atoms. Each registered process receives a three-element notification tuple in the following format:

{:notification, channel :: channel(), decoded :: map()}
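
In a long-running process the tuple usually arrives in handle_info/2 rather than in a bare receive. The sketch below shows one way to consume it; MyApp.GossipListener, its :oban option, and its state shape are hypothetical names chosen for illustration, and it assumes the Oban instance is already running when the listener starts.

```elixir
defmodule MyApp.GossipListener do
  # Hypothetical listener that records the ids of completed jobs.
  use GenServer

  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @impl GenServer
  def init(opts) do
    # :oban is an illustrative option naming the Oban instance to listen on.
    oban = Keyword.get(opts, :oban, Oban)
    :ok = Oban.Notifier.listen(oban, [:gossip])

    {:ok, %{completed: []}}
  end

  @impl GenServer
  def handle_info({:notification, :gossip, %{"complete" => job_id}}, state) do
    # Payload keys are strings because the payload was decoded from JSON.
    {:noreply, %{state | completed: [job_id | state.completed]}}
  end

  def handle_info({:notification, :gossip, _payload}, state) do
    # Ignore gossip messages this process does not care about.
    {:noreply, state}
  end
end
```

The catch-all clause matters because the gossip channel is shared: any other process may publish payloads with a different shape, and an unmatched message would otherwise crash the listener.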

Example

Register to listen for all :gossip channel messages:

Oban.Notifier.listen([:gossip])

Listen for messages on all channels:

Oban.Notifier.listen([:gossip, :insert, :signal])

Listen for messages when using a custom Oban name:

Oban.Notifier.listen(MyApp.MyOban, [:gossip, :insert, :signal])

notify(conf_or_server \\ Oban, channel, payload)

Specs

notify(
  Oban.Config.t() | server(),
  channel :: channel(),
  payload :: map() | [map()]
) :: :ok

Broadcast a notification to listeners on all nodes.

Notifications are scoped to the configured prefix. For example, if there are instances running with the public and private prefixes, a notification published in the public prefix won't be picked up by processes listening with the private prefix.

Using notify/3 with a config as the first argument is soft-deprecated. Pass a server instead.

Example

Broadcast a gossip message:

Oban.Notifier.notify(:gossip, %{message: "hi!"})

unlisten(server \\ Oban, channels)

Specs

unlisten(server(), [channel()]) :: :ok

Unregister the current process from receiving relayed messages on provided channels.

Example

Stop listening for messages on the :gossip channel:

Oban.Notifier.unlisten([:gossip])

Stop listening for messages when using a custom Oban name:

Oban.Notifier.unlisten(MyApp.MyOban, [:gossip])