Getting started
First you need to implement a topic definition. For example, a todo list topic might track todo items and their order, handle requests from clients to add new items or update the text of existing items, and handle Erlang messages from a separate process indicating when items are done:
defmodule MyApp.Topics.List do
use Topical.Topic, route: ["lists", :list_id]
# Initialise the topic
def init(params) do
list_id = Map.fetch!(params, :list_id)
value = %{items: %{}, order: []} # exposed 'value' of the topic
state = %{list_id: list_id, last_item_id: 0} # hidden server state
topic = Topic.new(value, state)
{:ok, topic}
end
# Optionally, handle execution of an action
def handle_execute("add_item", {text}, topic, _context) do
{id, topic} = generate_item_id(topic)
# Update the topic by putting the item in 'items', and appending the id to 'order'
topic =
topic
|> Topic.set([:items, id], %{text: text, done: false})
|> Topic.insert([:order], id)
# Return the result (the 'id'), and the updated topic
{:ok, id, topic}
end
defp generate_item_id(topic) do
id = topic.state.last_item_id + 1
topic = put_in(topic[:state][:last_item_id], id)
{Integer.to_string(id), topic}
end
# Optionally, handle a notification (an action without a result)
def handle_notify("update_text", {id, text}, topic) do
topic = Topic.set(topic, [:items, id, :text], text)
{:ok, topic}
end
# Optionally, handle Erlang messages
def handle_info({:done, id}, topic) do
topic = Topic.set(topic, [:items, id, :done], true)
{:ok, topic}
end
end
authorization
Authorization
You can control access to topics by implementing the connect/2 callback. This is called
before a topic is accessed (via subscribe, execute, notify, or capture). The callback receives
the params (a map containing route params and any declared params) and the context
(established during WebSocket connection):
defmodule MyApp.Topics.PrivateList do
use Topical.Topic, route: ["lists", :list_id]
def connect(params, context) do
list_id = Map.fetch!(params, :list_id)
if can_access_list?(context.user_id, list_id) do
{:ok, params}
else
{:error, :forbidden}
end
end
# ... other callbacks
endReturn {:ok, params} to allow access, or {:error, reason} to deny. The default
implementation allows all access.
The connect/2 callback can also modify params, which is useful for incorporating
context values (like user_id from authentication) into topic params:
def connect(params, context) do
{:ok, Map.put(params, :user_id, context.user_id)}
endModified params affect topic identity - topics with different param values are separate instances. This enables per-user or per-tenant topics.
parameters
Parameters
Topics can declare optional parameters with default values. Different parameter values create separate topic instances:
defmodule MyApp.Topics.Leaderboard do
use Topical.Topic, route: ["leaderboards", :game_id], params: [region: "global"]
def init(params) do
game_id = Map.fetch!(params, :game_id)
region = Map.fetch!(params, :region)
{:ok, Topic.new(%{game_id: game_id, region: region, entries: []})}
end
endClients subscribe with params to access specific instances:
# Uses default region "global"
Topical.subscribe(MyApp.Topical, ["leaderboards", "chess"], self())
# Subscribes to EU region (separate instance)
Topical.subscribe(MyApp.Topical, ["leaderboards", "chess"], self(), nil, %{"region" => "eu"})Parameters are also available in connect/2 for access control based on param values.
supervision
Supervision
Then add a Topical registry to your application supervision tree, referencing the topic:
defmodule MyApp.Application do
use Application
@impl true
def start(_type, _args) do
children = [
# ...
{Topical, name: MyApp.Topical, topics: [MyApp.Topics.List, ...]},
]
Supervisor.start_link(children, ...)
end
endAt this point you should be able to subscribe to the topic:
{:ok, ref} = Topical.subscribe(MyApp.Topical, "lists/foo", self())After subscribing, the process will be sent an initial {:reset, ref, value} message (where ref
is the subscription reference returned from subscribe), and then subsequent
{:updates, ref, updates} messages, where updates is a list of updates, each taking the form:
{:set, path, value}: thevaluehas been set at thepath.{:unset, path, key}: thekeyhas been unset from the object at thepath.{:insert, path, index, values}: thevalueshave been inserted into the array at thepath.{:delete, path, index, count}:countvalues have been deleted from the array at thepath, from the positionindex.{:merge, path, value}: thevaluehas been (shallow) merged at thepath.
(You can receive any waiting messages on an IEx shell with:
receive do x -> x after 0 -> nil end.)
To execute an action:
{:ok, item_id} = Topical.execute(MyApp.Topical, "lists/foo", "add_item", {"Test item", false})To unsubscribe:
Topical.unsubscribe(MyApp.Topical, "lists/foo", ref)However, rather than using the API from Elixir, you may wish to set up a WebSocket adapter, and then use the JavaScript client.