Phoenix.PubSub

Front-end to Phoenix pubsub layer.

Used internally by Channels for pubsub broadcast but also provides an API for direct usage.

Adapters

Phoenix pubsub was designed to be flexible and support multiple backends. We currently ship with two backends:

Pubsub adapters are often configured in your endpoint:

config :my_app, MyApp.Endpoint,
  pubsub: [adapter: Phoenix.PubSub.PG2]

The configuration above takes care of starting the pubsub backend and exposing its functions via the endpoint module.

Direct usage

It is also possible to use Phoenix.PubSub directly or even run your own pubsub backends outside of an Endpoint.

The first step is to start the adapter of choice in your supervision tree:

supervisor(Phoenix.PubSub.Redis, [:my_redis_pubsub, host: "192.168.100.1"])

The configuration above will start a Redis pubsub and register it with name :my_redis_pubsub.

You can know use the functions in this module to subscribe and broadcast messages:

iex> PubSub.subscribe MyApp.PubSub, self, "user:123"
:ok
iex> Process.info(self)[:messages]
[]
iex> PubSub.broadcast MyApp.PubSub, "user:123", {:user_update, %{id: 123, name: "Shane"}}
:ok
iex> Process.info(self)[:messages]
{:user_update, %{id: 123, name: "Shane"}}

Implementing your own adapter

PubSub adapters run isnide their own supervision tree. If you are interested in providing your own adapter, let’s call it Phoenix.PubSub.MyQueue, the first step is to provide a supervisor module that receives the server name and a bunch of options on start_link/2:

defmodule Phoenix.PubSub.MyQueue do
  def start_link(name, options) do
    Supervisor.start_link(__MODULE__, {name, options},
                          name: Module.concat(name, Supervisor))
  end

  def init({name, options}) do
    ...
  end
end

On init/1, you will define the supervision tree and use the given name to register the main pubsub process locally. This process must be able to handle the following GenServer calls:

Offloading work to clients via MFA response

The Phoenix.PubSub API allows any of its functions to handle a response from the adapter matching {:perform, {m, f, a}}. The PubSub client will recursively invoke all MFA responses until a result is returned. This is useful for offloading work to clients without blocking your PubSub adapter. See Phoenix.PubSub.PG2 implementation for examples.

Source

Summary

broadcast!(server, topic, message)

Broadcasts message on given topic

broadcast(server, topic, message)

Broadcasts message on given topic

broadcast_from!(server, from_pid, topic, message)

Broadcasts message to all but from_pid on given topic

broadcast_from(server, from_pid, topic, message)

Broadcasts message to all but from_pid on given topic

subscribe(server, pid, topic, opts \\ [])

Subscribes the pid to the PubSub adapter’s topic

unsubscribe(server, pid, topic)

Unsubscribes the pid from the PubSub adapter’s topic

Functions

broadcast(server, topic, message)

Specs:

  • broadcast(atom, binary, term) :: :ok | {:error, term}

Broadcasts message on given topic.

Source
broadcast!(server, topic, message)

Specs:

  • broadcast!(atom, binary, term) :: :ok | no_return

Broadcasts message on given topic.

Raises Phoenix.PubSub.BroadcastError if broadcast fails.

Source
broadcast_from(server, from_pid, topic, message)

Specs:

  • broadcast_from(atom, pid, binary, term) :: :ok | no_return
  • broadcast_from(atom, pid, binary, term) :: :ok | {:error, term}

Broadcasts message to all but from_pid on given topic.

Source
broadcast_from!(server, from_pid, topic, message)

Broadcasts message to all but from_pid on given topic.

Raises Phoenix.PubSub.BroadcastError if broadcast fails.

Source
subscribe(server, pid, topic, opts \\ [])

Specs:

  • subscribe(atom, pid, binary, Keyword.t) :: :ok | {:error, term}

Subscribes the pid to the PubSub adapter’s topic.

  • server - The Pid registered name of the server
  • pid - The subscriber pid to receive pubsub messages
  • topic - The topic to subscribe to, ie: "users:123"
  • opts - The optional list of options. Supported options only include :link to link the subscriber to the pubsub adapter
Source
unsubscribe(server, pid, topic)

Specs:

  • unsubscribe(atom, pid, binary) :: :ok | {:error, term}

Unsubscribes the pid from the PubSub adapter’s topic.

Source