phoenix_pubsub v1.0.2 Phoenix.Tracker behaviour

Provides distributed Presence tracking to processes.

Tracker servers use a heartbeat protocol and CRDT to replicate presence information across a cluster in an eventually consistent, conflict-free manner. Under this design, there is no single source of truth or global process. Instead, each node runs one or more Phoenix.Tracker servers and node-local changes are replicated across the cluster and handled locally as a diff of changes.

  • tracker - The name of the tracker handler module implementing the Phoenix.Tracker behaviour
  • tracker_opts - The list of options to pass to the tracker handler
  • server_opts - The list of options to pass to the tracker server

Required server_opts:

  • :name - The name of the server, such as: MyApp.Tracker
  • :pubsub_server - The name of the PubSub server, such as: MyApp.PubSub

Optional server_opts:

  • broadcast_period - The interval in milliseconds to send delta broadcats across the cluster. Default 1500
  • max_silent_periods - The max integer of broadcast periods for which no delta broadcasts have been sent. Defaults 10 (15s heartbeat)
  • down_period - The interval in milliseconds to flag a replica as down temporarily down. Default broadcast_period * max_silent_periods * 2 (30s down detection). Note: This must be at least 2x the broadcast_period.
  • permdown_period - The interval in milliseconds to flag a replica as permanently down, and discard its state. Note: This must be at least greater than the down_period. Default 1_200_000 (20 minutes)
  • clock_sample_periods - The numbers of heartbeat windows to sample remote clocks before collapsing and requesting transfer. Default 2
  • max_delta_sizes - The list of delta generation sizes to keep before falling back to sending entire state. Defaults [100, 1000, 10_000].
  • log_level - The log level to log events, defaults :debug and can be disabled with false

Implementing a Tracker

To start a tracker, first add the tracker to your supervision tree:

worker(MyTracker, [[name: MyTracker, pubsub_server: MyPubSub]])

Next, implement MyTracker with support for the Phoenix.Tracker behaviour callbacks. An example of a minimal tracker could include:

defmodule MyTracker do
  @behaviour Phoenix.Tracker

  def start_link(opts) do
    opts = Keyword.merge([name: __MODULE__], opts)
    GenServer.start_link(Phoenix.Tracker, [__MODULE__, opts, opts], name: __MODULE__)
  end

  def init(opts) do
    server = Keyword.fetch!(opts, :pubsub_server)
    {:ok, %{pubsub_server: server, node_name: Phoenix.PubSub.node_name(server)}}
  end

  def handle_diff(diff, state) do
    for {topic, {joins, leaves}} <- diff do
      for {key, meta} <- joins do
        IO.puts "presence join: key \"#{key}\" with meta #{inspect meta}"
        msg = {:join, key, meta}
        Phoenix.PubSub.direct_broadcast!(state.node_name, state.pubsub_server, topic, msg)
      end
      for {key, meta} <- leaves do
        IO.puts "presence leave: key \"#{key}\" with meta #{inspect meta}"
        msg = {:leave, key, meta}
        Phoenix.PubSub.direct_broadcast!(state.node_name, state.pubsub_server, topic, msg)
      end
    end
    {:ok, state}
  end
end

Trackers must implement start_link/1, init/1, and handle_diff/2. The init/1 callback allows the tracker to manage its own state when running within the Phoenix.Tracker server. The handle_diff callback is invoked with a diff of presence join and leave events, grouped by topic. As replicas heartbeat and replicate data, the local tracker state is merged with the remote data, and the diff is sent to the callback. The handler can use this information to notify subscribers of events, as done above.

Special Considerations

Operations within handle_diff/2 happen in the tracker server’s context. Therefore, blocking operations should be avoided when possible, and offloaded to a supervised task when required. Also, a crash in the handle_diff/2 will crash the tracker server, so operations that may crash the server should be offloaded with a Task.Supervisor spawned process.

Summary

Types

presence :: {key :: String.t, meta :: Map.t}

Functions

graceful_permdown(server_name)

Specs

graceful_permdown(atom) :: :ok

Gracefully shuts down by broadcasting permdown to all replicas.

Examples

iex> Phoenix.Tracker.graceful_permdown(MyTracker)
:ok
list(server_name, topic)

Specs

list(atom, topic) :: [presence]

Lists all presences tracked under a given topic.

  • server_name - The registered name of the tracker server
  • topic - The Phoenix.PubSub topic to update for this presence

Returns a lists of presences in key/metadata tuple pairs.

Examples

iex> Phoenix.Tracker.list(MyTracker, "lobby")
[{123, %{name: "user 123"}}, {456, %{name: "user 456"}}]
start_link(tracker, tracker_opts, server_opts)
track(server_name, pid, topic, key, meta)

Specs

track(atom, pid, topic, term, Map.t) ::
  {:ok, ref :: binary} |
  {:error, reason :: term}

Tracks a presence.

  • server_name - The registered name of the tracker server
  • pid - The Pid to track
  • topic - The Phoenix.PubSub topic for this presence
  • key - The key identifying this presence
  • meta - The map of metadata to attach to this presence

A process may be tracked multipled times, provided the topic and key pair are unique for any prior calls for the given process.

Examples

iex> Phoenix.Tracker.track(MyTracker, self(), "lobby", u.id, %{stat: "away"})
{:ok, "1WpAofWYIAA="}

iex> Phoenix.Tracker.track(MyTracker, self(), "lobby", u.id, %{stat: "away"})
{:error, {:already_tracked, #PID<0.56.0>, "lobby", "123"}}
untrack(server_name, pid)
untrack(server_name, pid, topic, key)

Specs

untrack(atom, pid, topic, term) :: :ok

Untracks a presence.

  • server_name - The registered name of the tracker server
  • pid - The Pid to untrack
  • topic - The Phoenix.PubSub topic to untrack for this presence
  • key - The key identifying this presence

All presences for a given Pid can be untracked by calling the Phoenix.Tracker.untrack/2 signature of this function.

Examples

iex> Phoenix.Tracker.untrack(MyTracker, self(), "lobby", u.id)
:ok
iex> Phoenix.Tracker.untrack(MyTracker, self())
:ok
update(server_name, pid, topic, key, meta)

Specs

update(atom, pid, topic, term, Map.t | (Map.t -> Map.t)) ::
  {:ok, ref :: binary} |
  {:error, reason :: term}

Updates a presence’s metadata.

  • server_name - The registered name of the tracker server
  • pid - The Pid being tracked
  • topic - The Phoenix.PubSub topic to update for this presence
  • key - The key identifying this presence

Examples

iex> Phoenix.Tracker.update(MyTracker, self(), "lobby", u.id, %{stat: "zzz"})
{:ok, "1WpAofWYIAA="}

iex> Phoenix.Tracker.update(MyTracker, self(), "lobby", u.id, fn meta -> Map.put(meta, :away, true) end)
{:ok, "1WpAofWYIAA="}
validate_down_period(d_period, b_period)
validate_permdown_period(p_period, d_period)

Callbacks

handle_diff(%{}, state)

Specs

handle_diff(%{optional(topic) => {joins :: [presence], leaves :: [presence]}}, state :: term) :: {:ok, state :: term}
init(arg0)

Specs

init(Keyword.t) ::
  {:ok, state :: term} |
  {:error, reason :: term}