Oban.Notifier (Oban v2.4.0) View Source

The Notifier coordinates listening for and publishing notifications for events in predefined channels.

Every Oban supervision tree contains a notifier process, registered as Oban.Notifier, which itself maintains a single connection with an app's database. All incoming notifications are relayed through that connection to other processes.

Channels

The notifier recognizes three predefined channels, each with a distinct responsibility:

  • gossip — arbitrary communication between nodes or jobs are sent on the gossip channel
  • insert — as jobs are inserted into the database an event is published on the insert channel. Processes such as queue producers use this as a signal to dispatch new jobs.
  • signal — instructions to take action, such as scale a queue or kill a running job, are sent through the signal channel.

The insert and signal channels are primarily for internal use. Use the gossip channel to send notifications between jobs or processes in your application.

Caveats

The notifications system is built on PostgreSQL's LISTEN/NOTIFY functionality. Notifications are only delivered after a transaction completes and are de-duplicated before publishing.

Most applications run Ecto in sandbox mode while testing. Sandbox mode wraps each test in a separate transaction which is rolled back after the test completes. That means the transaction is never committed, which prevents delivering any notifications.

To test using notifications you must run Ecto without sandbox mode enabled.

Examples

Broadcasting after a job is completed:

defmodule MyApp.Worker do
  use Oban.Worker

  @impl Oban.Worker
  def perform(job) do
    :ok = MyApp.do_work(job.args)

    Oban.Notifier.notify(Oban.config(), :gossip, %{complete: job.id})

    :ok
  end
end

Listening for job complete events from another process:

def insert_and_listen(args) do
  {:ok, job} =
    args
    |> MyApp.Worker.new()
    |> Oban.insert()

  receive do
    {:notification, :gossip, %{"complete" => ^job.id}} ->
      IO.puts("Other job complete!")
  after
    30_000 ->
      IO.puts("Other job didn't finish in 30 seconds!")
  end
end

Link to this section Summary

Functions

Returns a specification to start this module under a supervisor.

Register the current process to receive relayed messages for the provided channels.

Broadcast a notification to listeners on all nodes.

Link to this section Types

Specs

channel() :: :gossip | :insert | :signal

Specs

option() :: {:name, module()} | {:conf, Oban.Config.t()}

Link to this section Functions

Returns a specification to start this module under a supervisor.

See Supervisor.

Link to this function

listen(server \\ Oban, channels)

View Source

Specs

listen(GenServer.server(), channels :: [channel()]) :: :ok

Register the current process to receive relayed messages for the provided channels.

All messages are received as JSON and decoded before they are relayed to registered processes. Each registered process receives a three element notification tuple in the following format:

{:notification, channel :: channel(), decoded :: map()}

Example

Register to listen for all :gossip channel messages:

Oban.Notifier.listen([:gossip])

Listen for messages on all channels:

Oban.Notifier.listen([:gossip, :insert, :signal])

Listen for messages when using a custom Oban name:

Oban.Notifier.listen(MyApp.Oban, [:gossip, :insert, :signal])
Link to this function

notify(conf, channel, payload)

View Source

Specs

notify(Oban.Config.t(), channel :: channel(), payload :: map()) :: :ok

Broadcast a notification to listeners on all nodes.

Notifications are scoped to the configured prefix. For example, if there are instances running with the public and private prefixes, a notification published in the public prefix won't be picked up by processes listening with the private prefix.

Example

Broadcast a gossip message:

Oban.Notifier.notify(Oban.config(), :gossip, %{message: "hi!"})