Oban.Notifier behaviour (Oban v2.15.3)

The Notifier coordinates listening for and publishing notifications for events in predefined channels.

Every Oban supervision tree contains a notifier process, registered as Oban.Notifier, which must be an implementation of the Oban.Notifier behaviour. The default implementation uses the LISTEN/NOTIFY operations built into Postgres.

All incoming notifications are relayed through the notifier to other processes.

Channels

Internally, Oban uses a variety of predefined channels with distinct responsibilities:

  • insert — as jobs are inserted into the database an event is published on the insert channel. Processes such as queue producers use this as a signal to dispatch new jobs.

  • leader — messages regarding node leadership exchanged between peers

  • signal — instructions to take action, such as scaling a queue or killing a running job, are sent through the signal channel

  • gossip — arbitrary communication for coordination between nodes

  • stager — messages regarding job staging, e.g. notifying queues that jobs are ready for execution

Examples

Broadcasting after a job is completed:

defmodule MyApp.Worker do
  use Oban.Worker

  @impl Oban.Worker
  def perform(job) do
    :ok = MyApp.do_work(job.args)

    Oban.Notifier.notify(Oban, :my_app_jobs, %{complete: job.id})

    :ok
  end
end

Listening for job complete events from another process:

def insert_and_listen(args) do
  :ok = Oban.Notifier.listen([:my_app_jobs])

  {:ok, %{id: job_id} = job} =
    args
    |> MyApp.Worker.new()
    |> Oban.insert()

  receive do
    {:notification, :my_app_jobs, %{"complete" => ^job_id}} ->
      IO.puts("Other job complete!")
  after
    30_000 ->
      IO.puts("Other job didn't finish in 30 seconds!")
  end
end
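
For a long-lived listener, the same notification tuple can be handled in a GenServer's handle_info/2 rather than in a one-off receive. The following is a minimal sketch and not part of Oban: MyApp.CompletionTracker, its :notifier option, and completed/1 are hypothetical names, and the :notifier option exists only so the module can be exercised without a running Oban instance.

```elixir
defmodule MyApp.CompletionTracker do
  use GenServer

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, opts)

  # Returns the ids of all jobs reported complete so far, sorted ascending.
  def completed(pid), do: GenServer.call(pid, :completed)

  @impl GenServer
  def init(opts) do
    # Assumption: any module exposing listen/1 can stand in for Oban.Notifier.
    notifier = Keyword.get(opts, :notifier, Oban.Notifier)

    # The GenServer process itself is registered as the listener.
    :ok = notifier.listen([:my_app_jobs])

    {:ok, MapSet.new()}
  end

  @impl GenServer
  def handle_info({:notification, :my_app_jobs, %{"complete" => job_id}}, seen) do
    {:noreply, MapSet.put(seen, job_id)}
  end

  # Ignore anything else that lands in the mailbox.
  def handle_info(_message, seen), do: {:noreply, seen}

  @impl GenServer
  def handle_call(:completed, _from, seen) do
    {:reply, seen |> MapSet.to_list() |> Enum.sort(), seen}
  end
end
```

Under a supervisor, a process like this would presumably be started after Oban so that the notifier is available when init/1 runs.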

Summary

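Callbacks

listen(server, channels)
Register the current process to receive messages from one or more channels.

notify(server, channel, payload)
Broadcast a notification to all subscribers of a channel.

start_link([option()])
Starts a notifier instance.

unlisten(server, channels)
Unregister the current process from channels.

Functions

listen(name \\ Oban, channels)
Register the current process to receive relayed messages for the provided channels.

notify(conf_or_name \\ Oban, channel, payload)
Broadcast a notification to listeners on all nodes.

unlisten(name \\ Oban, channels)
Unregister the current process from receiving relayed messages on provided channels.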

Types

@type channel() :: atom()
@type option() :: {:name, module()} | {:conf, Oban.Config.t()}
@type server() :: GenServer.server()

Callbacks

listen(server, channels)

@callback listen(server(), channels :: [channel()]) :: :ok

Register the current process to receive messages from one or more channels.

notify(server, channel, payload)

@callback notify(server(), channel :: channel(), payload :: [map()]) :: :ok

Broadcast a notification to all subscribers of a channel.

@callback start_link([option()]) :: GenServer.on_start()

Starts a notifier instance.

unlisten(server, channels)

@callback unlisten(server(), channels :: [channel()]) :: :ok

Unregister the current process from channels.

Functions

listen(name \\ Oban, channels)

@spec listen(server(), channel() | [channel()]) :: :ok

Register the current process to receive relayed messages for the provided channels.

All messages are received as JSON and decoded before they are relayed to registered processes. Each registered process receives a three-element notification tuple in the following format:

{:notification, channel :: channel(), decoded :: map()}
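
Because payloads travel as JSON, the map a listener receives does not have the same shape as the map passed to notify/3: atom keys arrive as strings, as do atom values other than true, false, and nil. The sketch below only mimics that effect for flat maps so the difference is easy to see; RelaySketch and as_received/2 are hypothetical names, and no real JSON encoding or Oban call is involved.

```elixir
defmodule RelaySketch do
  # Hypothetical helper: builds the tuple a listener would be expected to
  # receive for a flat payload after a JSON round trip.
  def as_received(channel, payload) when is_atom(channel) and is_map(payload) do
    decoded = Map.new(payload, fn {key, value} -> {to_string(key), jsonish(value)} end)

    {:notification, channel, decoded}
  end

  # JSON has literals for true, false, and null, so these survive unchanged.
  defp jsonish(value) when is_boolean(value) or is_nil(value), do: value

  # Any other atom is encoded as a string and stays a string after decoding.
  defp jsonish(value) when is_atom(value), do: Atom.to_string(value)

  # Numbers and strings pass through as they are.
  defp jsonish(value), do: value
end

RelaySketch.as_received(:my_app_jobs, %{complete: 7})
# => {:notification, :my_app_jobs, %{"complete" => 7}}
```

This is why the earlier listening example matches on "complete" as a string key even though the worker broadcasts an atom key.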

Example

Register to listen for all :gossip channel messages:

Oban.Notifier.listen(:gossip)

Listen for messages on multiple channels:

Oban.Notifier.listen([:gossip, :insert, :leader, :signal, :stager])

Listen for messages when using a custom Oban name:

Oban.Notifier.listen(MyApp.MyOban, [:gossip, :signal])

notify(conf_or_name \\ Oban, channel, payload)

@spec notify(
  Oban.Config.t() | server(),
  channel :: channel(),
  payload :: map() | [map()]
) :: :ok

Broadcast a notification to listeners on all nodes.

Notifications are scoped to the configured prefix. For example, if there are instances running with the public and private prefixes, a notification published in the public prefix won't be picked up by processes listening with the private prefix.

Using notify/3 with a config is soft-deprecated. Use a server as the first argument instead.

Example

Broadcast a gossip message:

Oban.Notifier.notify(:gossip, %{message: "hi!"})

unlisten(name \\ Oban, channels)

@spec unlisten(server(), [channel()]) :: :ok

Unregister the current process from receiving relayed messages on provided channels.

Example

Stop listening for messages on the :gossip channel:

Oban.Notifier.unlisten([:gossip])

Stop listening for messages when using a custom Oban name:

Oban.Notifier.unlisten(MyApp.MyOban, [:gossip])
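
A process that only needs a single notification can pair listen and unlisten so that it stops receiving messages once it is done. The following is a rough sketch rather than an Oban API: MyApp.Await and await_notification/3 are hypothetical names, and the notifier argument is an assumption added so the helper can be tried with a stand-in module instead of a running Oban instance.

```elixir
defmodule MyApp.Await do
  # Hypothetical helper: waits for one notification on `channel`, then
  # unregisters the calling process whether or not a message arrived.
  def await_notification(channel, timeout, notifier \\ Oban.Notifier) do
    :ok = notifier.listen([channel])

    try do
      receive do
        {:notification, ^channel, payload} -> {:ok, payload}
      after
        timeout -> {:error, :timeout}
      end
    after
      # Runs on success, timeout, or exception, so the registration is
      # never left behind.
      notifier.unlisten([channel])
    end
  end
end
```

With the default notifier, a call such as MyApp.Await.await_notification(:gossip, 5_000) would wait up to five seconds for a gossip message on the default Oban instance.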