ALLM.Session.StreamReducer (allm v0.3.0)

Copy Markdown View Source

Folds an ALLM.Chat.stream/3 (or ALLM.Chat.stream_step/3) event stream into both an updated %ALLM.Session{} and a terminal %ALLM.ChatResult{} (or %ALLM.StepResult{}) in one pass. See spec §13.2 and steering/PHASE_8_DESIGN.md Decision #15.

Layer D — Phase 8 Batch 1 ships the reducer alongside the non-streaming Session API; the streaming Session entry points (stream_start/3, stream_reply/4, stream_step/3) land in Batch 2.

Modes

Two dispatch modes — set at construction via new/2's :mode opt:

  • :chat (default) — used with ALLM.Session.stream_start/3, stream_reply/4. finalize/1 returns {updated_session, %ChatResult{}}. When the consumer halts before a :chat_completed event arrives, a :cancelled %ChatResult{} is built from the partial collector state.
  • :step — used with ALLM.Session.stream_step/3. finalize/1 returns {updated_session, %StepResult{}} if a :step_completed event was observed; otherwise a :cancelled %ChatResult{} is built from the empty-step partial state (StepResult cannot represent a stream that was cancelled before any step completed).

Examples

iex> session = ALLM.Session.new()
iex> reducer = ALLM.Session.StreamReducer.new(session)
iex> reducer.session == session
true
iex> reducer.mode
:chat

iex> session = ALLM.Session.new()
iex> reducer = ALLM.Session.StreamReducer.new(session, mode: :step)
iex> reducer.mode
:step

Summary

Functions

Fold one ALLM.Event value into the reducer. Total over the closed 16-tag event union; unknown tags or malformed payloads are no-ops (delegated to StreamCollector.apply_event/2's catch-all).

Project the folded collector state onto an updated %Session{} and a terminal result tuple.

Build a reducer wrapping the originating %ALLM.Session{} plus a fresh %ALLM.StreamCollector{} seeded with session.thread.

Types

mode()

@type mode() :: :chat | :step

t()

@type t() :: %ALLM.Session.StreamReducer{
  collector: ALLM.StreamCollector.state(),
  mode: mode(),
  session: ALLM.Session.t()
}

Functions

apply_event(state, event)

@spec apply_event(t(), ALLM.Event.t()) :: t()

Fold one ALLM.Event value into the reducer. Total over the closed 16-tag event union; unknown tags or malformed payloads are no-ops (delegated to StreamCollector.apply_event/2's catch-all).

Does NOT mutate state.session — the originating session is preserved through every fold and only updated at finalize/1.

Examples

iex> session = ALLM.Session.new(thread: ALLM.Thread.from_messages([ALLM.user("hi")]))
iex> reducer = ALLM.Session.StreamReducer.new(session)
iex> updated = ALLM.Session.StreamReducer.apply_event(reducer, {:text_delta, %{delta: "hi"}})
iex> updated.collector.current_text
"hi"
iex> updated.session == session
true

finalize(stream_reducer)

@spec finalize(t()) :: {ALLM.Session.t(), ALLM.ChatResult.t() | ALLM.StepResult.t()}

Project the folded collector state onto an updated %Session{} and a terminal result tuple.

Dispatches on state.mode:

  • :chat + observed :chat_completed{Session.apply_chat_result(session, cr), cr}.
  • :chat + no :chat_completed (consumer halted) → uses StreamCollector.to_chat_result/1's :cancelled fallback to build a %ChatResult{}, then projects.
  • :step + exactly one observed :step_completed{Session.apply_step_result(session, sr), sr}.
  • :step + no :step_completed (consumer halted before any step) → {session, %ChatResult{halted_reason: :cancelled, ...}}. The result tuple's second element is a %ChatResult{}, not a %StepResult{}, because no step was observed to project.

Idempotent — calling twice with the same state returns equal tuples.

Examples

iex> session = ALLM.Session.new(thread: ALLM.Thread.from_messages([ALLM.user("hi")]))
iex> reducer = ALLM.Session.StreamReducer.new(session)
iex> {s, r} = ALLM.Session.StreamReducer.finalize(reducer)
iex> s.status
:completed
iex> r.halted_reason
:cancelled

new(session, opts \\ [])

@spec new(
  ALLM.Session.t(),
  keyword()
) :: t()

Build a reducer wrapping the originating %ALLM.Session{} plus a fresh %ALLM.StreamCollector{} seeded with session.thread.

Validates opts[:mode] against [:chat, :step]; unknown values raise ArgumentError at construction time.

Options

  • :mode:chat (default) or :step. Selects finalize/1's dispatch shape per Decision #15.

Examples

iex> session = ALLM.Session.new(thread: ALLM.Thread.from_messages([ALLM.user("hi")]))
iex> reducer = ALLM.Session.StreamReducer.new(session)
iex> reducer.session.thread == session.thread
true
iex> reducer.collector.thread == session.thread
true