Stream API for Musubi stores.
Stream-typed slots (declared via stream/2,3 inside state do) carry
collections whose materialization is owned by the client. The server
queues raw delta ops (reset/insert/delete) on a per-stream
Musubi.Stream.Slot struct stored under socket.assigns.__streams__. Each
cycle's queued ops drain into the patch envelope's stream_ops, where the
page runtime stamps each op with its owning store_id, and the struct is
pruned (BDR-0014, BDR-0018).
The runtime does not keep an ordered item_keys list, does not decide
upsert-vs-insert, and does not trim for :limit server-side. The client
materializes the stream and applies the per-op :limit field.
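To make this division of labour concrete, the following is a minimal sketch of how a client could materialize a sequence of ops. It is written in Elixir only for consistency with the rest of this page. The module name ClientStreamSketch, the list-of-tuples representation, and the upsert rule (an insert whose item_key already exists replaces the item in place) are all assumptions for illustration, not Musubi's documented client behaviour. Trimming by :limit is left out here.

```elixir
defmodule ClientStreamSketch do
  # Hypothetical client-side model; not part of Musubi.
  # The materialized stream is an ordered list of {item_key, item} tuples.

  # Applies a list of ops, oldest first, to an existing materialized stream.
  def materialize(ops, items \\ []) do
    Enum.reduce(ops, items, fn op, acc -> apply_op(acc, op) end)
  end

  # A reset clears the local stream.
  def apply_op(_items, %{op: "reset"}), do: []

  # A delete removes the entry with the matching item_key, if present.
  def apply_op(items, %{op: "delete", item_key: key}) do
    List.keydelete(items, key, 0)
  end

  # An insert is treated as an upsert (assumption): replace in place when the
  # key is already known, otherwise insert at the requested position.
  def apply_op(items, %{op: "insert", item_key: key, item: item} = op) do
    if List.keymember?(items, key, 0) do
      List.keyreplace(items, key, 0, {key, item})
    else
      List.insert_at(items, Map.get(op, :at, -1), {key, item})
    end
  end
end

ops = [
  %{op: "insert", item_key: "messages-1", item: %{id: "1", body: "hi"}, at: -1},
  %{op: "insert", item_key: "messages-2", item: %{id: "2", body: "yo"}, at: 0},
  %{op: "insert", item_key: "messages-1", item: %{id: "1", body: "edited"}, at: -1},
  %{op: "delete", item_key: "messages-2"}
]

materialized = ClientStreamSketch.materialize(ops)
# materialized == [{"messages-1", %{id: "1", body: "edited"}}]
```

The server never needs to know that the third op was an update rather than a new item; in this sketch that decision falls out of the client's own key lookup.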
Public API surface (frozen for M5+)
- stream/3,4
- stream_configure/3
- stream_insert/3,4
- stream_delete/3
- stream_delete_by_item_key/3
Reserved socket-assigns shape
socket.assigns.__streams__ is a map with reserved sub-keys:
- __ref__: monotonic per-page ref counter used to stamp each new Musubi.Stream.Slot.
- __changed__: MapSet of stream names mutated since the last prune. Used by Musubi.Resolver to know which structs to prune.
- __configured__: pre-init configure opts keyed by stream name. Applied when the matching Musubi.Stream.Slot is first initialized.
- <atom_name>: the per-stream Musubi.Stream.Slot struct.
Sub-keys are runtime-internal; do not read or write directly.
Examples
Musubi.Stream.stream_insert(socket, :messages, %{id: "1", body: "hi"})
#=> %Musubi.Socket{...}
Types
Per-stream config — runtime-merged from compile-time reflection + stream_configure/3.
@type item_key() :: String.t()
Computed item_key returned by the configured :item_key capture. Always a binary.
@type op() :: %{ :op => String.t(), :stream => String.t(), :ref => String.t(), :store_id => [String.t()], optional(any()) => any() }
Wire stream op pushed in the envelope's stream_ops array.
@type stream_name() :: atom()
Public stream identifier — atom matching a stream :name, T, ... declaration.
Functions
@spec assigns_key() :: :__streams__
Returns the reserved socket-assigns key holding the per-page stream index.
Exposed so tests and hooks can introspect without hard-coding the literal.
@spec changed_streams(Musubi.Socket.t()) :: MapSet.t(stream_name())
Returns the names of streams marked changed (mutated since the last prune).
Exposed for Musubi.Resolver and tests.
Examples
Musubi.Stream.changed_streams(socket)
#=> MapSet.new([:messages])
@spec drain_and_prune(Musubi.Socket.t()) :: Musubi.Socket.t()
Drains pending ops from every Musubi.Stream.Slot marked changed, appends
them to the socket-private accumulator, prunes each struct, and clears
the __changed__ set. Invoked by Musubi.Resolver.
Streams not in __changed__ are left untouched.
@spec drained_key() :: :__musubi_drained_stream_ops__
Returns the reserved socket-private key holding the drained stream ops
(populated by Musubi.Resolver and consumed by the page server).
@spec flush_pending_ops(Musubi.Socket.t()) :: {[raw_op()], Musubi.Socket.t()}
Drains the per-socket accumulator populated by Musubi.Resolver
and returns the wire ops for this cycle.
Called by the page runtime once per render cycle, immediately after the resolver finishes (the prune hook has already run by then). After the call, the accumulator is empty.
Examples
{ops, socket} = Musubi.Stream.flush_pending_ops(socket)
ops
#=> [%{op: "insert", stream: "messages", ref: "0", item_key: "messages-1", item: %{...}, at: -1, limit: nil}]
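The two-step hand-off described above (the resolver drains and prunes, then the page runtime flushes) can be modelled with plain maps. This is a sketch under stated assumptions: the module CycleSketch, the state shape, and the field names changed, slots, pending, and drained are hypothetical stand-ins for the runtime-internal structures, which are not documented here.

```elixir
defmodule CycleSketch do
  # Hypothetical model of one render cycle; not Musubi's implementation.
  # State shape (assumed):
  #   %{changed: MapSet of names, slots: %{name => %{pending: [op]}}, drained: [op]}

  # Resolver step: move pending ops of every changed stream into the
  # accumulator, empty each visited slot, and clear the changed set.
  def drain_and_prune(state) do
    Enum.reduce(state.changed, %{state | changed: MapSet.new()}, fn name, acc ->
      slot = Map.fetch!(acc.slots, name)

      %{
        acc
        | drained: acc.drained ++ slot.pending,
          slots: Map.put(acc.slots, name, %{slot | pending: []})
      }
    end)
  end

  # Page-runtime step: return the accumulated ops and leave the accumulator empty.
  def flush_pending_ops(state) do
    {state.drained, %{state | drained: []}}
  end
end

state = %{
  changed: MapSet.new([:messages]),
  slots: %{
    messages: %{pending: [%{op: "insert", stream: "messages", item_key: "messages-1"}]},
    users: %{pending: []}
  },
  drained: []
}

{ops, state} = state |> CycleSketch.drain_and_prune() |> CycleSketch.flush_pending_ops()
# ops holds the single insert op; the accumulator and the :messages slot are now empty,
# and the :users slot was never visited because it was not marked changed.
```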
@spec pending_ops(Musubi.Socket.t()) :: [raw_op()]
Returns the queued stream ops for the current cycle (not yet flushed).
Reads pending fields from each Musubi.Stream.Slot plus any already-drained
ops on the socket-private accumulator. Useful for tests and debugging.
Examples
socket = Musubi.Stream.stream_insert(%Musubi.Socket{module: M}, :messages, %{id: "1"})
length(Musubi.Stream.pending_ops(socket))
#=> 1
@spec stream(Musubi.Socket.t(), stream_name(), [term()], keyword()) :: Musubi.Socket.t()
Bulk seeds or refreshes a stream slot.
With reset: true, marks the stream's slot so the flushed wire
ops include a reset ahead of the inserts and the client clears its
local stream before applying them.
Examples
socket = Musubi.Stream.stream(socket, :messages, [%{id: "1", body: "hi"}])
socket = Musubi.Stream.stream(socket, :messages, fresh_items, reset: true)
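As a rough illustration of the op ordering that reset: true implies, the sketch below builds the list of ops a bulk seed might queue. SeedSketch is a hypothetical helper, not a Musubi function. The default item_key format ("messages-1" for stream :messages and id "1") is inferred from the examples in this document, and the exact fields of a reset op are an assumption.

```elixir
defmodule SeedSketch do
  # Hypothetical model of the ops queued by a bulk seed; not Musubi's code.
  def ops(name, items, opts \\ []) do
    stream = Atom.to_string(name)

    inserts =
      for item <- items do
        %{
          op: "insert",
          stream: stream,
          # Assumed default key format, inferred from this document's examples.
          item_key: "#{name}-#{item.id}",
          item: item,
          at: -1,
          limit: nil
        }
      end

    # With reset: true, a reset op goes ahead of the inserts so the client
    # clears its local stream before applying them.
    if Keyword.get(opts, :reset, false) do
      [%{op: "reset", stream: stream} | inserts]
    else
      inserts
    end
  end
end

ops = SeedSketch.ops(:messages, [%{id: "1"}, %{id: "2"}], reset: true)
op_names = Enum.map(ops, & &1.op)
# op_names == ["reset", "insert", "insert"]
```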
@spec stream_configure(Musubi.Socket.t(), stream_name(), keyword()) :: Musubi.Socket.t()
Sets configure-only options for name. Raises if name has already been
initialized via stream/3,4 or stream_insert/3,4.
Accepts :item_key (arity-1 function returning a binary) and :limit
(integer or nil). The configuration is stored until the stream is
first initialized and applied at that point; calling stream_configure/3
for a stream that is already initialized raises.
Configuration is purely server-side state and does not produce a wire op:
the item_key capture cannot be sent to the client, and each insert op
already carries the :limit the client needs. This is a Musubi divergence
from the abstract spec, documented in the realignment notes.
Examples
socket = Musubi.Stream.stream_configure(socket, :messages, item_key: &("custom-" <> &1.id))
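The configure-before-init rule can be sketched with a small state machine. Everything here is hypothetical: the module ConfigureSketch, the state keys configured and slots, and the choice of ArgumentError (the source only says the call raises, not which exception).

```elixir
defmodule ConfigureSketch do
  # Hypothetical model of the configure lifecycle; not Musubi's implementation.
  # State shape (assumed): %{configured: %{name => opts}, slots: %{name => slot}}

  # Stores opts for a stream that has not been initialized yet.
  def configure(state, name, opts) do
    if Map.has_key?(state.slots, name) do
      # Exception type is an assumption; the docs only state that this raises.
      raise ArgumentError, "stream #{inspect(name)} is already initialized"
    end

    %{state | configured: Map.put(state.configured, name, opts)}
  end

  # First initialization consumes any stored opts; later calls are no-ops.
  def init(state, name) do
    {opts, configured} = Map.pop(state.configured, name, [])
    slot = %{config: opts, pending: []}
    %{state | configured: configured, slots: Map.put_new(state.slots, name, slot)}
  end
end

state =
  %{configured: %{}, slots: %{}}
  |> ConfigureSketch.configure(:messages, limit: 50)
  |> ConfigureSketch.init(:messages)

config = state.slots.messages.config
# config == [limit: 50]

raised =
  try do
    ConfigureSketch.configure(state, :messages, limit: 10)
    false
  rescue
    ArgumentError -> true
  end
# raised == true, because :messages was already initialized
```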
@spec stream_delete(Musubi.Socket.t(), stream_name(), term()) :: Musubi.Socket.t()
Queues a delete for one item in a stream slot, deriving its item_key
from the item via the stream's configured key function.
Examples
socket = Musubi.Stream.stream_delete(socket, :messages, %{id: "1"})
@spec stream_delete_by_item_key(Musubi.Socket.t(), stream_name(), item_key()) :: Musubi.Socket.t()
Queues a delete for one item in a stream slot directly by item_key.
Examples
socket = Musubi.Stream.stream_delete_by_item_key(socket, :messages, "messages-1")
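The two delete functions differ only in where the item_key comes from. The sketch below illustrates that relationship with hypothetical helpers; DeleteSketch is not part of Musubi, and the shape of a delete op is assumed by analogy with the insert op shown earlier.

```elixir
defmodule DeleteSketch do
  # Hypothetical helpers; the delete op shape is an assumption.

  # Builds a delete op from an already-known item_key.
  def delete_by_item_key(stream, item_key) do
    %{op: "delete", stream: Atom.to_string(stream), item_key: item_key}
  end

  # Derives the item_key from the item via the key function, then delegates.
  def delete(stream, item, key_fun) do
    delete_by_item_key(stream, key_fun.(item))
  end
end

# Assumed key function matching the "messages-1" keys used in this document.
key_fun = fn item -> "messages-" <> item.id end

by_item = DeleteSketch.delete(:messages, %{id: "1"}, key_fun)
by_key = DeleteSketch.delete_by_item_key(:messages, "messages-1")
same = by_item == by_key
# same == true
```

Deleting by item_key is the natural choice when the item itself is no longer available, for example after the underlying record has already been removed.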
@spec stream_insert(Musubi.Socket.t(), stream_name(), term(), keyword()) :: Musubi.Socket.t()
Queues an insert for one item in a stream slot.
The default position is at: -1 (append). The runtime does not
decide whether the insert is an upsert or a new item; that is the
client's responsibility. The :limit option is passed through verbatim
on the wire op, and the client trims if the limit is exceeded after
applying the insert.
Examples
socket = Musubi.Stream.stream_insert(socket, :messages, %{id: "1", body: "hi"})
socket = Musubi.Stream.stream_insert(socket, :messages, item, at: 0, limit: -100)
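Because trimming happens on the client, it may help to see what applying :at and :limit could look like there. The sketch below is an assumption-laden model: LimitSketch is hypothetical, and the sign convention (a positive limit keeps the first N items, a negative limit keeps the last N) is borrowed from Phoenix LiveView streams rather than confirmed for Musubi.

```elixir
defmodule LimitSketch do
  # Hypothetical client-side insert-then-trim; not part of Musubi.
  def insert(items, item, opts \\ []) do
    at = Keyword.get(opts, :at, -1)
    limit = Keyword.get(opts, :limit)

    items
    |> List.insert_at(at, item)
    |> trim(limit)
  end

  # No limit: keep everything.
  defp trim(items, nil), do: items

  # Assumed convention: Enum.take/2 keeps the first N items for a positive
  # count and the last N items for a negative count.
  defp trim(items, limit) when is_integer(limit), do: Enum.take(items, limit)
end

appended = LimitSketch.insert([1, 2, 3], 4, limit: -3)
# appended == [2, 3, 4]

prepended = LimitSketch.insert([1, 2, 3], 0, at: 0, limit: 3)
# prepended == [0, 1, 2]
```

Under this convention, appending pairs naturally with a negative limit (drop the oldest items at the front) and prepending with a positive one (drop the oldest items at the back).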