OctaStar.Utility.StreamRegistry (octa_star v0.1.0)

Copy Markdown

Opt-in per-tab stream deduplication.

Tracks active SSE stream processes by a compound key (typically {user_id, tab_id}). When a new stream registers with an existing key, the previous process is killed immediately — no waiting for keepalive timeouts.

Problem

With full-page navigation, SSE stream processes don't learn the client disconnected until they try to write — which only happens on the next PubSub broadcast or keepalive tick. This creates zombie processes that hold subscriptions, do wasted DB queries, and on HTTP/1.1 can exhaust the browser's 6-connection-per-origin limit.

Setup

Add to your application's supervision tree (OctaStar does not start this registry automatically):

# lib/my_app/application.ex
children = [
  OctaStar.Utility.StreamRegistry,
  # ...
]

Then add a tabId signal to your root layout:

<div data-signals={~s({"tabId": "${crypto.randomUUID()}"})}
     data-signals-on-load={~s(sessionStorage.setItem('tabId', $tabId))}
     data-signals-on-load={~s($tabId = sessionStorage.getItem('tabId') || $tabId)}>

sessionStorage is per-tab — each tab gets its own UUID that persists across full-page navigations but is unique per tab.

Important: Do not use a _ prefix for the signal name. Datastar treats _-prefixed signals as local (client-only) and never sends them to the server.

Usage

In your stream controllers, replace OctaStar.start(conn) with OctaStar.start_stream/2 (or call this module directly):

def stream(conn, _params) do
  scope = conn.assigns.current_scope

  conn = OctaStar.start_stream(conn, scope.user.id)

  loop(conn, state)
end

If no tabId signal is present in the request, falls back to OctaStar.start/1 without deduplication — so existing streams keep working while you roll out the client-side signal.

Adapted from dstar (MIT).

Summary

Functions

Replaces any previous process registered under key and registers the current process.

Starts an SSE stream with per-tab deduplication.

Functions

replace_and_register(key)

@spec replace_and_register(term()) :: :ok

Replaces any previous process registered under key and registers the current process.

Kills the previous holder with Process.exit(pid, :replaced) and waits for the registration to clear before registering the caller. This avoids a race where Registry.register/3 fails because the exited process hasn't been cleaned up yet.

start_stream(conn, scope_key)

@spec start_stream(Plug.Conn.t(), term()) :: Plug.Conn.t()

Starts an SSE stream with per-tab deduplication.

Reads tabId from the request signals, kills any previous stream process registered under {scope_key, tab_id}, registers the current process, and calls OctaStar.start/1.

If no tabId signal is present, falls back to OctaStar.start/1 without deduplication.

Parameters

  • conn — the Plug connection
  • scope_key — any term that identifies the user/session (e.g. user.id or {user.id, workspace.id})