Musubi.Stream (musubi v0.3.0)

Copy Markdown View Source

Stream API for Musubi stores.

Stream-typed slots (declared via stream/2,3 inside state do) carry collections whose materialization is owned by the client. The server queues raw delta ops (reset/insert/delete) on a per-stream Musubi.Stream.Slot struct stored under socket.assigns.__streams__. Each cycle's queued ops drain into the patch envelope's stream_ops, where the page runtime stamps each op with its owning store_id, and the struct is pruned (BDR-0014, BDR-0018).

The runtime does not keep an ordered item_keys list, does not decide upsert-vs-insert, and does not trim for :limit server-side. The client materializes the stream and applies the per-op :limit field.

Public API surface (frozen for M5+)

Reserved socket-assigns shape

socket.assigns.__streams__ is a map with reserved sub-keys:

  • __ref__ — monotonic per-page ref counter used to stamp each new Musubi.Stream.Slot.
  • __changed__MapSet of stream names mutated since the last prune. Used by Musubi.Resolver to know which structs to prune.
  • __configured__ — pre-init configure opts keyed by stream name. Applied when the matching Musubi.Stream.Slot is first initialized.
  • <atom_name> — the per-stream Musubi.Stream.Slot struct.

Sub-keys are runtime-internal; do not read or write directly.

Examples

Musubi.Stream.stream_insert(socket, :messages, %{id: "1", body: "hi"})
#=> %Musubi.Socket{...}

Summary

Types

Per-stream config — runtime-merged from compile-time reflection + stream_configure/3.

Computed item_key returned by the configured :item_key capture. Always a binary.

Wire stream op pushed in the envelope's stream_ops array.

Public stream identifier — atom matching a stream :name, T, ... declaration.

Functions

Returns the reserved socket-assigns key holding the per-page stream index.

Returns the names of streams marked changed (mutated since the last prune).

Drains pending ops from every Musubi.Stream.Slot marked changed, appends them to the socket-private accumulator, prunes each struct, and clears the __changed__ set. Invoked by Musubi.Resolver.

Returns the reserved socket-private key holding the drained stream ops (populated by Musubi.Resolver and consumed by the page server).

Drains the per-socket accumulator populated by Musubi.Resolver and returns the wire ops for this cycle.

Returns the queued stream ops for the current cycle (not yet flushed).

Bulk seeds or refreshes a stream slot.

Sets configure-only options for name. Raises if name has already been initialized via stream/3,4 or stream_insert/3,4.

Queues a delete for one item in a stream slot, deriving its item_key from the item via the stream's configured key function.

Queues a delete for one item in a stream slot directly by item_key.

Queues an insert for one item in a stream slot.

Types

config()

@type config() :: %{item_key: (term() -> item_key()), limit: integer() | nil}

Per-stream config — runtime-merged from compile-time reflection + stream_configure/3.

item_key()

@type item_key() :: String.t()

Computed item_key returned by the configured :item_key capture. Always a binary.

op()

@type op() :: %{
  :op => String.t(),
  :stream => String.t(),
  :ref => String.t(),
  :store_id => [String.t()],
  optional(any()) => any()
}

Wire stream op pushed in the envelope's stream_ops array.

stream_name()

@type stream_name() :: atom()

Public stream identifier — atom matching a stream :name, T, ... declaration.

Functions

assigns_key()

@spec assigns_key() :: :__streams__

Returns the reserved socket-assigns key holding the per-page stream index.

Exposed so tests and hooks can introspect without hard-coding the literal.

changed_streams(socket)

@spec changed_streams(Musubi.Socket.t()) :: MapSet.t(stream_name())

Returns the names of streams marked changed (mutated since the last prune).

Exposed for Musubi.Resolver and tests.

Examples

Musubi.Stream.changed_streams(socket)
#=> MapSet.new([:messages])

drain_and_prune(socket)

@spec drain_and_prune(Musubi.Socket.t()) :: Musubi.Socket.t()

Drains pending ops from every Musubi.Stream.Slot marked changed, appends them to the socket-private accumulator, prunes each struct, and clears the __changed__ set. Invoked by Musubi.Resolver.

Streams not in __changed__ are left untouched.

drained_key()

@spec drained_key() :: :__musubi_drained_stream_ops__

Returns the reserved socket-private key holding the drained stream ops (populated by Musubi.Resolver and consumed by the page server).

flush_pending_ops(socket)

@spec flush_pending_ops(Musubi.Socket.t()) :: {[raw_op()], Musubi.Socket.t()}

Drains the per-socket accumulator populated by Musubi.Resolver and returns the wire ops for this cycle.

Called by the page runtime once per render cycle, immediately after the resolver finishes (the prune hook has already run by then). After the call, the accumulator is empty.

Examples

{ops, socket} = Musubi.Stream.flush_pending_ops(socket)
ops
#=> [%{op: "insert", stream: "messages", ref: "0", item_key: "messages-1", item: %{...}, at: -1, limit: nil}]

pending_ops(socket)

@spec pending_ops(Musubi.Socket.t()) :: [raw_op()]

Returns the queued stream ops for the current cycle (not yet flushed).

Reads pending fields from each Musubi.Stream.Slot plus any already-drained ops on the socket-private accumulator. Useful for tests and debugging.

Examples

socket = Musubi.Stream.stream_insert(%Musubi.Socket{module: M}, :messages, %{id: "1"})
length(Musubi.Stream.pending_ops(socket))
#=> 1

stream(socket, name, items, opts \\ [])

@spec stream(Musubi.Socket.t(), stream_name(), [term()], keyword()) ::
  Musubi.Socket.t()

Bulk seeds or refreshes a stream slot.

With reset: true, marks the stream's slot so the flushed wire ops include a reset ahead of the inserts and the client clears its local stream before applying them.

Examples

socket = Musubi.Stream.stream(socket, :messages, [%{id: "1", body: "hi"}])
socket = Musubi.Stream.stream(socket, :messages, fresh_items, reset: true)

stream_configure(socket, name, opts)

@spec stream_configure(Musubi.Socket.t(), stream_name(), keyword()) ::
  Musubi.Socket.t()

Sets configure-only options for name. Raises if name has already been initialized via stream/3,4 or stream_insert/3,4.

Accepts :item_key (arity-1 function returning a binary) and :limit (integer or nil). The configuration takes effect when the stream is next initialized; re-configuring the same stream after init is a lifetime error.

Configure is purely server-side state — it does not produce a wire op (the item_key capture is not transferable, and per-insert ops carry the :limit the client needs). Documented Musubi divergence vs. the abstract spec in the realignment notes.

Examples

socket = Musubi.Stream.stream_configure(socket, :messages, item_key: &("custom-" <> &1.id))

stream_delete(socket, name, item)

@spec stream_delete(Musubi.Socket.t(), stream_name(), term()) :: Musubi.Socket.t()

Queues a delete for one item in a stream slot, deriving its item_key from the item via the stream's configured key function.

Examples

socket = Musubi.Stream.stream_delete(socket, :messages, %{id: "1"})

stream_delete_by_item_key(socket, name, item_key)

@spec stream_delete_by_item_key(Musubi.Socket.t(), stream_name(), item_key()) ::
  Musubi.Socket.t()

Queues a delete for one item in a stream slot directly by item_key.

Examples

socket = Musubi.Stream.stream_delete_by_item_key(socket, :messages, "messages-1")

stream_insert(socket, name, item, opts \\ [])

@spec stream_insert(Musubi.Socket.t(), stream_name(), term(), keyword()) ::
  Musubi.Socket.t()

Queues an insert for one item in a stream slot.

The default position is :at -1 (append). The runtime does not decide whether the insert is an upsert or new — that is the client's responsibility. The :limit field is passed through verbatim on the wire op; the client trims if the limit is exceeded after applying the insert.

Examples

socket = Musubi.Stream.stream_insert(socket, :messages, %{id: "1", body: "hi"})
socket = Musubi.Stream.stream_insert(socket, :messages, item, at: 0, limit: -100)