ALLM.StreamCollector (allm v0.3.0)

Reduce a stream of ALLM.Event values into a collected %ALLM.Response{}, %ALLM.StepResult{}, or %ALLM.ChatResult{}. See spec §13.1.

Layer C — stateless fold state. StreamCollector is the shared reducer that every non-streaming wrapper (ALLM.generate/3 in Phase 5, future step/3 and chat/3) builds on. Callers fold a stream with the stdlib idiom:

Enum.reduce(stream, ALLM.StreamCollector.new(), fn e, s ->
  ALLM.StreamCollector.apply_event(s, e)
end)

then extract the result with to_response/1 (thread-less) or to_step_result/1 / to_chat_result/1 (thread-backed).

Phase 5 extensions to spec §13.1

  • new/0 builds a thread-less collector used by ALLM.stream_generate/3.
  • new/1 additionally accepts nil (thread-less) alongside a %ALLM.Thread{}.
  • to_response/1 returns the accumulated %Response{} and is the canonical output for thread-less collection.

All three fit alongside the spec §13.1 signatures (new/1, apply_event/2, to_step_result/1, to_chat_result/1) without replacing them.

Phase 6 adds two new fields, :tool_results and :halt, populated by fold clauses for orchestration tags. Phase 7 adds :chat_result and populates :steps via the :step_completed fold clause.

Phase 7 extension

The :step_completed fold appends a %StepResult{} (built from the PRE-RESET collector state — see Phase 7 Non-obvious Decision #6) to :steps and resets the per-step sub-state (:current_text, :current_tool_calls, :tool_call_order, :tool_results, :halt, :finish_reason, :raw_finish_reason, :last_message). :error and :metadata are NOT reset — adapter mid-stream errors and accumulated metadata persist across step boundaries so that to_chat_result/1's fallback path can preserve them.

The :chat_completed fold stores the event's :result verbatim in :chat_result and sets :done? = true. to_chat_result/1 short-circuits to the stored result when present; otherwise it falls back to a computed %ChatResult{} whose halted_reason is :cancelled (consumer halted early) or :error (mid-stream adapter error).
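The step-boundary reset described above can be sketched with a trimmed-down stand-in. This is an illustration under stated assumptions, not the library's code: StepResetSketch is a hypothetical module, its struct carries only a few of the real fields, and the per-step snapshot is a plain map rather than a %StepResult{}.

```elixir
defmodule StepResetSketch do
  # Hypothetical, trimmed-down collector state; the real struct has more fields.
  defstruct current_text: "", finish_reason: nil, steps: [], error: nil, metadata: %{}

  # Fold a step boundary: snapshot the pre-reset state, then clear per-step fields.
  def step_completed(%__MODULE__{} = state) do
    # Built BEFORE the reset, so it still sees the step's text and finish reason.
    snapshot = %{text: state.current_text, finish_reason: state.finish_reason}

    # :error and :metadata are deliberately not part of the reset.
    %{
      state
      | steps: state.steps ++ [snapshot],
        current_text: "",
        finish_reason: nil
    }
  end
end

state =
  struct!(StepResetSketch,
    current_text: "partial",
    finish_reason: :tool_calls,
    error: :boom,
    metadata: %{request_id: "r1"}
  )

next = StepResetSketch.step_completed(state)
IO.inspect(next)
```

In the sketch, next.steps holds the snapshot taken before the reset, while next.error and next.metadata carry over unchanged into the following step.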

Fold semantics — Phase 5 subset plus Phase 6 + Phase 7 extensions

apply_event/2 ships explicit clauses for the nine Phase-5 adapter-emitted tags (:message_started, :text_delta, :text_completed, :tool_call_started, :tool_call_delta, :tool_call_completed, :message_completed, :raw_chunk, :error), three Phase-6 orchestration tags (:tool_result_encoded, :tool_halt, :ask_user_requested), and two Phase-7 orchestration tags (:step_completed, :chat_completed). Every other tag — :tool_execution_started, :tool_execution_completed, plus any malformed event — falls through a single catch-all apply_event(state, _), do: state.
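The clause layout described above (explicit clauses first, a single catch-all last) can be sketched with a stand-in reducer. This is a minimal illustration, not the library's implementation: FoldSketch is a hypothetical module, its state is a plain map, and only two of the explicit tags are handled. The :text_delta event shape is taken from the examples in this document; the payload of :tool_execution_started is an assumption.

```elixir
defmodule FoldSketch do
  # Hypothetical stand-in for the collector; state is a plain map.
  def new, do: %{current_text: "", finish_reason: nil}

  # Explicit clause: append a text delta.
  def apply_event(state, {:text_delta, %{delta: delta}}) when is_binary(delta) do
    %{state | current_text: state.current_text <> delta}
  end

  # Explicit clause: record the finish reason.
  def apply_event(state, {:message_completed, %{finish_reason: reason}}) do
    %{state | finish_reason: reason}
  end

  # Catch-all: unhandled tags and malformed events leave the state unchanged.
  def apply_event(state, _event), do: state
end

events = [
  {:text_delta, %{id: nil, delta: "hel"}},
  # Assumed payload shape; this tag has no explicit clause and falls through.
  {:tool_execution_started, %{id: "call_1"}},
  {:text_delta, %{id: nil, delta: "lo"}},
  # Malformed event: also a no-op.
  :not_an_event,
  {:message_completed, %{finish_reason: :stop}}
]

final = Enum.reduce(events, FoldSketch.new(), fn e, s -> FoldSketch.apply_event(s, e) end)
IO.inspect(final)
```

Keeping the catch-all as the last clause is what makes the fold total: the reducer never has to know about tags it does not consume.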

Tag, fold, and rationale for the Phase 6 and Phase 7 orchestration tags:

  • :tool_result_encoded
    Fold: append %Message{role: :tool, tool_call_id: id, content: content} to state.tool_results.
    Rationale: to_step_result/1 reads :tool_results from the struct, enabling step ≡ stream_step |> collect_step.
  • :tool_halt
    Fold: when state.halt == nil, set state.halt = {:halt, reason, id, result} AND append the encoded sentinel %Message{role: :tool, ...} (from payload :content) to state.tool_results. Subsequent halts are no-ops (first-halt-wins).
    Rationale: halt metadata lives separately from :finish_reason so done?/1 can combine both signals. The sentinel append (Phase 7.6 cleanup B1) keeps tool_results aligned with the non-streaming Chat.do_step/4 path.
  • :ask_user_requested
    Fold: when state.halt == nil, set state.halt = {:ask_user, :ask_user, id, q, o} AND append a <awaiting user response> sentinel message to state.tool_results. First-halt-wins.
    Rationale: same channel as :tool_halt; the sentinel append (Phase 7.6 cleanup B1) mirrors spec §12.3 step 1.
  • :step_completed
    Fold: append %StepResult{} (PRE-RESET state) to state.steps; set state.thread = thread; reset per-step sub-state.
    Rationale: multi-step Chat.stream/3 reductions need a clean per-step boundary. :error / :metadata deliberately persist.
  • :chat_completed
    Fold: set state.chat_result = result; set state.done? = true.
    Rationale: the chat-layer terminal event; to_chat_result/1 short-circuits to the stored value when present.
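The first-halt-wins rule for :tool_halt can be sketched as follows. This is a hedged illustration only: HaltSketch is a hypothetical module, the state is a plain map, and the payload key names :reason, :id, and :result are assumptions (only :content is named in the text above).

```elixir
defmodule HaltSketch do
  # Hypothetical stand-in state: only the two fields this fold touches.
  def new, do: %{halt: nil, tool_results: []}

  # First halt: record the halt tuple and append the sentinel tool message.
  def apply_event(
        %{halt: nil} = state,
        {:tool_halt, %{reason: reason, id: id, result: result, content: content}}
      ) do
    sentinel = %{role: :tool, tool_call_id: id, content: content}

    %{
      state
      | halt: {:halt, reason, id, result},
        tool_results: state.tool_results ++ [sentinel]
    }
  end

  # Any later halt is a no-op (first-halt-wins).
  def apply_event(state, {:tool_halt, _payload}), do: state

  # Everything else is ignored in this sketch.
  def apply_event(state, _other), do: state
end

first = {:tool_halt, %{reason: :budget, id: "call_1", result: :stopped, content: "halted"}}
second = {:tool_halt, %{reason: :other, id: "call_2", result: :ignored, content: "late"}}

halted =
  Enum.reduce([first, second], HaltSketch.new(), fn e, s ->
    HaltSketch.apply_event(s, e)
  end)

IO.inspect(halted.halt)
```

Only the first event changes the state; the second leaves both :halt and :tool_results untouched, so the sentinel list never grows past one halt entry.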

Totality guarantee

apply_event/2 never raises on a well-shaped event in the 16-tag closed union. A :raw_chunk whose {:usage, map} payload carries keys outside %Usage{} is not well-shaped: the KeyError from struct!(ALLM.Usage, map) is allowed to propagate. That is an adapter bug (emitting keys not in %Usage{}), not a collector bug.

Adapter-side contract for :raw_chunk {:usage, _} (Usage fold)

The {:raw_chunk, {:usage, map}} fold applies struct!(ALLM.Usage, map), which raises KeyError on any map key outside %ALLM.Usage{}'s field set. Adapters emitting {:raw_chunk, {:usage, _}} events MUST pre-map their provider's wire-level usage fields (e.g., OpenAI's prompt_tokens / completion_tokens; Anthropic's input_tokens / output_tokens / cache_read_input_tokens) to %ALLM.Usage{} field names before emitting. An adapter that forwards the raw provider payload makes the reducer raise KeyError mid-stream, destroying the accumulated collector state. See lib/allm/usage.ex for the canonical %Usage{} field set. This is a load-bearing invariant for any real provider adapter implementing ALLM.StreamAdapter.
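A minimal sketch of the adapter-side pre-mapping, under loud assumptions: UsageSketch is a hypothetical struct with two invented field names (the real field set is in lib/allm/usage.ex and is not reproduced here), and the wire payload is assumed to be a JSON-decoded map with string keys. Only Kernel.struct!/2's behaviour of raising KeyError on unknown keys is taken as given.

```elixir
defmodule UsageSketch do
  # Hypothetical struct; the real field set lives in lib/allm/usage.ex.
  defstruct input_tokens: 0, output_tokens: 0

  # Rename assumed OpenAI-style wire keys to the struct's field names and
  # drop everything else, so struct!/2 never sees an unknown key.
  def from_openai(wire) when is_map(wire) do
    mapped = %{
      input_tokens: Map.get(wire, "prompt_tokens", 0),
      output_tokens: Map.get(wire, "completion_tokens", 0)
    }

    struct!(__MODULE__, mapped)
  end
end

wire = %{"prompt_tokens" => 12, "completion_tokens" => 34, "total_tokens" => 46}
usage = UsageSketch.from_openai(wire)

# Forwarding an unmapped key raises, because struct!/2 rejects unknown fields.
raised? =
  try do
    struct!(UsageSketch, %{prompt_tokens: 12})
    false
  rescue
    KeyError -> true
  end

IO.inspect({usage, raised?})
```

Doing the rename in the adapter keeps the failure out of the reducer: the collector only ever receives maps whose keys already match the struct.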

Mid-stream errors

A terminal {:error, struct} event folds into the collector's :error field and sets :finish_reason to :error. to_response/1 still returns %Response{} — a mid-stream error is a success at the fold layer: the caller reads response.finish_reason == :error and response.metadata.error to detect it. to_response/1 never returns {:error, _}.
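The caller-side check can be sketched with a stand-in reducer. Everything here is illustrative: ErrorFoldSketch is a hypothetical module, the response is a plain map instead of %Response{}, and the error is a plain map instead of an error struct. Keeping the partial text in output_text after an error is a choice made for this sketch, not a documented guarantee.

```elixir
defmodule ErrorFoldSketch do
  # Hypothetical stand-in; field names follow the prose above.
  def new, do: %{current_text: "", error: nil, finish_reason: nil}

  def apply_event(state, {:text_delta, %{delta: d}}) do
    %{state | current_text: state.current_text <> d}
  end

  # A mid-stream error is folded into the state, not raised.
  def apply_event(state, {:error, err}) do
    %{state | error: err, finish_reason: :error}
  end

  def apply_event(state, _), do: state

  # Always returns a response map, never {:error, _}.
  def to_response(state) do
    metadata = if state.error, do: %{error: state.error}, else: %{}
    %{output_text: state.current_text, finish_reason: state.finish_reason, metadata: metadata}
  end
end

resp =
  [{:text_delta, %{id: nil, delta: "par"}}, {:error, %{reason: :timeout}}]
  |> Enum.reduce(ErrorFoldSketch.new(), fn e, s -> ErrorFoldSketch.apply_event(s, e) end)
  |> ErrorFoldSketch.to_response()

# The caller detects failure by inspecting the response, not by matching {:error, _}.
case resp do
  %{finish_reason: :error, metadata: %{error: err}} -> IO.puts("stream failed: #{inspect(err)}")
  %{output_text: text} -> IO.puts(text)
end
```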

Summary

Types

halt_state()
Terminal halt signal derived from :tool_halt / :ask_user_requested events. nil until the first halt event is observed; set once and never updated (first-halt-wins; see Phase 6 design Invariant 7).

state()

Functions

apply_event(state, arg2)
Fold a single ALLM.Event into the collector state. Total over the 16-tag closed union; unknown tags or malformed payloads are no-ops (state unchanged).

new()
Build a thread-less collector. Equivalent to new(nil).

new(thread)
Build a collector either thread-less (nil) or seeded with a %ALLM.Thread{}. The thread is required by to_step_result/1 and to_chat_result/1; use new/0 or new(nil) when only to_response/1 will be consumed.

to_chat_result(state)
Build a %ALLM.ChatResult{} from the collector state.

to_response(state)
Build a %ALLM.Response{} from the collector state.

to_step_result(state)
Build a %ALLM.StepResult{} from the collector state. Requires a non-nil thread (from new/1 with a %Thread{}); raises ArgumentError otherwise.

Types

halt_state()

@type halt_state() ::
  nil
  | {:halt, reason :: atom(), tool_call_id :: String.t(), result :: term()}
  | {:ask_user, :ask_user, tool_call_id :: String.t(), question :: String.t(),
     opts :: keyword()}

Terminal halt signal derived from :tool_halt / :ask_user_requested events. nil until the first halt event is observed; set once and never updated (first-halt-wins — see Phase 6 design Invariant 7).

The :halt shape carries the handler's raw result term as the fourth element so merge_halt_metadata/2 can project it onto :halt_result, matching the non-streaming ToolRunner.run_tool_calls/3 halt_metadata shape (Phase 7.6 cleanup — chat-equivalence blocker B2).

state()

@type state() :: %ALLM.StreamCollector{
  chat_result: ALLM.ChatResult.t() | nil,
  current_text: String.t(),
  current_tool_calls: %{required(String.t()) => ALLM.ToolCall.t()},
  done?: boolean(),
  error:
    ALLM.Error.AdapterError.t() | ALLM.Error.StreamError.t() | term() | nil,
  finish_reason: ALLM.Response.finish_reason() | nil,
  halt: halt_state(),
  last_message: ALLM.Message.t() | nil,
  last_response: ALLM.Response.t() | nil,
  metadata: map(),
  raw_finish_reason: String.t() | nil,
  steps: [ALLM.StepResult.t()],
  thread: ALLM.Thread.t() | nil,
  tool_call_order: [String.t()],
  tool_results: [ALLM.Message.t()],
  usage: ALLM.Usage.t()
}

Functions

apply_event(state, arg2)

@spec apply_event(state(), ALLM.Event.t()) :: state()

Fold a single ALLM.Event into the collector state. Total over the 16-tag closed union; unknown tags or malformed payloads are no-ops (state unchanged).

See the "Fold semantics" section of the module doc for the per-tag state transitions.

Examples

iex> s = ALLM.StreamCollector.new()
iex> s = ALLM.StreamCollector.apply_event(s, {:text_delta, %{id: nil, delta: "hel"}})
iex> s = ALLM.StreamCollector.apply_event(s, {:text_delta, %{id: nil, delta: "lo"}})
iex> s.current_text
"hello"

new()

@spec new() :: state()

Build a thread-less collector. Equivalent to new(nil).

Use this when reducing ALLM.stream_generate/3's stream into a %Response{} via to_response/1. Calling to_step_result/1 on a thread-less collector raises ArgumentError; so does to_chat_result/1, unless a :chat_completed fold has already stored a result.

Examples

iex> s = ALLM.StreamCollector.new()
iex> s.thread
nil
iex> s.current_text
""

new(thread)

@spec new(ALLM.Thread.t() | nil) :: state()

Build a collector either thread-less (nil) or seeded with a %ALLM.Thread{}. The thread is required by to_step_result/1 and to_chat_result/1; use new/0 or new(nil) when only to_response/1 will be consumed.

Examples

iex> thread = ALLM.Thread.new()
iex> s = ALLM.StreamCollector.new(thread)
iex> s.thread
%ALLM.Thread{messages: [], metadata: %{}}

iex> ALLM.StreamCollector.new(nil).thread
nil

to_chat_result(state)

@spec to_chat_result(state()) :: ALLM.ChatResult.t()

Build a %ALLM.ChatResult{} from the collector state.

Two branches (Phase 7 Non-obvious Decision #7):

  • Stored. When state.chat_result is set (from a :chat_completed fold), return it verbatim — even when state.thread is nil. The stored result is authoritative; the orchestrator already constructed the canonical ChatResult.
  • Fallback (computed). When state.chat_result is nil and state.thread is set, build a %ChatResult{} from collector state. :halted_reason is :error when state.error != nil (mid-stream adapter error); otherwise ALWAYS :cancelled (the consumer halted the stream early; non-empty state.steps does NOT promote to :completed). :final_response is the last step's response when state.steps != [], or to_response(state) when no steps were observed.

Raises ArgumentError when both state.chat_result and state.thread are nil.
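The branch order described above can be sketched on plain maps. This is an assumption-laden illustration, not the library's code: ChatResultSketch is a hypothetical module, the :thread and :steps keys of the computed result and the :response key on each step are assumed names, and the atom :response_from_state stands in for a call to to_response(state).

```elixir
defmodule ChatResultSketch do
  # Hypothetical stand-in operating on plain maps instead of the real structs.

  # Stored branch: a :chat_completed fold already supplied the result.
  def to_chat_result(%{chat_result: result}) when not is_nil(result), do: result

  # Neither a stored result nor a thread: nothing to build from.
  def to_chat_result(%{chat_result: nil, thread: nil}) do
    raise ArgumentError, "collector has neither a chat result nor a thread"
  end

  # Fallback branch: compute the result from the collector state.
  def to_chat_result(%{chat_result: nil} = state) do
    # Never :completed here; an early consumer halt is always :cancelled.
    halted_reason = if state.error != nil, do: :error, else: :cancelled

    final_response =
      case List.last(state.steps) do
        # Stands in for to_response(state) when no steps were observed.
        nil -> :response_from_state
        step -> step.response
      end

    %{
      thread: state.thread,
      steps: state.steps,
      halted_reason: halted_reason,
      final_response: final_response
    }
  end
end

base = %{chat_result: nil, thread: :a_thread, steps: [], error: nil}

cancelled = ChatResultSketch.to_chat_result(%{base | steps: [%{response: :r1}]})
errored = ChatResultSketch.to_chat_result(%{base | error: :boom})
stored = ChatResultSketch.to_chat_result(%{base | chat_result: :stored, thread: nil})

IO.inspect({cancelled.halted_reason, errored.halted_reason, stored})
```

Checking the stored result first is what lets a thread-less collector still return a chat result once :chat_completed has been folded in.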

to_response(state)

@spec to_response(state()) :: ALLM.Response.t()

Build a %ALLM.Response{} from the collector state.

Works on any collector (thread-less or not). A mid-stream error surfaces as finish_reason: :error with the error struct under metadata.error; this function never returns {:error, _}.

Examples

iex> s = ALLM.StreamCollector.new()
iex> s = ALLM.StreamCollector.apply_event(s, {:text_delta, %{id: nil, delta: "hi"}})
iex> s = ALLM.StreamCollector.apply_event(s, {:message_completed, %{message: %ALLM.Message{role: :assistant, content: "hi"}, finish_reason: :stop}})
iex> resp = ALLM.StreamCollector.to_response(s)
iex> {resp.output_text, resp.finish_reason}
{"hi", :stop}

to_step_result(state)

@spec to_step_result(state()) :: ALLM.StepResult.t()

Build a %ALLM.StepResult{} from the collector state. Requires a non-nil thread (from new/1 with a %Thread{}); raises ArgumentError otherwise.

:done? is true when state.halt != nil (a halt event was folded in) or when finish_reason in [:stop, :length, :content_filter, :error]; false otherwise (for :tool_calls or nil with no halt).
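The :done? rule above reduces to a single boolean expression. The sketch below is a hypothetical helper on a plain map (DoneSketch is not part of the library); the list of terminal finish reasons is the one given in the text.

```elixir
defmodule DoneSketch do
  @terminal [:stop, :length, :content_filter, :error]

  # Hypothetical helper mirroring the rule above:
  # done when any halt was folded in, or the finish reason is terminal.
  def done?(%{halt: halt, finish_reason: reason}) do
    halt != nil or reason in @terminal
  end
end

IO.inspect(DoneSketch.done?(%{halt: nil, finish_reason: :tool_calls}))
IO.inspect(DoneSketch.done?(%{halt: {:halt, :budget, "call_1", :stopped}, finish_reason: :tool_calls}))
```

A :tool_calls finish reason alone means another step is expected; the same finish reason combined with a halt ends the step.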

:tool_results is populated from the :tool_result_encoded fold clause AND from the :tool_halt / :ask_user_requested sentinel appends (Phase 7.6 cleanup B1).

:metadata merges halt metadata when state.halt != nil (see Phase 6 design §StreamCollector extension):

  • {:halt, reason, id, result} → %{halted_reason: reason, halt_tool_call_id: id, halt_result: result}.
  • {:ask_user, :ask_user, id, q, o} → %{halted_reason: :ask_user, pending_tool_call_id: id, pending_question: q, ask_user_opts: o}.
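The two projections listed above can be sketched as pattern-matching clauses. HaltMetadataSketch is a hypothetical module; the tuple shapes and map keys are the ones given in the list, while letting halt keys override existing metadata keys on conflict is an assumption of this sketch.

```elixir
defmodule HaltMetadataSketch do
  # Hypothetical helper; the tuple shapes and map keys are the ones listed above.
  def halt_metadata(nil), do: %{}

  def halt_metadata({:halt, reason, id, result}) do
    %{halted_reason: reason, halt_tool_call_id: id, halt_result: result}
  end

  def halt_metadata({:ask_user, :ask_user, id, question, opts}) do
    %{
      halted_reason: :ask_user,
      pending_tool_call_id: id,
      pending_question: question,
      ask_user_opts: opts
    }
  end

  # Merge the projection into existing metadata.
  # Assumption: halt keys win on conflict (Map.merge/2 prefers the second map).
  def merge(metadata, halt), do: Map.merge(metadata, halt_metadata(halt))
end

halted = HaltMetadataSketch.merge(%{request_id: "r1"}, {:halt, :budget, "call_1", :stopped})
asking = HaltMetadataSketch.merge(%{}, {:ask_user, :ask_user, "call_2", "Proceed?", [timeout: 30]})

IO.inspect(halted)
IO.inspect(asking)
```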