Folds an ALLM.Chat.stream/3 (or ALLM.Chat.stream_step/3) event
stream into both an updated %ALLM.Session{} and a terminal
%ALLM.ChatResult{} (or %ALLM.StepResult{}) in one pass. See spec
§13.2 and steering/PHASE_8_DESIGN.md Decision #15.
Layer D — Phase 8 Batch 1 ships the reducer alongside the
non-streaming Session API; the streaming Session entry points
(stream_start/3, stream_reply/4, stream_step/3) land in Batch 2.
Modes
Two dispatch modes — set at construction via new/2's :mode opt:
- :chat (default) - used with ALLM.Session.stream_start/3, stream_reply/4. finalize/1 returns {updated_session, %ChatResult{}}. When the consumer halts before a :chat_completed event arrives, a :cancelled %ChatResult{} is built from the partial collector state.
- :step - used with ALLM.Session.stream_step/3. finalize/1 returns {updated_session, %StepResult{}} if a :step_completed event was observed; otherwise a :cancelled %ChatResult{} is built from the empty-step partial state (StepResult cannot represent a stream that was cancelled before any step completed).
Examples
iex> session = ALLM.Session.new()
iex> reducer = ALLM.Session.StreamReducer.new(session)
iex> reducer.session == session
true
iex> reducer.mode
:chat
iex> session = ALLM.Session.new()
iex> reducer = ALLM.Session.StreamReducer.new(session, mode: :step)
iex> reducer.mode
:step
Summary
Functions
apply_event/2
Fold one ALLM.Event value into the reducer. Total over the closed
16-tag event union; unknown tags or malformed payloads are no-ops
(delegated to StreamCollector.apply_event/2's catch-all).
finalize/1
Project the folded collector state onto an updated %Session{} and a
terminal result tuple.
new/2
Build a reducer wrapping the originating %ALLM.Session{} plus a fresh
%ALLM.StreamCollector{} seeded with session.thread.
Types
@type mode() :: :chat | :step
@type t() :: %ALLM.Session.StreamReducer{ collector: ALLM.StreamCollector.state(), mode: mode(), session: ALLM.Session.t() }
Functions
@spec apply_event(t(), ALLM.Event.t()) :: t()
Fold one ALLM.Event value into the reducer. Total over the closed
16-tag event union; unknown tags or malformed payloads are no-ops
(delegated to StreamCollector.apply_event/2's catch-all).
Does NOT mutate state.session — the originating session is preserved
through every fold and only updated at finalize/1.
Examples
iex> session = ALLM.Session.new(thread: ALLM.Thread.from_messages([ALLM.user("hi")]))
iex> reducer = ALLM.Session.StreamReducer.new(session)
iex> updated = ALLM.Session.StreamReducer.apply_event(reducer, {:text_delta, %{delta: "hi"}})
iex> updated.collector.current_text
"hi"
iex> updated.session == session
true
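In practice the reducer is usually driven by folding a whole event enumerable rather than a single event. The following is a minimal sketch of that loop, assuming the ALLM package is on the code path. It uses only the functions documented on this page; the hard-coded events list is a hypothetical stand-in for a real ALLM.Chat.stream/3 event stream.

```elixir
alias ALLM.Session.StreamReducer

session = ALLM.Session.new(thread: ALLM.Thread.from_messages([ALLM.user("hi")]))

# Hypothetical stand-in for a live event stream (assumption: only the
# :text_delta shape shown in the example above is used here).
events = [
  {:text_delta, %{delta: "hel"}},
  {:text_delta, %{delta: "lo"}}
]

# Fold every event into the reducer; the accumulator is the reducer itself.
folded =
  Enum.reduce(events, StreamReducer.new(session), fn event, acc ->
    StreamReducer.apply_event(acc, event)
  end)

# The originating session is preserved through every fold.
true = folded.session == session

# No :chat_completed event was folded, so in :chat mode the documented
# :cancelled fallback is the expected result shape.
{updated_session, result} = StreamReducer.finalize(folded)
IO.inspect({updated_session.status, result.halted_reason})
```

Because apply_event/2 returns a new reducer each time, the same fold works unchanged with Enum.reduce/3 over any enumerable of events, including a lazy stream.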
@spec finalize(t()) :: {ALLM.Session.t(), ALLM.ChatResult.t() | ALLM.StepResult.t()}
Project the folded collector state onto an updated %Session{} and a
terminal result tuple.
Dispatches on state.mode:
- :chat + observed :chat_completed → {Session.apply_chat_result(session, cr), cr}.
- :chat + no :chat_completed (consumer halted) → uses StreamCollector.to_chat_result/1's :cancelled fallback to build a %ChatResult{}, then projects.
- :step + exactly one observed :step_completed → {Session.apply_step_result(session, sr), sr}.
- :step + no :step_completed (consumer halted before any step) → {session, %ChatResult{halted_reason: :cancelled, ...}}. The result tuple's second element is a %ChatResult{}, not a %StepResult{}, because no step was observed to project.
Idempotent — calling twice with the same state returns equal tuples.
Examples
iex> session = ALLM.Session.new(thread: ALLM.Thread.from_messages([ALLM.user("hi")]))
iex> reducer = ALLM.Session.StreamReducer.new(session)
iex> {s, r} = ALLM.Session.StreamReducer.finalize(reducer)
iex> s.status
:completed
iex> r.halted_reason
:cancelled
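The fold-then-finalize pattern and the mode dispatch described for finalize/1 can be illustrated with a small self-contained model. This is a sketch only, not the ALLM implementation: ToyReducer, its fields, the plain-map session, and the :last_result key are all hypothetical names chosen for illustration, and the projection is heavily simplified.

```elixir
defmodule ToyReducer do
  # Hypothetical stand-in for the reducer; NOT the ALLM implementation.
  defstruct session: nil, mode: :chat, text: "", completed: nil

  # Validate :mode at construction time, mirroring the documented behaviour.
  def new(session, opts \\ []) do
    mode = Keyword.get(opts, :mode, :chat)

    if mode not in [:chat, :step] do
      raise ArgumentError, "invalid :mode #{inspect(mode)}"
    end

    %__MODULE__{session: session, mode: mode}
  end

  # Accumulate text; the session field is never touched while folding.
  def apply_event(%__MODULE__{} = r, {:text_delta, %{delta: d}}) when is_binary(d),
    do: %{r | text: r.text <> d}

  # Record the terminal event that matches the reducer's mode.
  def apply_event(%__MODULE__{mode: :chat} = r, {:chat_completed, result}),
    do: %{r | completed: result}

  def apply_event(%__MODULE__{mode: :step} = r, {:step_completed, result}),
    do: %{r | completed: result}

  # Catch-all: unknown tags or malformed payloads are no-ops.
  def apply_event(%__MODULE__{} = r, _event), do: r

  # No terminal event observed: build a :cancelled result from partial state.
  def finalize(%__MODULE__{completed: nil} = r),
    do: {r.session, %{halted_reason: :cancelled, text: r.text}}

  # Terminal event observed: project it onto the (toy) session.
  def finalize(%__MODULE__{completed: result} = r),
    do: {Map.put(r.session, :last_result, result), result}
end

events = [{:text_delta, %{delta: "he"}}, {:text_delta, %{delta: "llo"}}, {:unknown, 1}]
reducer = Enum.reduce(events, ToyReducer.new(%{id: 1}), &ToyReducer.apply_event(&2, &1))

# Halted before :chat_completed, so the result is the :cancelled fallback.
{session, result} = ToyReducer.finalize(reducer)

# :step mode with one observed :step_completed event.
step_reducer =
  %{id: 1}
  |> ToyReducer.new(mode: :step)
  |> ToyReducer.apply_event({:step_completed, %{step: 1}})

{step_session, step_result} = ToyReducer.finalize(step_reducer)
```

Since finalize/1 in this model is a pure function of the reducer struct, calling it twice on the same value returns equal tuples, which is the same idempotence property stated above.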
@spec new(ALLM.Session.t(), keyword()) :: t()
Build a reducer wrapping the originating %ALLM.Session{} plus a fresh
%ALLM.StreamCollector{} seeded with session.thread.
Validates opts[:mode] against [:chat, :step]; unknown values raise
ArgumentError at construction time.
Options
- :mode - :chat (default) or :step. Selects finalize/1's dispatch shape per Decision #15.
Examples
iex> session = ALLM.Session.new(thread: ALLM.Thread.from_messages([ALLM.user("hi")]))
iex> reducer = ALLM.Session.StreamReducer.new(session)
iex> reducer.session.thread == session.thread
true
iex> reducer.collector.thread == session.thread
true