Oban.Notifier behaviour (Oban v2.17.10)

The Notifier coordinates listening for and publishing notifications for events in predefined channels.

Oban functions such as pause_queue, scale_queue, and cancel_job all require a connected notifier to operate. Use status/1 to check the notifier's connectivity status and diagnose issues.

Notifiers

Every Oban supervision tree contains a notifier process, registered as Oban.Notifier, which is an implementation of the Oban.Notifier behaviour.

  • Oban.Notifiers.Postgres — A Postgres notifier that uses LISTEN/NOTIFY to broadcast messages. This is the default.

  • Oban.Notifiers.PG — A process groups notifier that relies on Distributed Erlang to broadcast messages.

  • Oban.Notifiers.Phoenix — A notifier that uses Phoenix.PubSub to broadcast messages. In addition to centralizing PubSub communications, it opens up the possible transports to all PubSub adapters.
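
The notifier is selected when an Oban instance is configured. As a minimal sketch, assuming an application named :my_app and an Ecto repo named MyApp.Repo (both placeholder names, not part of Oban), switching from the default Postgres notifier to the PG notifier might look like this:

```elixir
# config/config.exs
import Config

# :my_app and MyApp.Repo are hypothetical names used for illustration.
config :my_app, Oban,
  repo: MyApp.Repo,
  # Leave :notifier out to keep the default, Oban.Notifiers.Postgres.
  notifier: Oban.Notifiers.PG
```

The PG notifier only reaches other nodes when they are connected through Distributed Erlang, so this choice depends on how the application is deployed.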

Channels

All incoming notifications are relayed through the notifier to any processes listening on a given channel. Internally, Oban uses a variety of predefined channels with distinct responsibilities:

  • insert — as jobs are inserted an event is published on the insert channel. Processes such as queue producers use this as a signal to dispatch new jobs.

  • leader — messages regarding node leadership exchanged between peers

  • signal — instructions to take action, such as scale a queue or kill a running job, are sent through the signal channel

  • sonar — periodic notification checks to monitor pubsub health and determine connectivity

Examples

Broadcasting after a job is completed:

defmodule MyApp.Worker do
  use Oban.Worker

  @impl Oban.Worker
  def perform(job) do
    :ok = MyApp.do_work(job.args)

    Oban.Notifier.notify(Oban, :my_app_jobs, %{complete: job.id})

    :ok
  end
end

Listening for job complete events from another process:

def insert_and_listen(args) do
  :ok = Oban.Notifier.listen([:my_app_jobs])

  {:ok, %{id: job_id} = job} =
    args
    |> MyApp.Worker.new()
    |> Oban.insert()

  receive do
    {:notification, :my_app_jobs, %{"complete" => ^job_id}} ->
      IO.puts("Other job complete!")
  after
    30_000 ->
      IO.puts("Other job didn't finish in 30 seconds!")
  end
end
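
The receive block above suits a one-off wait. For a long-lived subscriber, one possible approach is a GenServer that registers itself in init/1 and handles the notification tuple in handle_info/2. The module name MyApp.JobListener, the :oban option, and the shape of the state are assumptions made for this sketch:

```elixir
defmodule MyApp.JobListener do
  @moduledoc """
  Hypothetical long-lived listener for the :my_app_jobs channel.
  """
  use GenServer

  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @impl GenServer
  def init(opts) do
    # Assumes the Oban instance named by :oban (default Oban) is already running.
    oban = Keyword.get(opts, :oban, Oban)
    :ok = Oban.Notifier.listen(oban, [:my_app_jobs])

    {:ok, %{completed: []}}
  end

  @impl GenServer
  def handle_info({:notification, :my_app_jobs, %{"complete" => job_id}}, state) do
    # Payload keys are strings because messages are decoded from JSON.
    {:noreply, %{state | completed: [job_id | state.completed]}}
  end

  def handle_info(_message, state) do
    # Ignore anything else so unexpected messages don't crash the listener.
    {:noreply, state}
  end
end
```

A process like this would typically be started after Oban in the supervision tree, so that the notifier exists by the time init/1 runs.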

Types

@type channel() :: atom()
@type name_or_conf() :: Oban.name() | Oban.Config.t()
@type payload() :: map() | [map()]
@type pubsub_status() :: :unknown | :isolated | :solitary | :clustered

Callbacks

listen(name_or_conf, channels)

@callback listen(name_or_conf(), channels :: channel() | [channel()]) :: :ok

Register the current process to receive messages from one or more channels.

notify(name_or_conf, channel, payload)

@callback notify(name_or_conf(), channel(), payload()) :: :ok | {:error, Exception.t()}

Broadcast a notification to all subscribers of a channel.

start_link(opts)

@callback start_link(opts :: [conf: Oban.Config.t(), name: GenServer.name()]) ::
  GenServer.on_start()

Starts a notifier instance.

unlisten(name_or_conf, channels)

@callback unlisten(name_or_conf(), channels :: channel() | [channel()]) :: :ok

Unregister current process from channels.

Functions

listen(name_or_conf \\ Oban, channels)

@spec listen(name_or_conf(), channel() | [channel()]) :: :ok

Register the current process to receive relayed messages for the provided channels.

All messages are received as JSON and decoded before they are relayed to registered processes, so map keys arrive as strings even when the payload was published with atom keys. Each registered process receives a three-element notification tuple in the following format:

{:notification, channel :: channel(), decoded :: map()}
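
Since the tuple is delivered as an ordinary process message, it can be matched with a selective receive. The following is a minimal sketch of a helper that waits for a payload containing a given string key and value. The module MyApp.NotificationHelpers and its await/4 function are hypothetical, and the message is simulated with send/2 so the snippet runs without a notifier:

```elixir
defmodule MyApp.NotificationHelpers do
  @doc """
  Waits for a notification on `channel` whose payload has `key` set to `value`.

  Keys are expected to be strings, since payloads are decoded from JSON.
  """
  def await(channel, key, value, timeout \\ 5_000) when is_binary(key) do
    receive do
      {:notification, ^channel, %{^key => ^value} = payload} -> {:ok, payload}
    after
      timeout -> {:error, :timeout}
    end
  end
end

# Simulate a relayed message. In practice it arrives from the notifier
# after the current process has called Oban.Notifier.listen/2.
send(self(), {:notification, :my_app_jobs, %{"complete" => 42}})

MyApp.NotificationHelpers.await(:my_app_jobs, "complete", 42, 100)
# => {:ok, %{"complete" => 42}}
```

Messages that don't match stay in the mailbox, so a helper like this is best kept to short-lived processes.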

Example

Register to listen for all :gossip channel messages:

Oban.Notifier.listen(:gossip)

Listen for messages on multiple channels:

Oban.Notifier.listen([:gossip, :insert, :leader, :signal])

Listen for messages when using a custom Oban name:

Oban.Notifier.listen(MyApp.MyOban, [:gossip, :signal])

notify(name_or_conf \\ Oban, channel, payload)

@spec notify(name_or_conf(), channel(), payload()) :: :ok | {:error, Exception.t()}

Broadcast a notification to listeners on all nodes.

Notifications are scoped to the configured prefix. For example, if there are instances running with the public and private prefixes, a notification published in the public prefix won't be picked up by processes listening with the private prefix.

Example

Broadcast a message on the :my_channel channel:

Oban.Notifier.notify(:my_channel, %{message: "hi!"})

Broadcast multiple messages at once:

Oban.Notifier.notify(:my_channel, [%{message: "hi!"}, %{message: "there"}])

Broadcast using a custom instance name:

Oban.Notifier.notify(MyOban, :my_channel, %{message: "hi!"})

status(name_or_conf \\ Oban)

@spec status(name_or_conf()) :: pubsub_status()

Check a notifier's connectivity level to see whether it's able to publish or receive messages from other nodes.

Oban functions such as pause_queue, scale_queue, and cancel_job all require a connected notifier to operate. Each Oban instance runs a persistent process to monitor connectivity, which is exposed by this function.

Statuses

  • :unknown — This is the default state on start before the notifier has time to determine the appropriate status.

  • :isolated — The notifier isn't receiving any messages.

    The notifier may be connected to a database yet still be :isolated and unable to receive outside messages. Typically, this is the case for the default Postgres notifier while testing or behind a connection pooler.

  • :solitary — The notifier is only receiving messages from itself. This may be the case for the PG notifier when Distributed Erlang nodes aren't connected, in development, or in production deployments that only run a single node. If you're running multiple nodes in production and the status is :solitary, there's a connectivity issue.

  • :clustered — The notifier is connected and able to receive messages from other nodes. The Postgres notifier is considered clustered if it can receive notifications, while the PG notifier requires a functional Distributed Erlang cluster.

Examples

Check the notifier's pubsub status:

Oban.Notifier.status()

Check the status for a custom instance:

Oban.Notifier.status(MyOban)
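
The returned atom can drive a health check. The sketch below assumes a multi-node production deployment, where only :clustered counts as healthy and :unknown is treated as "not determined yet". The module MyApp.NotifierHealth and its return values are hypothetical choices for illustration, not part of Oban:

```elixir
defmodule MyApp.NotifierHealth do
  @moduledoc """
  Hypothetical health check built on Oban.Notifier.status/1.
  Assumes several nodes are expected to be connected.
  """

  # Able to receive messages from other nodes.
  def check(:clustered), do: :ok
  # Default state on start, before a status has been determined.
  def check(:unknown), do: :pending
  # Only receiving messages from itself, which signals a connectivity
  # issue when multiple nodes are running.
  def check(:solitary), do: {:error, :solitary}
  # Not receiving any messages.
  def check(:isolated), do: {:error, :isolated}

  # Requires a running Oban instance registered under oban_name.
  def check_instance(oban_name \\ Oban) do
    oban_name |> Oban.Notifier.status() |> check()
  end
end
```

On a single-node deployment :solitary is expected, so the corresponding clause would need to return :ok instead.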

unlisten(name_or_conf \\ Oban, channels)

@spec unlisten(name_or_conf(), channel() | [channel()]) :: :ok

Unregister the current process from receiving relayed messages on provided channels.

Example

Stop listening for messages on the :gossip channel:

Oban.Notifier.unlisten(:gossip)

Stop listening for messages on multiple channels:

Oban.Notifier.unlisten([:insert, :gossip])

Stop listening for messages when using a custom Oban name:

Oban.Notifier.unlisten(MyApp.MyOban, [:gossip])
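
When a process only needs notifications for the duration of one operation, listen/2 and unlisten/2 can be paired so the registration is always removed. This is a minimal sketch under that assumption. MyApp.ScopedListener and with_channels/3 are hypothetical names:

```elixir
defmodule MyApp.ScopedListener do
  @doc """
  Listens on `channels`, runs `fun`, then unlistens, even if `fun` raises.
  """
  def with_channels(oban_name \\ Oban, channels, fun) when is_function(fun, 0) do
    :ok = Oban.Notifier.listen(oban_name, channels)

    try do
      fun.()
    after
      # Runs on success and on failure, so the process doesn't stay registered.
      Oban.Notifier.unlisten(oban_name, channels)
    end
  end
end
```

The function passed in would contain the receive block that waits for the expected notification, and its return value is passed through to the caller.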