# `ALLM.StreamCollector`
[🔗](https://github.com/cykod/ALLM/blob/v0.3.0/lib/allm/stream_collector.ex#L1)

Reduce a stream of `ALLM.Event` values into a collected `%ALLM.Response{}`,
`%ALLM.StepResult{}`, or `%ALLM.ChatResult{}`. See spec §13.1.

Layer C: stateless fold state. `StreamCollector` is the shared reducer
that every non-streaming wrapper builds on (`ALLM.generate/3` in Phase 5,
and the future `step/3` and `chat/3`). Callers fold a stream with the
standard-library `Enum.reduce/3` idiom:

    Enum.reduce(stream, ALLM.StreamCollector.new(), fn e, s ->
      ALLM.StreamCollector.apply_event(s, e)
    end)

then extract the result with `to_response/1` (thread-less) or
`to_step_result/1` / `to_chat_result/1` (thread-backed).
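
To make the fold-then-extract split concrete, here is a self-contained sketch that runs without ALLM. `MiniCollector`, its two-field struct, and the plain-map return value of its `to_response/1` are hypothetical simplifications, not the module's implementation; the event tuples reuse the shapes from this module's doctests with their payloads trimmed.

```elixir
defmodule MiniCollector do
  # Hypothetical, simplified stand-in for ALLM.StreamCollector's state.
  defstruct current_text: "", finish_reason: nil

  def new, do: %__MODULE__{}

  # Append each text delta to the running text.
  def apply_event(%__MODULE__{} = s, {:text_delta, %{delta: delta}}) when is_binary(delta),
    do: %{s | current_text: s.current_text <> delta}

  # Record the finish reason carried by the terminal message event.
  def apply_event(%__MODULE__{} = s, {:message_completed, %{finish_reason: reason}}),
    do: %{s | finish_reason: reason}

  # Catch-all: tags this sketch ignores (and malformed events) leave the state unchanged.
  def apply_event(s, _event), do: s

  # Extraction step; this sketch returns a plain map instead of a %Response{}.
  def to_response(%__MODULE__{} = s),
    do: %{output_text: s.current_text, finish_reason: s.finish_reason}
end

events = [
  {:text_delta, %{id: nil, delta: "hel"}},
  {:tool_execution_started, %{}},
  {:text_delta, %{id: nil, delta: "lo"}},
  {:message_completed, %{finish_reason: :stop}}
]

result =
  events
  |> Enum.reduce(MiniCollector.new(), fn e, s -> MiniCollector.apply_event(s, e) end)
  |> MiniCollector.to_response()

# The pattern match doubles as an assertion: "hello" with finish_reason :stop.
%{output_text: "hello", finish_reason: :stop} = result
```

Keeping the fold (`apply_event`) separate from extraction (`to_response`) is what lets one reducer serve several output shapes.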

## Phase 5 extensions to spec §13.1

  * `new/0` builds a thread-less collector used by `ALLM.stream_generate/3`.
  * `new/1` additionally accepts `nil` (thread-less) alongside a
    `%ALLM.Thread{}`.
  * `to_response/1` returns the accumulated `%Response{}` and is the
    canonical output for thread-less collection.

These extensions sit alongside the spec §13.1 signatures (`new/1`,
`apply_event/2`, `to_step_result/1`, `to_chat_result/1`) without replacing them.

Phase 6 adds two new fields, `:tool_results` and `:halt`, populated by
fold clauses for orchestration tags. Phase 7 adds `:chat_result` and
populates `:steps` via the `:step_completed` fold clause.

## Phase 7 extension

The `:step_completed` fold appends a `%StepResult{}` (built from the
PRE-RESET collector state — see Phase 7 Non-obvious Decision #6) to
`:steps` and resets the per-step sub-state (`:current_text`,
`:current_tool_calls`, `:tool_call_order`, `:tool_results`, `:halt`,
`:finish_reason`, `:raw_finish_reason`, `:last_message`). `:error`
and `:metadata` are NOT reset — adapter mid-stream errors and
accumulated metadata persist across step boundaries so that
`to_chat_result/1`'s fallback path can preserve them.
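
The reset rule can be sketched with plain maps. `StepBoundarySketch`, its reset values, and the map-shaped step record are hypothetical; the reset field list follows the paragraph above, but the real fold builds a `%StepResult{}` and may use different empty values.

```elixir
defmodule StepBoundarySketch do
  # Hypothetical reset values for the per-step fields listed above.
  @per_step_defaults %{
    current_text: "",
    current_tool_calls: %{},
    tool_call_order: [],
    tool_results: [],
    halt: nil,
    finish_reason: nil,
    raw_finish_reason: nil,
    last_message: nil
  }

  def step_completed(state, thread) do
    # Snapshot the PRE-RESET state so the step record sees this step's data.
    step = Map.take(state, [:current_text, :finish_reason, :tool_results, :halt])

    # Map.merge only overwrites per-step keys, so :error and :metadata pass through.
    state
    |> Map.merge(@per_step_defaults)
    |> Map.update!(:steps, &(&1 ++ [step]))
    |> Map.put(:thread, thread)
  end
end

state = %{
  current_text: "step one",
  current_tool_calls: %{},
  tool_call_order: [],
  tool_results: [],
  halt: nil,
  finish_reason: :tool_calls,
  raw_finish_reason: "tool_calls",
  last_message: nil,
  steps: [],
  thread: :thread_v1,
  error: nil,
  metadata: %{model: "demo"}
}

next = StepBoundarySketch.step_completed(state, :thread_v2)
# next.current_text is "" again, next.steps holds the "step one" snapshot,
# and next.metadata / next.error are carried over unchanged.
```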

The `:chat_completed` fold stores the event's `:result` verbatim in
`:chat_result` and sets `:done? = true`. `to_chat_result/1` short-circuits
to the stored result when present; otherwise it falls back to a computed
`%ChatResult{}` whose `halted_reason` is `:cancelled` (the consumer halted
early) or `:error` (a mid-stream adapter error occurred).

## Fold semantics — Phase 5 subset plus Phase 6 + Phase 7 extensions

`apply_event/2` ships explicit clauses for the nine Phase-5 adapter-emitted
tags (`:message_started`, `:text_delta`, `:text_completed`,
`:tool_call_started`, `:tool_call_delta`, `:tool_call_completed`,
`:message_completed`, `:raw_chunk`, `:error`), three Phase-6
orchestration tags (`:tool_result_encoded`, `:tool_halt`,
`:ask_user_requested`), and two Phase-7 orchestration tags
(`:step_completed`, `:chat_completed`). The two remaining tags,
`:tool_execution_started` and `:tool_execution_completed`, fall through a
single catch-all clause, `apply_event(state, _), do: state`, as does any
malformed event.

| Tag | Fold | Rationale |
|-----|------|-----------|
| `:tool_result_encoded` | append `%Message{role: :tool, tool_call_id: id, content: content}` to `state.tool_results`. | `to_step_result/1` reads `:tool_results` from the struct, enabling `step ≡ stream_step \|> collect_step`. |
| `:tool_halt` | when `state.halt == nil`, set `state.halt = {:halt, reason, id, result}` AND append the encoded sentinel `%Message{role: :tool, ...}` (from payload `:content`) to `state.tool_results`. Subsequent halts are no-ops (first-halt-wins). | Halt metadata lives separately from `:finish_reason` so `done?/1` can combine both signals. The sentinel append (Phase 7.6 cleanup B1) keeps `tool_results` aligned with the non-streaming `Chat.do_step/4` path. |
| `:ask_user_requested` | when `state.halt == nil`, set `state.halt = {:ask_user, :ask_user, id, q, o}` AND append a `<awaiting user response>` sentinel message to `state.tool_results`. First-halt-wins. | Same channel as `:tool_halt`; the sentinel append (Phase 7.6 cleanup B1) mirrors spec §12.3 step 1. |
| `:step_completed` | append `%StepResult{}` (PRE-RESET state) to `state.steps`; set `state.thread = thread`; reset per-step sub-state. | Multi-step `Chat.stream/3` reductions need a clean per-step boundary. `:error` / `:metadata` deliberately persist. |
| `:chat_completed` | set `state.chat_result = result`; set `state.done? = true`. | The chat-layer terminal event; `to_chat_result/1` short-circuits to the stored value when present. |
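
A minimal sketch of the first-halt-wins rule from the `:tool_halt` and `:ask_user_requested` rows, using a plain map for state. The payload keys (`:id`, `:reason`, `:result`, `:content`, `:question`, `:opts`) and the map-shaped tool message are assumptions for illustration; ALLM's actual event payloads and `%Message{}` construction may differ.

```elixir
defmodule HaltFoldSketch do
  # Only fold a halt event while no halt has been recorded (first halt wins).
  def apply_event(%{halt: nil} = s, {:tool_halt, %{id: id, reason: r, result: res, content: c}}) do
    %{s | halt: {:halt, r, id, res}, tool_results: s.tool_results ++ [tool_msg(id, c)]}
  end

  def apply_event(%{halt: nil} = s, {:ask_user_requested, %{id: id, question: q, opts: o}}) do
    %{s |
      halt: {:ask_user, :ask_user, id, q, o},
      tool_results: s.tool_results ++ [tool_msg(id, "<awaiting user response>")]}
  end

  # A later halt event (or any other event) leaves the state untouched.
  def apply_event(s, _event), do: s

  # Hypothetical stand-in for %ALLM.Message{role: :tool, ...}.
  defp tool_msg(id, content), do: %{role: :tool, tool_call_id: id, content: content}
end

s0 = %{halt: nil, tool_results: []}
s1 = HaltFoldSketch.apply_event(s0, {:tool_halt, %{id: "c1", reason: :done, result: 42, content: "42"}})
s2 = HaltFoldSketch.apply_event(s1, {:ask_user_requested, %{id: "c2", question: "ok?", opts: []}})
# s2.halt is still {:halt, :done, "c1", 42}; the ask_user event added no sentinel.
```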

## Totality guarantee

`apply_event/2` never raises on a well-shaped event in the 16-tag closed
union. The one deliberate exception: a `:raw_chunk` with payload
`{:usage, map}` lets a `KeyError` from `struct!(ALLM.Usage, map)`
propagate. That signals an adapter bug (emitting keys not in `%Usage{}`),
not a collector bug.

## Adapter-side contract for `:raw_chunk {:usage, _}` (Usage fold)

The `{:raw_chunk, {:usage, map}}` fold applies `struct!(ALLM.Usage, map)`,
which raises `KeyError` on any map key outside `%ALLM.Usage{}`'s field set.
Adapters emitting `{:raw_chunk, {:usage, _}}` events MUST pre-map their
provider's wire-level usage fields (e.g., OpenAI's `prompt_tokens` /
`completion_tokens`; Anthropic's `input_tokens` / `output_tokens` /
`cache_read_input_tokens`) to `%ALLM.Usage{}` field names before emitting.
An adapter that forwards the raw provider payload will make the reducer
raise `KeyError` mid-stream, discarding the accumulated collector state. See
`lib/allm/usage.ex` for the canonical `%Usage{}` field set. This is a
load-bearing invariant for any real provider adapter implementing
`ALLM.StreamAdapter`.
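
The contract can be demonstrated without ALLM, because `struct!/2` raises `KeyError` for any key the struct does not define. `UsageSketch`, its `input_tokens` / `output_tokens` fields, and `OpenAIUsageMapping` are hypothetical stand-ins; check `lib/allm/usage.ex` for the real field set before writing an adapter's mapping.

```elixir
defmodule UsageSketch do
  # Hypothetical stand-in for %ALLM.Usage{}; the real field set lives in lib/allm/usage.ex.
  defstruct input_tokens: 0, output_tokens: 0
end

defmodule OpenAIUsageMapping do
  # Rename OpenAI wire-level keys to the (assumed) struct field names before emitting.
  def to_usage_map(%{"prompt_tokens" => p, "completion_tokens" => c}),
    do: %{input_tokens: p, output_tokens: c}
end

raw = %{"prompt_tokens" => 12, "completion_tokens" => 34}

# Safe: only keys the struct defines reach struct!/2.
usage = struct!(UsageSketch, OpenAIUsageMapping.to_usage_map(raw))

# Unsafe: forwarding a provider-shaped key raises KeyError, as the reducer would.
crashed? =
  try do
    struct!(UsageSketch, %{prompt_tokens: 12})
    false
  rescue
    KeyError -> true
  end
```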

## Mid-stream errors

A terminal `{:error, struct}` event folds into the collector's `:error`
field and sets `:finish_reason` to `:error`. `to_response/1` still
returns `%Response{}` — a mid-stream error is a *success* at the fold
layer: the caller reads `response.finish_reason == :error` and
`response.metadata.error` to detect it. `to_response/1` never returns
`{:error, _}`.
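
A sketch of the caller side. `ErrorFoldSketch` is hypothetical and works on plain maps; it mirrors the contract described above (`finish_reason: :error`, error under `metadata.error`), so the caller branches on the returned value rather than on an `{:error, _}` tuple.

```elixir
defmodule ErrorFoldSketch do
  # Terminal error: record it and force finish_reason to :error.
  def apply_event(s, {:error, err}), do: %{s | error: err, finish_reason: :error}
  def apply_event(s, _event), do: s

  # Always returns a response-like map; the error rides along in metadata.
  def to_response(s) do
    metadata = if s.error, do: Map.put(s.metadata, :error, s.error), else: s.metadata
    %{finish_reason: s.finish_reason, metadata: metadata}
  end
end

initial = %{error: nil, finish_reason: nil, metadata: %{}}

resp =
  [{:error, %RuntimeError{message: "connection reset"}}]
  |> Enum.reduce(initial, &ErrorFoldSketch.apply_event(&2, &1))
  |> ErrorFoldSketch.to_response()

# The caller inspects the response instead of matching on {:error, _}.
outcome =
  case resp do
    %{finish_reason: :error, metadata: %{error: err}} -> {:failed, err.message}
    %{finish_reason: reason} -> {:ok, reason}
  end
```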

# `halt_state`

```elixir
@type halt_state() ::
  nil
  | {:halt, reason :: atom(), tool_call_id :: String.t(), result :: term()}
  | {:ask_user, :ask_user, tool_call_id :: String.t(), question :: String.t(),
     opts :: keyword()}
```

Terminal halt signal derived from `:tool_halt` / `:ask_user_requested`
events. `nil` until the first halt event is observed; set once and never
updated (first-halt-wins — see Phase 6 design Invariant 7).

The `:halt` shape carries the handler's raw `result` term as the fourth
element so `merge_halt_metadata/2` can project it onto `:halt_result`,
matching the non-streaming `ToolRunner.run_tool_calls/3` halt_metadata
shape (Phase 7.6 cleanup — chat-equivalence blocker B2).

# `state`

```elixir
@type state() :: %ALLM.StreamCollector{
  chat_result: ALLM.ChatResult.t() | nil,
  current_text: String.t(),
  current_tool_calls: %{required(String.t()) => ALLM.ToolCall.t()},
  done?: boolean(),
  error:
    ALLM.Error.AdapterError.t() | ALLM.Error.StreamError.t() | term() | nil,
  finish_reason: ALLM.Response.finish_reason() | nil,
  halt: halt_state(),
  last_message: ALLM.Message.t() | nil,
  last_response: ALLM.Response.t() | nil,
  metadata: map(),
  raw_finish_reason: String.t() | nil,
  steps: [ALLM.StepResult.t()],
  thread: ALLM.Thread.t() | nil,
  tool_call_order: [String.t()],
  tool_results: [ALLM.Message.t()],
  usage: ALLM.Usage.t()
}
```

# `apply_event`

```elixir
@spec apply_event(state(), ALLM.Event.t()) :: state()
```

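Fold a single `ALLM.Event` into the collector state. Total over the 16-tag
closed union: unknown tags and malformed payloads are no-ops (the state is
returned unchanged). The one exception is a `{:raw_chunk, {:usage, map}}`
event whose map carries keys outside `%ALLM.Usage{}`; its `KeyError`
propagates (see "Totality guarantee" in the module doc).

See the module doc's "Fold semantics" section for the per-tag state
transitions.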

## Examples

    iex> s = ALLM.StreamCollector.new()
    iex> s = ALLM.StreamCollector.apply_event(s, {:text_delta, %{id: nil, delta: "hel"}})
    iex> s = ALLM.StreamCollector.apply_event(s, {:text_delta, %{id: nil, delta: "lo"}})
    iex> s.current_text
    "hello"

# `new`

```elixir
@spec new() :: state()
```

Build a thread-less collector. Equivalent to `new(nil)`.

Use this when reducing `ALLM.stream_generate/3`'s stream into a
`%Response{}` via `to_response/1`. Calling `to_step_result/1` on a
thread-less collector raises `ArgumentError`; so does `to_chat_result/1`,
unless a `:chat_completed` event has already stored a result.

## Examples

    iex> s = ALLM.StreamCollector.new()
    iex> s.thread
    nil
    iex> s.current_text
    ""

# `new`

```elixir
@spec new(ALLM.Thread.t() | nil) :: state()
```

Build a collector either thread-less (`nil`) or seeded with a
`%ALLM.Thread{}`. The thread is required by `to_step_result/1` and
`to_chat_result/1`; use `new/0` or `new(nil)` when only `to_response/1`
will be consumed.

## Examples

    iex> thread = ALLM.Thread.new()
    iex> s = ALLM.StreamCollector.new(thread)
    iex> s.thread
    %ALLM.Thread{messages: [], metadata: %{}}

    iex> ALLM.StreamCollector.new(nil).thread
    nil

# `to_chat_result`

```elixir
@spec to_chat_result(state()) :: ALLM.ChatResult.t()
```

Build a `%ALLM.ChatResult{}` from the collector state.

Two branches (Phase 7 Non-obvious Decision #7):

  * **Stored.** When `state.chat_result` is set (from a `:chat_completed`
    fold), return it verbatim — even when `state.thread` is `nil`. The
    stored result is authoritative; the orchestrator already constructed
    the canonical ChatResult.
  * **Fallback (computed).** When `state.chat_result` is `nil` and
    `state.thread` is set, build a `%ChatResult{}` from collector state.
    `:halted_reason` is `:error` when `state.error != nil` (mid-stream
    adapter error); otherwise ALWAYS `:cancelled` (the consumer halted
    the stream early; non-empty `state.steps` does NOT promote to
    `:completed`). `:final_response` is the last step's response when
    `state.steps != []`, or `to_response(state)` when no steps were
    observed.

Raises `ArgumentError` when both `state.chat_result` and `state.thread`
are `nil`.
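
The branch order can be sketched as one multi-clause function. `ChatResultSketch` is hypothetical and returns plain maps; the step maps' `:response` key and the `:response_from_state` placeholder (standing in for `to_response(state)` when no steps were observed) are assumptions for illustration.

```elixir
defmodule ChatResultSketch do
  # Branch 1: a stored result from :chat_completed is authoritative, thread or not.
  def to_chat_result(%{chat_result: %{} = stored}), do: stored

  # Branch 2: no stored result but a thread exists, so compute a fallback.
  def to_chat_result(%{chat_result: nil, thread: thread} = s) when not is_nil(thread) do
    %{
      # An adapter error wins; otherwise the consumer stopped early.
      # Completed steps never promote the result to :completed.
      halted_reason: if(s.error != nil, do: :error, else: :cancelled),
      final_response: final_response(s.steps)
    }
  end

  # Neither a stored result nor a thread: nothing to build from.
  def to_chat_result(_state),
    do: raise(ArgumentError, "to_chat_result/1 needs a stored chat_result or a thread")

  # Placeholder for to_response(state) when no step was observed.
  defp final_response([]), do: :response_from_state
  defp final_response(steps), do: List.last(steps).response
end

fallback =
  ChatResultSketch.to_chat_result(%{
    chat_result: nil,
    thread: :some_thread,
    error: nil,
    steps: [%{response: :last_step_response}]
  })
# fallback.halted_reason is :cancelled even though a step completed.
```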

# `to_response`

```elixir
@spec to_response(state()) :: ALLM.Response.t()
```

Build a `%ALLM.Response{}` from the collector state.

Works on any collector (thread-less or not). A mid-stream error surfaces
as `finish_reason: :error` with the error struct under `metadata.error`;
this function never returns `{:error, _}`.

## Examples

    iex> s = ALLM.StreamCollector.new()
    iex> s = ALLM.StreamCollector.apply_event(s, {:text_delta, %{id: nil, delta: "hi"}})
    iex> s = ALLM.StreamCollector.apply_event(s, {:message_completed, %{message: %ALLM.Message{role: :assistant, content: "hi"}, finish_reason: :stop}})
    iex> resp = ALLM.StreamCollector.to_response(s)
    iex> {resp.output_text, resp.finish_reason}
    {"hi", :stop}

# `to_step_result`

```elixir
@spec to_step_result(state()) :: ALLM.StepResult.t()
```

Build a `%ALLM.StepResult{}` from the collector state. Requires a non-nil
thread (from `new/1` with a `%Thread{}`); raises `ArgumentError` otherwise.

`:done?` is `true` when `state.halt != nil` (a halt event was folded in)
or when `finish_reason in [:stop, :length, :content_filter, :error]`;
`false` otherwise (for `:tool_calls` or `nil` with no halt).
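
The `:done?` rule reduces to a two-clause predicate. `DoneSketch` is a hypothetical illustration over a plain map with `:halt` and `:finish_reason` keys, not the module's internal code.

```elixir
defmodule DoneSketch do
  @terminal_reasons [:stop, :length, :content_filter, :error]

  # Any folded halt ends the step, whatever the finish_reason.
  def done?(%{halt: halt}) when halt != nil, do: true

  # Otherwise only terminal finish reasons end it; :tool_calls and nil do not.
  def done?(%{finish_reason: reason}), do: reason in @terminal_reasons
end
```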

`:tool_results` is populated from the `:tool_result_encoded` fold clause
AND from the `:tool_halt` / `:ask_user_requested` sentinel appends
(Phase 7.6 cleanup B1).

`:metadata` merges halt metadata when `state.halt != nil` (see Phase 6
design §StreamCollector extension):

  * `{:halt, reason, id, result}` → `%{halted_reason: reason,
    halt_tool_call_id: id, halt_result: result}`.
  * `{:ask_user, :ask_user, id, q, o}` → `%{halted_reason: :ask_user,
    pending_tool_call_id: id, pending_question: q, ask_user_opts: o}`.
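
The projection can be sketched as pattern-matching clauses over the `halt_state()` shapes. `HaltMetadataSketch.merge/2` is a hypothetical stand-in for the module's internal `merge_halt_metadata/2`; its argument order, and the choice that halt keys win on collisions, are assumptions.

```elixir
defmodule HaltMetadataSketch do
  # No halt folded in: metadata passes through unchanged.
  def merge(metadata, nil), do: metadata

  def merge(metadata, {:halt, reason, id, result}) do
    Map.merge(metadata, %{halted_reason: reason, halt_tool_call_id: id, halt_result: result})
  end

  def merge(metadata, {:ask_user, :ask_user, id, question, opts}) do
    Map.merge(metadata, %{
      halted_reason: :ask_user,
      pending_tool_call_id: id,
      pending_question: question,
      ask_user_opts: opts
    })
  end
end
```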

