# `Musubi.Stream`
[🔗](https://github.com/fahchen/musubi/blob/v0.3.0/lib/musubi/stream.ex#L1)

Stream API for Musubi stores.

Stream-typed slots (declared via `stream/2,3` inside `state do`) carry
collections whose materialization is **owned by the client**. The server
queues raw delta ops (`reset`/`insert`/`delete`) on a per-stream
`Musubi.Stream.Slot` struct stored under `socket.assigns.__streams__`. Each
cycle's queued ops drain into the patch envelope's `stream_ops`, where the
page runtime stamps each op with its owning `store_id`, and the struct is
pruned (BDR-0014, BDR-0018).

The runtime does not keep an ordered `item_keys` list, does not decide
upsert-vs-insert, and does not trim for `:limit` server-side. The client
materializes the stream and applies the per-op `:limit` field.

## Public API surface (frozen for M5+)

  * `stream/3,4`
  * `stream_configure/3`
  * `stream_insert/3,4`
  * `stream_delete/3`
  * `stream_delete_by_item_key/3`

## Reserved socket-assigns shape

`socket.assigns.__streams__` is a map with reserved sub-keys:

  * `__ref__` — monotonic per-page ref counter used to stamp each new
    `Musubi.Stream.Slot`.
  * `__changed__` — `MapSet` of stream names mutated since the last prune.
    Used by `Musubi.Resolver` to know which structs to prune.
  * `__configured__` — pre-init configure opts keyed by stream name.
    Applied when the matching `Musubi.Stream.Slot` is first initialized.
  * `<atom_name>` — the per-stream `Musubi.Stream.Slot` struct.

Sub-keys are runtime-internal; do not read or write directly.

## Examples

    Musubi.Stream.stream_insert(socket, :messages, %{id: "1", body: "hi"})
    #=> %Musubi.Socket{...}

# `config`

```elixir
@type config() :: %{item_key: (term() -&gt; item_key()), limit: integer() | nil}
```

Per-stream config — runtime-merged from compile-time reflection + `stream_configure/3`.

# `item_key`

```elixir
@type item_key() :: String.t()
```

Computed item_key returned by the configured `:item_key` capture. Always a binary.

# `op`

```elixir
@type op() :: %{
  :op =&gt; String.t(),
  :stream =&gt; String.t(),
  :ref =&gt; String.t(),
  :store_id =&gt; [String.t()],
  optional(any()) =&gt; any()
}
```

Wire stream op pushed in the envelope's `stream_ops` array.

# `stream_name`

```elixir
@type stream_name() :: atom()
```

Public stream identifier — atom matching a `stream :name, T, ...` declaration.

# `assigns_key`

```elixir
@spec assigns_key() :: :__streams__
```

Returns the reserved socket-assigns key holding the per-page stream index.

Exposed so tests and hooks can introspect without hard-coding the literal.

# `changed_streams`

```elixir
@spec changed_streams(Musubi.Socket.t()) :: MapSet.t(stream_name())
```

Returns the names of streams marked changed (mutated since the last prune).

Exposed for `Musubi.Resolver` and tests.

## Examples

    Musubi.Stream.changed_streams(socket)
    #=> MapSet.new([:messages])

# `drain_and_prune`

```elixir
@spec drain_and_prune(Musubi.Socket.t()) :: Musubi.Socket.t()
```

Drains pending ops from every `Musubi.Stream.Slot` marked changed, appends
them to the socket-private accumulator, prunes each struct, and clears
the `__changed__` set. Invoked by `Musubi.Resolver`.

Streams not in `__changed__` are left untouched.

# `drained_key`

```elixir
@spec drained_key() :: :__musubi_drained_stream_ops__
```

Returns the reserved socket-private key holding the drained stream ops
(populated by `Musubi.Resolver` and consumed by the page server).

# `flush_pending_ops`

```elixir
@spec flush_pending_ops(Musubi.Socket.t()) :: {[raw_op()], Musubi.Socket.t()}
```

Drains the per-socket accumulator populated by `Musubi.Resolver`
and returns the wire ops for this cycle.

Called by the page runtime once per render cycle, immediately after the
resolver finishes (the prune hook has already run by then). After the
call, the accumulator is empty.

## Examples

    {ops, socket} = Musubi.Stream.flush_pending_ops(socket)
    ops
    #=> [%{op: "insert", stream: "messages", ref: "0", item_key: "messages-1", item: %{...}, at: -1, limit: nil}]

# `pending_ops`

```elixir
@spec pending_ops(Musubi.Socket.t()) :: [raw_op()]
```

Returns the queued stream ops for the current cycle (not yet flushed).

Reads pending fields from each `Musubi.Stream.Slot` plus any already-drained
ops on the socket-private accumulator. Useful for tests and debugging.

## Examples

    socket = Musubi.Stream.stream_insert(%Musubi.Socket{module: M}, :messages, %{id: "1"})
    length(Musubi.Stream.pending_ops(socket))
    #=> 1

# `stream`

```elixir
@spec stream(Musubi.Socket.t(), stream_name(), [term()], keyword()) ::
  Musubi.Socket.t()
```

Bulk seeds or refreshes a stream slot.

With `reset: true`, marks the stream's slot so the flushed wire
ops include a `reset` ahead of the inserts and the client clears its
local stream before applying them.

## Examples

    socket = Musubi.Stream.stream(socket, :messages, [%{id: "1", body: "hi"}])
    socket = Musubi.Stream.stream(socket, :messages, fresh_items, reset: true)

# `stream_configure`

```elixir
@spec stream_configure(Musubi.Socket.t(), stream_name(), keyword()) ::
  Musubi.Socket.t()
```

Sets configure-only options for `name`. Raises if `name` has already been
initialized via `stream/3,4` or `stream_insert/3,4`.

Accepts `:item_key` (arity-1 function returning a binary) and `:limit`
(integer or `nil`). The configuration takes effect when the stream is
next initialized; re-configuring the same stream after init is a
lifetime error.

Configure is purely server-side state — it does not produce a wire op
(the `item_key` capture is not transferable, and per-insert ops carry
the `:limit` the client needs). Documented Musubi divergence vs. the
abstract spec in the realignment notes.

## Examples

    socket = Musubi.Stream.stream_configure(socket, :messages, item_key: &("custom-" <> &1.id))

# `stream_delete`

```elixir
@spec stream_delete(Musubi.Socket.t(), stream_name(), term()) :: Musubi.Socket.t()
```

Queues a delete for one item in a stream slot, deriving its `item_key`
from the item via the stream's configured key function.

## Examples

    socket = Musubi.Stream.stream_delete(socket, :messages, %{id: "1"})

# `stream_delete_by_item_key`

```elixir
@spec stream_delete_by_item_key(Musubi.Socket.t(), stream_name(), item_key()) ::
  Musubi.Socket.t()
```

Queues a delete for one item in a stream slot directly by `item_key`.

## Examples

    socket = Musubi.Stream.stream_delete_by_item_key(socket, :messages, "messages-1")

# `stream_insert`

```elixir
@spec stream_insert(Musubi.Socket.t(), stream_name(), term(), keyword()) ::
  Musubi.Socket.t()
```

Queues an insert for one item in a stream slot.

The default position is `:at` `-1` (append). The runtime does **not**
decide whether the insert is an upsert or new — that is the client's
responsibility. The `:limit` field is passed through verbatim
on the wire op; the client trims if the limit is exceeded after applying
the insert.

## Examples

    socket = Musubi.Stream.stream_insert(socket, :messages, %{id: "1", body: "hi"})
    socket = Musubi.Stream.stream_insert(socket, :messages, item, at: 0, limit: -100)

---

*Consult [api-reference.md](api-reference.md) for complete listing*
