ALLM.StreamRunner (allm v0.3.0)

Internal — use ALLM.stream_generate/3 instead. See spec §17.

Validates the request, resolves model/tools/params via ALLM.Engine, dispatches to the engine adapter's stream/2, and applies per-§19 post-filters and the :on_event observer.

Orchestration opts are stripped

The orchestration opts :mode, :max_turns, and :halt_when (consumed by Phase 6 and Phase 7) are deny-listed here and are NOT forwarded to the adapter. stream_generate/3 issues a single request, so these opts would either be no-ops or, in the case of :halt_when, trigger a Protocol.UndefinedError at the Jason-encode boundary of real providers. Logger.debug/1 fires once per stripped key so power users can see the drop during development.
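The stripping step described above can be sketched with standard-library calls only. This is a minimal illustration, not the module's actual code: the module name OptsSketch, the function strip/1, and the log message wording are hypothetical; only the three deny-listed keys and the use of Logger.debug/1 come from the text.

```elixir
defmodule OptsSketch do
  # Hypothetical helper illustrating a deny-list; not part of allm.
  require Logger

  # The three keys named in the documentation above.
  @orchestration_keys [:mode, :max_turns, :halt_when]

  def strip(opts) when is_list(opts) do
    # Keyword.split/2 returns {entries_matching_keys, everything_else}.
    {dropped, kept} = Keyword.split(opts, @orchestration_keys)

    # One debug line per stripped key, so the drop is visible in development.
    for {key, _value} <- dropped do
      Logger.debug("stripping orchestration opt #{inspect(key)}")
    end

    kept
  end
end

OptsSketch.strip(mode: :chat, temperature: 0.2, halt_when: fn _ -> true end)
# => [temperature: 0.2]
```

Dropping the keys before dispatch, rather than letting the adapter ignore them, keeps a function-valued opt such as :halt_when away from any JSON encoding step.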

No double-wrapped Stream.resource/3

The adapter owns the streaming resource and its cleanup hook; this module only composes Stream.each/2 (for :on_event) and Stream.filter/2 (for the three emit/include filters) on top. Both operators propagate {:halt, _} upstream, which is what preserves the halt-safety contract from Phase 4.
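To see why composing Stream.each/2 and Stream.filter/2 keeps cleanup intact, consider the following stand-alone sketch. It assumes a stand-in adapter built on Stream.resource/3; the module HaltSketch, the {:text_delta, _} event shape, and the order of the two operators are illustrative assumptions, not the library's actual pipeline.

```elixir
defmodule HaltSketch do
  # Hypothetical stand-in for an adapter stream; not part of allm.
  def adapter_stream(parent) do
    Stream.resource(
      # start_fun: the "connection" is just a list of scripted events.
      fn -> [{:text_delta, "h"}, {:text_delta, "i"}, {:message_completed, :stop}] end,
      # next_fun: emit one event per step, halt when the script is exhausted.
      fn
        [] -> {:halt, []}
        [event | rest] -> {[event], rest}
      end,
      # after_fun: the cleanup hook; here it only notifies the caller.
      fn _state -> send(parent, :cleaned_up) end
    )
  end

  # Observer first, then a filter; neither wraps the stream in a second resource.
  def compose(stream, on_event) do
    stream
    |> Stream.each(on_event)
    |> Stream.filter(&match?({:text_delta, _}, &1))
  end
end

events =
  HaltSketch.adapter_stream(self())
  |> HaltSketch.compose(fn _event -> :ok end)
  |> Enum.take(1)

# The consumer stopped after one event, yet the cleanup hook still ran.
cleaned_up =
  receive do
    :cleaned_up -> true
  after
    100 -> false
  end

IO.inspect({events, cleaned_up})
# prints {[text_delta: "h"], true}
```

Because Enum.take/2 halts the reduction early, the halt travels through both operators to the resource, which then runs its cleanup function.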

:on_event failure mode

:on_event is invoked lazily by Stream.each/2 inside the consumer's reducing process — an exception raised by the callback surfaces there, not at the stream_generate/3 call site. This module does not wrap the callback in try/rescue; callers who need resilience wrap their own callback.
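A caller who wants a failing observer to be logged rather than to crash the reducing process could wrap the callback along these lines. This is a sketch under assumptions: SafeCallback and wrap/1 are hypothetical names, and swallowing the exception is one possible policy, not a recommendation from the library.

```elixir
defmodule SafeCallback do
  # Hypothetical wrapper; not part of allm.
  require Logger

  def wrap(callback) when is_function(callback, 1) do
    fn event ->
      try do
        callback.(event)
      rescue
        error ->
          # Log and carry on so the stream consumer keeps reducing.
          Logger.warning("on_event callback failed: " <> Exception.message(error))
          :ok
      end
    end
  end
end

flaky = fn
  2 -> raise "boom"
  _other -> :ok
end

# Stream.each/2 discards the callback's return value, so every element survives.
[1, 2, 3]
|> Stream.each(SafeCallback.wrap(flaky))
|> Enum.to_list()
# => [1, 2, 3]
```

The wrapped function would then be passed as the :on_event option in place of the raw callback.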

Functions

run(engine, request, opts \\ [])

Dispatch a streaming request. Validates, resolves params, forwards to engine.adapter.stream/2, and wires the per-§19 post-processing pipeline.

Returns {:ok, stream} on a successfully-opened stream (lazy — no event fires until the caller reduces) or {:error, struct} on a synchronous pre-flight failure.

Examples

iex> engine = ALLM.Engine.new(
...>   adapter: ALLM.Providers.Fake,
...>   adapter_opts: [script: [{:text, "hi"}, {:finish, :stop}]]
...> )
iex> req = ALLM.request([ALLM.user("say hi")])
iex> {:ok, stream} = ALLM.StreamRunner.run(engine, req)
iex> events = Enum.to_list(stream)
iex> Enum.any?(events, &match?({:message_completed, _}, &1))
true
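The two return shapes and the laziness note can also be illustrated without a provider. In this sketch ResultSketch.collect/1 is a hypothetical helper, and a plain Stream.map/2 stands in for the stream that run/3 would return; it only shows that nothing fires until the caller reduces.

```elixir
defmodule ResultSketch do
  # Hypothetical helper; not part of allm.
  # A successfully opened stream is reduced to a list of events.
  def collect({:ok, stream}), do: {:ok, Enum.to_list(stream)}
  # A synchronous pre-flight failure is passed through untouched.
  def collect({:error, _struct} = error), do: error
end

parent = self()

# Stand-in for a returned stream: each event announces itself when produced.
lazy =
  Stream.map([:a, :b], fn event ->
    send(parent, {:fired, event})
    event
  end)

# Nothing has been produced yet, because the stream has not been reduced.
fired_early =
  receive do
    {:fired, _event} -> true
  after
    0 -> false
  end

{:ok, events} = ResultSketch.collect({:ok, lazy})

IO.inspect({fired_early, events})
# prints {false, [:a, :b]}
```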