OctaStar. Utility. StreamRegistry
(octa_star v0.1.0)
Copy Markdown
Opt-in per-tab stream deduplication.
Tracks active SSE stream processes by a compound key (typically
{user_id, tab_id}). When a new stream registers with an existing
key, the previous process is killed immediately — no waiting for
keepalive timeouts.
Problem
With full-page navigation, SSE stream processes don't learn the client disconnected until they try to write — which only happens on the next PubSub broadcast or keepalive tick. This creates zombie processes that hold subscriptions, do wasted DB queries, and on HTTP/1.1 can exhaust the browser's 6-connection-per-origin limit.
Setup
Add to your application's supervision tree (OctaStar does not start this registry automatically):
# lib/my_app/application.ex
children = [
OctaStar.Utility.StreamRegistry,
# ...
]Then add a tabId signal to your root layout:
<div data-signals={~s({"tabId": "${crypto.randomUUID()}"})}
data-signals-on-load={~s(sessionStorage.setItem('tabId', $tabId))}
data-signals-on-load={~s($tabId = sessionStorage.getItem('tabId') || $tabId)}>sessionStorage is per-tab — each tab gets its own UUID that
persists across full-page navigations but is unique per tab.
Important: Do not use a
_prefix for the signal name. Datastar treats_-prefixed signals as local (client-only) and never sends them to the server.
Usage
In your stream controllers, replace OctaStar.start(conn) with
OctaStar.start_stream/2 (or call this module directly):
def stream(conn, _params) do
scope = conn.assigns.current_scope
conn = OctaStar.start_stream(conn, scope.user.id)
loop(conn, state)
endIf no tabId signal is present in the request, falls back to
OctaStar.start/1 without deduplication — so existing streams
keep working while you roll out the client-side signal.
Adapted from dstar (MIT).
Summary
Functions
Replaces any previous process registered under key and registers
the current process.
Starts an SSE stream with per-tab deduplication.
Functions
@spec replace_and_register(term()) :: :ok
Replaces any previous process registered under key and registers
the current process.
Kills the previous holder with Process.exit(pid, :replaced) and
waits for the registration to clear before registering the caller.
This avoids a race where Registry.register/3 fails because the
exited process hasn't been cleaned up yet.
@spec start_stream(Plug.Conn.t(), term()) :: Plug.Conn.t()
Starts an SSE stream with per-tab deduplication.
Reads tabId from the request signals, kills any previous stream
process registered under {scope_key, tab_id}, registers the
current process, and calls OctaStar.start/1.
If no tabId signal is present, falls back to OctaStar.start/1
without deduplication.
Parameters
conn— the Plug connectionscope_key— any term that identifies the user/session (e.g.user.idor{user.id, workspace.id})