Musubi.Transport.Channel (musubi v0.3.0)

Copy Markdown View Source

Generic Phoenix Channel adapter that mounts any Musubi root store named in the channel module's allowlist.

The public client API connects to a root by {module, id} (see docs/client-contract.md). topic is an internal transport detail: the client builds an opaque topic of the form "musubi:<opaque_ref>" and carries the requested module, id, and optional params in the channel join payload.

Mounting in a Phoenix endpoint

defmodule MyAppWeb.MusubiChannel do
  use Musubi.Transport.Channel,
    stores: [
      MyApp.Stores.CartPageStore,
      MyApp.Stores.InboxStore
    ]
end

defmodule MyAppWeb.UserSocket do
  use Phoenix.Socket

  channel "musubi:*", MyAppWeb.MusubiChannel

  def connect(_params, socket, _connect_info), do: {:ok, socket}
  def id(_socket), do: nil
end

Join payload

Incoming join payload:

%{
  "module" => "MyApp.Stores.CartPageStore",
  "id" => "cart:current-user",
  "params" => %{...optional...}
}

The channel rejects joins whose module is not in the configured allowlist.

Lifecycle

On join/3 the adapter starts a fresh Musubi.Page.Server for the resolved root module + params and links it to the channel pid. On channel terminate/2 the adapter unlinks the page server and stops it with the channel's terminate reason. Reconnect is recovery (BDR-0015): each new join builds a fresh page server with version: 1 and an initial replace "" envelope.

Wire shape

Incoming "command" payload:

%{"store_id" => ["filters"], "name" => "change_query", "payload" => %{...}}

store_id is the in-tree path-shaped locator for the addressed store node. Root is [].

Outgoing "patch" payload — Musubi.Page.PatchEnvelope.to_wire/1:

%{
  "type" => "patch",
  "base_version" => 0,
  "version" => 1,
  "ops" => [...],
  "stream_ops" => [...]
}

Telemetry

  • [:musubi, :channel, :join]%{system_time: integer}. Metadata: module, id, topic, page_pid.
  • [:musubi, :channel, :terminate]%{system_time: integer}. Metadata: module, id, topic, reason, page_pid.