Reduce a stream of ALLM.Event values into a collected %ALLM.Response{},
%ALLM.StepResult{}, or %ALLM.ChatResult{}. See spec §13.1.
Layer C — stateless fold state. StreamCollector is the shared reducer
that every non-streaming wrapper (ALLM.generate/3 in Phase 5, future
step/3 and chat/3) builds on. Callers fold a stream with the stdlib
idiom:
Enum.reduce(stream, ALLM.StreamCollector.new(), fn e, s ->
  ALLM.StreamCollector.apply_event(s, e)
end)
Then extract the result with to_response/1 (thread-less) or
to_step_result/1 / to_chat_result/1 (thread-backed).
Phase 5 extensions to spec §13.1
- new/0 builds a thread-less collector used by ALLM.stream_generate/3.
- new/1 additionally accepts nil (thread-less) alongside a %ALLM.Thread{}.
- to_response/1 returns the accumulated %Response{} and is the canonical output for thread-less collection.
These extensions fit alongside the spec §13.1 signatures (new/1, apply_event/2,
to_step_result/1, to_chat_result/1) without replacing them.
Phase 6 adds two new fields, :tool_results and :halt, populated by
fold clauses for orchestration tags. Phase 7 adds :chat_result and
populates :steps via the :step_completed fold clause.
Phase 7 extension
The :step_completed fold appends a %StepResult{} (built from the
PRE-RESET collector state — see Phase 7 Non-obvious Decision #6) to
:steps and resets the per-step sub-state (:current_text,
:current_tool_calls, :tool_call_order, :tool_results, :halt,
:finish_reason, :raw_finish_reason, :last_message). :error
and :metadata are NOT reset — adapter mid-stream errors and
accumulated metadata persist across step boundaries so that
to_chat_result/1's fallback path can preserve them.
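The reset rule can be sketched with a cut-down struct. This is a minimal illustration under stated assumptions, not ALLM's implementation. StepResetSketch and step_completed/2 are hypothetical names, and a plain map stands in for %StepResult{}. The sketch takes the snapshot before resetting. :error and :metadata survive only because they are absent from the reset map.

```elixir
defmodule StepResetSketch do
  # Hypothetical, trimmed-down collector (not ALLM's struct): just enough
  # fields to show what survives a :step_completed boundary.
  defstruct current_text: "",
            current_tool_calls: %{},
            tool_call_order: [],
            tool_results: [],
            halt: nil,
            finish_reason: nil,
            raw_finish_reason: nil,
            last_message: nil,
            error: nil,
            metadata: %{},
            steps: [],
            thread: nil

  # Per-step fields and the values they reset to. :error and :metadata
  # are deliberately absent, so they persist across steps.
  @per_step_defaults %{
    current_text: "",
    current_tool_calls: %{},
    tool_call_order: [],
    tool_results: [],
    halt: nil,
    finish_reason: nil,
    raw_finish_reason: nil,
    last_message: nil
  }

  def step_completed(%__MODULE__{} = state, thread) do
    # Snapshot from the PRE-RESET state (a plain map standing in for %StepResult{}).
    snapshot = %{text: state.current_text, finish_reason: state.finish_reason}

    %{struct(state, @per_step_defaults) | steps: state.steps ++ [snapshot], thread: thread}
  end
end

s = %StepResetSketch{current_text: "hi", finish_reason: :tool_calls, error: :boom}
s2 = StepResetSketch.step_completed(s, :next_thread)
s2.current_text
# => ""
s2.error
# => :boom
```

Because the snapshot is built first, the recorded step still sees "hi" and :tool_calls, even though the live collector has been cleared.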
The :chat_completed fold stores the event's :result verbatim in
:chat_result and sets :done? = true. to_chat_result/1
short-circuits to the stored result when present; otherwise it falls back
to a computed %ChatResult{} whose halted_reason is :cancelled
(consumer halted early) or :error (mid-stream adapter error).
Fold semantics — Phase 5 subset plus Phase 6 + Phase 7 extensions
apply_event/2 ships explicit clauses for the nine Phase-5 adapter-emitted
tags (:message_started, :text_delta, :text_completed,
:tool_call_started, :tool_call_delta, :tool_call_completed,
:message_completed, :raw_chunk, :error), three Phase-6
orchestration tags (:tool_result_encoded, :tool_halt,
:ask_user_requested), and two Phase-7 orchestration tags
(:step_completed, :chat_completed). Every other tag
(:tool_execution_started, :tool_execution_completed), along with any
malformed event, falls through a single catch-all clause:
def apply_event(state, _), do: state.
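The sketch below shows how explicit clauses followed by one catch-all make the fold total. It assumes a toy two-field struct. FoldSketch is a hypothetical name, and its payload shapes are simplified stand-ins for the real ALLM.Event union.

```elixir
defmodule FoldSketch do
  # Hypothetical two-field collector illustrating the clause layout only.
  defstruct current_text: "", done?: false

  # Explicit clause for a known tag: append the streamed text.
  def apply_event(%__MODULE__{} = state, {:text_delta, %{delta: delta}})
      when is_binary(delta) do
    %{state | current_text: state.current_text <> delta}
  end

  # Explicit clause for a terminal tag.
  def apply_event(%__MODULE__{} = state, {:chat_completed, _payload}) do
    %{state | done?: true}
  end

  # Catch-all: unhandled tags and malformed events return the state
  # unchanged, so the fold never raises FunctionClauseError.
  def apply_event(state, _event), do: state
end

events = [
  {:text_delta, %{id: nil, delta: "hel"}},
  {:tool_execution_started, %{}},
  :not_an_event,
  {:text_delta, %{id: nil, delta: "lo"}}
]

final = Enum.reduce(events, %FoldSketch{}, fn e, s -> FoldSketch.apply_event(s, e) end)
final.current_text
# => "hello"
```

The two middle events hit only the catch-all, which is why the reduction still yields "hello".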
| Tag | Fold | Rationale |
|---|---|---|
| :tool_result_encoded | append %Message{role: :tool, tool_call_id: id, content: content} to state.tool_results. | to_step_result/1 reads :tool_results from the struct, enabling step ≡ stream_step \|> collect_step. |
| :tool_halt | when state.halt == nil, set state.halt = {:halt, reason, id, result} AND append the encoded sentinel %Message{role: :tool, ...} (from payload :content) to state.tool_results. Subsequent halts are no-ops (first-halt-wins). | Halt metadata lives separately from :finish_reason so done?/1 can combine both signals. The sentinel append (Phase 7.6 cleanup B1) keeps tool_results aligned with the non-streaming Chat.do_step/4 path. |
| :ask_user_requested | when state.halt == nil, set state.halt = {:ask_user, :ask_user, id, q, o} AND append a \<awaiting user response> sentinel message to state.tool_results. First-halt-wins. | Same channel as :tool_halt; the sentinel append (Phase 7.6 cleanup B1) mirrors spec §12.3 step 1. |
| :step_completed | append %StepResult{} (PRE-RESET state) to state.steps; set state.thread = thread; reset per-step sub-state. | Multi-step Chat.stream/3 reductions need a clean per-step boundary. :error / :metadata deliberately persist. |
| :chat_completed | set state.chat_result = result; set state.done? = true. | The chat-layer terminal event; to_chat_result/1 short-circuits to the stored value when present. |
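First-halt-wins for the two halt rows can be expressed by pattern-matching on halt: nil in the function head. This is a hedged sketch, not ALLM's code. HaltSketch is hypothetical, plain maps stand in for %ALLM.Message{}, and the payload keys (:reason, :id, :result, :content, :question, :opts) are assumptions.

```elixir
defmodule HaltSketch do
  # Hypothetical collector holding only the fields the halt clauses touch.
  defstruct halt: nil, tool_results: []

  # Matching halt: nil in the head is what enforces first-halt-wins.
  def apply_event(
        %__MODULE__{halt: nil} = s,
        {:tool_halt, %{reason: reason, id: id, result: result, content: content}}
      ) do
    sentinel = %{role: :tool, tool_call_id: id, content: content}
    %{s | halt: {:halt, reason, id, result}, tool_results: s.tool_results ++ [sentinel]}
  end

  def apply_event(
        %__MODULE__{halt: nil} = s,
        {:ask_user_requested, %{id: id, question: q, opts: o}}
      ) do
    sentinel = %{role: :tool, tool_call_id: id, content: "<awaiting user response>"}
    %{s | halt: {:ask_user, :ask_user, id, q, o}, tool_results: s.tool_results ++ [sentinel]}
  end

  # A later halt (halt already set), or any other event, is a no-op.
  def apply_event(s, _event), do: s
end

s =
  %HaltSketch{}
  |> HaltSketch.apply_event({:tool_halt, %{reason: :done, id: "c1", result: 42, content: "ok"}})
  |> HaltSketch.apply_event({:ask_user_requested, %{id: "c2", question: "Why?", opts: []}})

s.halt
# => {:halt, :done, "c1", 42}
```

The second halt appends no sentinel either. The whole clause is skipped, so tool_results keeps exactly one entry.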
Totality guarantee
apply_event/2 never raises on a well-shaped event in the 16-tag closed
union, with one deliberate exception: a :raw_chunk with payload
{:usage, map} lets a KeyError from struct!(ALLM.Usage, map) propagate.
That indicates an adapter bug (emitting keys not in %Usage{}), not a
collector bug.
Adapter-side contract for :raw_chunk {:usage, _} (Usage fold)
The {:raw_chunk, {:usage, map}} fold applies struct!(ALLM.Usage, map),
which raises KeyError on any map key outside %ALLM.Usage{}'s field set.
Adapters emitting {:raw_chunk, {:usage, _}} events MUST pre-map their
provider's wire-level usage fields (e.g., OpenAI's prompt_tokens /
completion_tokens; Anthropic's input_tokens / output_tokens /
cache_read_input_tokens) to %ALLM.Usage{} field names before emitting.
An adapter that forwards the raw provider payload will raise KeyError
inside the reducer mid-stream, destroying the accumulated collector state. See
lib/allm/usage.ex for the canonical %Usage{} field set. This is a
load-bearing invariant for any real provider adapter implementing
ALLM.StreamAdapter.
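Here is a sketch of the pre-mapping an adapter would do before emitting the usage event. It rests on loud assumptions. UsageSketch stands in for %ALLM.Usage{}, and its input_tokens / output_tokens fields are guesses, because the real field set lives in lib/allm/usage.ex. OpenAIUsageMapping and to_usage_map/1 are hypothetical helpers. Only the struct!/2 behaviour is standard Elixir.

```elixir
defmodule UsageSketch do
  # ASSUMPTION: illustrative field names only; see lib/allm/usage.ex
  # for the canonical %ALLM.Usage{} field set.
  defstruct input_tokens: 0, output_tokens: 0
end

defmodule OpenAIUsageMapping do
  # Hypothetical adapter-side helper: rename OpenAI wire keys to the
  # (assumed) struct fields and drop unmapped keys such as "total_tokens".
  @renames %{"prompt_tokens" => :input_tokens, "completion_tokens" => :output_tokens}

  def to_usage_map(wire) when is_map(wire) do
    for {wire_key, field} <- @renames, Map.has_key?(wire, wire_key), into: %{} do
      {field, Map.fetch!(wire, wire_key)}
    end
  end
end

wire = %{"prompt_tokens" => 12, "completion_tokens" => 5, "total_tokens" => 17}

# Safe: only keys the struct defines ever reach struct!/2.
usage = struct!(UsageSketch, OpenAIUsageMapping.to_usage_map(wire))
usage.input_tokens
# => 12

# Unsafe: provider-named keys make struct!/2 raise KeyError.
try do
  struct!(UsageSketch, %{prompt_tokens: 12})
rescue
  KeyError -> :key_error
end
# => :key_error
```

Dropping unmapped keys rather than passing them through is the point: struct!/2 rejects any key the struct does not define.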
Mid-stream errors
A terminal {:error, struct} event folds into the collector's :error
field and sets :finish_reason to :error. to_response/1 still
returns a %Response{}, because a mid-stream error counts as a success at
the fold layer; the caller detects it by checking
response.finish_reason == :error and reading response.metadata.error.
to_response/1 never returns {:error, _}.
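A caller that wants a conventional tagged tuple can convert the response itself. The helper below is hypothetical and not part of ALLM. It relies only on the finish_reason and metadata fields described above, and because Map.get/2 also works on structs, it accepts a %Response{} as well as a plain map.

```elixir
defmodule ResponseErrorSketch do
  # Hypothetical caller-side helper: turn the always-successful response
  # into {:ok, response} or {:error, reason}.
  def classify(%{finish_reason: :error, metadata: metadata}),
    do: {:error, Map.get(metadata, :error)}

  def classify(response), do: {:ok, response}
end

ResponseErrorSketch.classify(%{finish_reason: :error, metadata: %{error: :timeout}})
# => {:error, :timeout}
```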
Types
@type halt_state() ::
        nil
        | {:halt, reason :: atom(), tool_call_id :: String.t(), result :: term()}
        | {:ask_user, :ask_user, tool_call_id :: String.t(),
           question :: String.t(), opts :: keyword()}
Terminal halt signal derived from :tool_halt / :ask_user_requested
events. nil until the first halt event is observed; set once and never
updated (first-halt-wins — see Phase 6 design Invariant 7).
The :halt shape carries the handler's raw result term as the fourth
element so merge_halt_metadata/2 can project it onto :halt_result,
matching the non-streaming ToolRunner.run_tool_calls/3 halt_metadata
shape (Phase 7.6 cleanup — chat-equivalence blocker B2).
@type state() :: %ALLM.StreamCollector{
        chat_result: ALLM.ChatResult.t() | nil,
        current_text: String.t(),
        current_tool_calls: %{required(String.t()) => ALLM.ToolCall.t()},
        done?: boolean(),
        error: ALLM.Error.AdapterError.t() | ALLM.Error.StreamError.t() | term() | nil,
        finish_reason: ALLM.Response.finish_reason() | nil,
        halt: halt_state(),
        last_message: ALLM.Message.t() | nil,
        last_response: ALLM.Response.t() | nil,
        metadata: map(),
        raw_finish_reason: String.t() | nil,
        steps: [ALLM.StepResult.t()],
        thread: ALLM.Thread.t() | nil,
        tool_call_order: [String.t()],
        tool_results: [ALLM.Message.t()],
        usage: ALLM.Usage.t()
      }
Functions
@spec apply_event(state(), ALLM.Event.t()) :: state()
Fold a single ALLM.Event into the collector state. Total over the 16-tag
closed union; unknown tags or malformed payloads are no-ops (state
unchanged).
See the "Fold semantics" section of the module doc for the per-tag state transitions.
Examples
iex> s = ALLM.StreamCollector.new()
iex> s = ALLM.StreamCollector.apply_event(s, {:text_delta, %{id: nil, delta: "hel"}})
iex> s = ALLM.StreamCollector.apply_event(s, {:text_delta, %{id: nil, delta: "lo"}})
iex> s.current_text
"hello"
@spec new() :: state()
Build a thread-less collector. Equivalent to new(nil).
Use this when reducing ALLM.stream_generate/3's stream into a
%Response{} via to_response/1. Calling to_step_result/1 on a thread-less
collector raises ArgumentError; so does to_chat_result/1, unless a
:chat_completed event has already stored a result.
Examples
iex> s = ALLM.StreamCollector.new()
iex> s.thread
nil
iex> s.current_text
""
@spec new(ALLM.Thread.t() | nil) :: state()
Build a collector either thread-less (nil) or seeded with a
%ALLM.Thread{}. The thread is required by to_step_result/1 and by
to_chat_result/1's fallback path; use new/0 or new(nil) when only
to_response/1 will be consumed.
Examples
iex> thread = ALLM.Thread.new()
iex> s = ALLM.StreamCollector.new(thread)
iex> s.thread
%ALLM.Thread{messages: [], metadata: %{}}
iex> ALLM.StreamCollector.new(nil).thread
nil
@spec to_chat_result(state()) :: ALLM.ChatResult.t()
Build a %ALLM.ChatResult{} from the collector state.
Two branches (Phase 7 Non-obvious Decision #7):
- Stored. When state.chat_result is set (from a :chat_completed fold), return it verbatim, even when state.thread is nil. The stored result is authoritative; the orchestrator already constructed the canonical ChatResult.
- Fallback (computed). When state.chat_result is nil and state.thread is set, build a %ChatResult{} from collector state. :halted_reason is :error when state.error != nil (mid-stream adapter error); otherwise it is ALWAYS :cancelled (the consumer halted the stream early; a non-empty state.steps does NOT promote it to :completed). :final_response is the last step's response when state.steps != [], or to_response(state) when no steps were observed.
Raises ArgumentError when both state.chat_result and state.thread
are nil.
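The branch selection can be modeled in a few clauses. This is a hypothetical sketch, not ALLM's implementation. Plain maps stand in for the collector, %ChatResult{} and %Response{}, and %{output_text: current_text} is a crude stand-in for to_response(state).

```elixir
defmodule ChatResultSketch do
  # Stored branch: authoritative, returned as-is even with a nil thread.
  def to_chat_result(%{chat_result: stored}) when stored != nil, do: stored

  # Neither a stored result nor a thread: nothing to build from.
  def to_chat_result(%{chat_result: nil, thread: nil}) do
    raise ArgumentError, "expected a stored chat result or a thread"
  end

  # Fallback branch: computed from collector state.
  def to_chat_result(%{chat_result: nil} = state) do
    # Never :completed here; only an adapter error changes the reason.
    halted_reason = if state.error != nil, do: :error, else: :cancelled

    final_response =
      case state.steps do
        # Stand-in for to_response(state) when no step was observed.
        [] -> %{output_text: state.current_text}
        steps -> List.last(steps).response
      end

    %{halted_reason: halted_reason, final_response: final_response}
  end
end

state = %{
  chat_result: nil,
  thread: :some_thread,
  error: nil,
  current_text: "",
  steps: [%{response: %{output_text: "step 1"}}]
}

# One completed step, no error: still :cancelled, with final_response
# taken from the last step.
ChatResultSketch.to_chat_result(state)
```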
@spec to_response(state()) :: ALLM.Response.t()
Build a %ALLM.Response{} from the collector state.
Works on any collector (thread-less or not). A mid-stream error surfaces
as finish_reason: :error with the error struct under metadata.error;
this function never returns {:error, _}.
Examples
iex> s = ALLM.StreamCollector.new()
iex> s = ALLM.StreamCollector.apply_event(s, {:text_delta, %{id: nil, delta: "hi"}})
iex> s = ALLM.StreamCollector.apply_event(s, {:message_completed, %{message: %ALLM.Message{role: :assistant, content: "hi"}, finish_reason: :stop}})
iex> resp = ALLM.StreamCollector.to_response(s)
iex> {resp.output_text, resp.finish_reason}
{"hi", :stop}
@spec to_step_result(state()) :: ALLM.StepResult.t()
Build a %ALLM.StepResult{} from the collector state. Requires a non-nil
thread (from new/1 with a %Thread{}); raises ArgumentError otherwise.
:done? is true when state.halt != nil (a halt event was folded in)
or when finish_reason in [:stop, :length, :content_filter, :error];
false otherwise (for :tool_calls or nil with no halt).
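The :done? rule reduces to a two-clause predicate. DoneSketch is a hypothetical illustration over plain maps, not ALLM's code.

```elixir
defmodule DoneSketch do
  @terminal_reasons [:stop, :length, :content_filter, :error]

  # A folded halt always ends the step, whatever the finish_reason.
  def done?(%{halt: halt}) when halt != nil, do: true

  # Otherwise only a terminal finish_reason ends it; :tool_calls and nil do not.
  def done?(%{finish_reason: reason}), do: reason in @terminal_reasons
end

DoneSketch.done?(%{halt: nil, finish_reason: :tool_calls})
# => false
DoneSketch.done?(%{halt: {:halt, :done, "c1", nil}, finish_reason: :tool_calls})
# => true
```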
:tool_results is populated from the :tool_result_encoded fold clause
AND from the :tool_halt / :ask_user_requested sentinel appends
(Phase 7.6 cleanup B1).
:metadata merges halt metadata when state.halt != nil (see Phase 6
design §StreamCollector extension):
- {:halt, reason, id, result} → %{halted_reason: reason, halt_tool_call_id: id, halt_result: result}.
- {:ask_user, :ask_user, id, q, o} → %{halted_reason: :ask_user, pending_tool_call_id: id, pending_question: q, ask_user_opts: o}.
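The two mappings can be sketched as a merge over an existing metadata map. The module and the merge_halt/2 name are hypothetical. This is not the merge_halt_metadata/2 referenced in the halt_state() doc, and the argument order is an assumption.

```elixir
defmodule HaltMetadataSketch do
  # No halt folded in: metadata passes through unchanged.
  def merge_halt(metadata, nil), do: metadata

  def merge_halt(metadata, {:halt, reason, id, result}) do
    Map.merge(metadata, %{halted_reason: reason, halt_tool_call_id: id, halt_result: result})
  end

  def merge_halt(metadata, {:ask_user, :ask_user, id, question, opts}) do
    Map.merge(metadata, %{
      halted_reason: :ask_user,
      pending_tool_call_id: id,
      pending_question: question,
      ask_user_opts: opts
    })
  end
end

# Existing keys such as :model are kept; the halt keys are added alongside them.
HaltMetadataSketch.merge_halt(%{model: "x"}, {:halt, :done, "c1", 42})
```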