Internal — use ALLM.step/3 / ALLM.stream_step/3 / ALLM.chat/3 /
ALLM.stream/3 instead. See spec §17.
Layer C: stateless step orchestrator. Phase 6 shipped step/3 and
stream_step/3 (single-turn); Phase 7 added run/3 and stream/3
(multi-turn) on this same module.
Step equivalence (spec §3 + Phase 6 design Non-obvious Decision #9)
step/3 is implemented as a reducer over stream_step/3's event stream
via ALLM.StreamCollector. The two paths must produce identical
%ALLM.StepResult{} values modulo a tool_call_id sort on
:tool_results (parallel tool execution completes in non-deterministic
order; the streaming path emits in completion order while the
non-streaming path sorts by input index). See
steering/PHASE_6_DESIGN.md Non-obvious Decision #9 for the full
equivalence contract. The Phase 6 property test in
test/allm/step_equivalence_test.exs (Phase 6.4) exercises this.
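The "modulo a tool_call_id sort" comparison can be sketched as below. This is a minimal illustration, not the property test itself: plain maps stand in for %ALLM.StepResult{}, and the module name and the :tool_call_id key on each tool result are assumptions made for the example.

```elixir
defmodule StepEquivalenceSketch do
  # Hypothetical helper: plain maps stand in for %ALLM.StepResult{}.
  # Assumes every tool result carries a :tool_call_id key.
  def equivalent?(step_result, streamed_result) do
    normalise(step_result) == normalise(streamed_result)
  end

  # Sorting :tool_results by id removes the completion-order difference
  # between the streaming and non-streaming paths before comparing.
  defp normalise(result) do
    Map.update!(result, :tool_results, fn results ->
      Enum.sort_by(results, & &1.tool_call_id)
    end)
  end
end
```

Every other field is compared as-is, so any difference outside :tool_results ordering still counts as a mismatch.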
Stream composition (Non-obvious Decision #1)
stream_step/3 wraps ONE outer Stream.resource/3 driving a three-phase
state machine:
- Phase A (:phase_a): drives the adapter stream via its
  Enumerable.reduce/3 continuation. Each next_fun pulls ONE event, folds
  it into a %StreamCollector{} and emits it downstream. Transitions to
  Phase B when the adapter stream exhausts; never transitions on event
  content (:finish_reason: :tool_calls in an intermediate event does NOT
  trigger the transition; trailing :raw_chunk events after
  :message_completed are still consumed).
- Phase B (:phase_b): drives ALLM.ToolRunner.stream_tool_calls/3 via its
  reducer continuation. Each next_fun pulls the next event trio from one
  completed tool and emits it downstream. When a handler halts or
  on_tool_error: :halt fires, the phase continues pulling (sibling drain;
  see Phase 6 design Non-obvious Decision #1).
- Phase C (:phase_c): emits exactly one :step_completed event with the
  final %Response{} and final %Thread{} (input + augmented assistant +
  tool-role messages).
The outer after_fun pattern-matches on the state tuple and halts the
active sub-resource (adapter stream in Phase A, tool-execution stream in
Phase B) via Enumerable.reduce(acc, {:halt, :consumer_halt}, _) — this
triggers the sub-resource's own cleanup exactly once. Phase C has no
sub-resource to halt. This is ONE Stream.resource/3, not two; it drives
sub-streams by their reducer continuations rather than wrapping them.
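The single-resource idiom described above can be sketched with the standard library alone. This is a simplified illustration under stated assumptions, not the module's implementation: ThreePhaseSketch, continuation/1 and pull/3 are hypothetical names, the two arguments are arbitrary enumerables standing in for the adapter stream and the tool-execution stream, and no collector folding is shown.

```elixir
defmodule ThreePhaseSketch do
  # Hypothetical stand-in for stream_step/3. `adapter_events` and
  # `tool_events` are any enumerables.
  def stream(adapter_events, tool_events) do
    Stream.resource(
      fn -> {:phase_a, continuation(adapter_events)} end,
      fn
        {:phase_a, cont} ->
          pull(cont, :phase_a, fn -> {:phase_b, continuation(tool_events)} end)

        {:phase_b, cont} ->
          pull(cont, :phase_b, fn -> :phase_c end)

        :phase_c ->
          # Exactly one terminal event, then stop on the next pull.
          {[{:step_completed, %{}}], :done}

        :done ->
          {:halt, :done}
      end,
      fn
        # Phase A or B is active: halt its sub-stream so the sub-stream's
        # own cleanup runs.
        {_phase, cont} -> cont.({:halt, :consumer_halt})
        # Phase C and :done hold no sub-stream.
        _other -> :ok
      end
    )
  end

  # Wraps an enumerable in a reducer that suspends after every element.
  defp continuation(enum) do
    step = fn event, _acc -> {:suspend, event} end
    fn command -> Enumerable.reduce(enum, command, step) end
  end

  # Resumes the sub-stream for ONE event; moves on once it is exhausted.
  defp pull(cont, phase, next_state) do
    case cont.({:cont, nil}) do
      {:suspended, event, next_cont} -> {[event], {phase, next_cont}}
      {_done_or_halted, _acc} -> {[], next_state.()}
    end
  end
end
```

For example, Enum.to_list(ThreePhaseSketch.stream([:a1, :a2], [:t1])) yields the two adapter events, the tool event, and one {:step_completed, %{}} tuple, while Enum.take/2 on the same stream halts whichever sub-stream is active at that point.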
Event sequence (Invariant 6)
Events are emitted in this order:
- All adapter events (pass-through).
- Zero-to-N tool-execution event groups (for mode: :auto +
  :finish_reason: :tool_calls). Each group is, per tool:
  :tool_execution_started → :tool_execution_completed → one of
  :tool_result_encoded / :ask_user_requested / :tool_halt. Groups
  interleave across tools per Task.async_stream/5 completion ordering;
  within each group the three events are emitted together.
- Exactly ONE terminal :step_completed event.
No new :message_completed is synthesised after tool execution
(Non-obvious Decision #12).
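A consumer or test can check the terminal part of this ordering with a few lines. The sketch below assumes events are {type, payload} tuples, as in the examples on this page; EventOrderSketch and terminal_ok?/1 are hypothetical names, and the check covers only the "exactly one terminal :step_completed" rule, not the tool-group ordering.

```elixir
defmodule EventOrderSketch do
  # True when the list ends with a :step_completed event and contains no
  # other :step_completed event before it.
  def terminal_ok?(events) do
    {last, rest} = List.pop_at(events, -1)

    match?({:step_completed, _}, last) and
      not Enum.any?(rest, &match?({:step_completed, _}, &1))
  end
end
```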
Assistant message construction (Non-obvious Decision #10)
The augmented assistant message is built from response.output_text
(collector-authoritative — the accumulated :text_delta deltas or
:text_completed authoritative text), NOT from
response.message.content (which may be adapter-specific
normalised/trimmed text). metadata.finish_reason is always populated;
metadata.tool_calls is populated only when non-empty.
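The metadata rule can be illustrated as follows. This is a sketch under stated assumptions: plain maps stand in for %Response{} and the resulting message, and AssistantMessageSketch is a hypothetical module, not part of the library.

```elixir
defmodule AssistantMessageSketch do
  # Hypothetical builder; plain maps stand in for %Response{} and %Message{}.
  def build(response) do
    %{
      role: :assistant,
      # Collector-authoritative text, NOT response.message.content.
      content: response.output_text,
      metadata: metadata(response)
    }
  end

  # :finish_reason is always present; :tool_calls only when non-empty.
  defp metadata(%{finish_reason: reason, tool_calls: []}),
    do: %{finish_reason: reason}

  defp metadata(%{finish_reason: reason, tool_calls: calls}),
    do: %{finish_reason: reason, tool_calls: calls}
end
```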
Ask-user semantics (Non-obvious Decision #6)
Phase 6 is single-turn — step/3's thread does NOT contain an extra
:assistant-role message with metadata: %{ask_user: true} for an
ask-user handler return. Only :ask_user_requested is emitted and
StepResult.metadata.pending_question / :pending_tool_call_id /
:ask_user_opts are populated. Phase 7's chat/3 appends the question
to the thread as an assistant message at the turn boundary.
Summary
Functions
run/3: Run a multi-turn chat loop and return a %ALLM.ChatResult{}.
step/3: Execute a single step (one adapter call plus any auto-executed tool
calls) and return a %ALLM.StepResult{}.
stream/3: Stream a multi-turn chat loop and return a lazy stream of
ALLM.Event values terminating in exactly one :chat_completed event.
stream_step/3: Execute a single step and return a lazy stream of ALLM.Event
values.
Types
@type chat_opts() :: keyword()
Options accepted by run/3 (and stream/3 in Phase 7.4).
- :max_turns (pos_integer()). Precedence: call opts > engine.params >
  Application.get_env(:allm, :max_turns) > library default 8. Validated
  at entry; raises ArgumentError for non-pos_integer.
- :halt_when, of type (StepResult.t() -> boolean()). Called AFTER thread
  mutation per turn; exceptions propagate to the caller.
- Plus every step_opts/0 key (:mode, :tool_timeout, :on_tool_error, etc.).
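The :max_turns precedence chain and entry validation might look roughly like this. It is a sketch, not the library's code: MaxTurnsSketch is hypothetical, and treating engine.params as a map with a :max_turns key is an assumption.

```elixir
defmodule MaxTurnsSketch do
  @library_default 8

  # `engine_params` is ASSUMED to be a map; the real shape of
  # engine.params may differ.
  def resolve(opts, engine_params) do
    value =
      Keyword.get(opts, :max_turns) ||
        Map.get(engine_params, :max_turns) ||
        Application.get_env(:allm, :max_turns) ||
        @library_default

    if is_integer(value) and value > 0 do
      value
    else
      raise ArgumentError,
            "max_turns must be a positive integer, got: #{inspect(value)}"
    end
  end
end
```

The first non-nil source wins, so a call-site value overrides everything else, and the value is validated once regardless of where it came from.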
@type step_opts() :: keyword()
Options accepted by step/3 and stream_step/3.
- :mode, where :auto (default) executes tool calls and :manual returns
  them for the caller to submit results.
- :tool_timeout, in milliseconds per tool (default 30_000).
- :on_tool_error, either :continue (default) or :halt.
- :tool_executor, :tool_result_encoder (module overrides).
- Phase 5 pass-through opts: :emit_text_deltas, :emit_tool_deltas,
  :include_raw_chunks, :on_event.
- Phase 2 pass-through opts: :model, :adapter_opts, and any
  adapter-specific keys.
Functions
@spec run(ALLM.Engine.t(), ALLM.Thread.t() | [ALLM.Message.t()], chat_opts()) :: {:ok, ALLM.ChatResult.t()} | {:error, ALLM.Error.EngineError.t() | ALLM.Error.AdapterError.t() | ALLM.Error.ValidationError.t()}
Run a multi-turn chat loop and return a %ALLM.ChatResult{}.
Composes step/3 calls: each step's thread becomes the next step's
input thread. Halts on the first matching terminal condition (see
terminal_condition/4 source for the seven-entry total order).
Halt reasons
| Reason | Fires when |
|---|---|
| :completed | Adapter finish_reason ∈ {:stop, :length, :content_filter} |
| :error | Adapter finish_reason: :error (mid-stream error folds into the response) |
| :max_turns | step_index + 1 >= max_turns after a step that didn't otherwise halt |
| :halt_when | halt_when.(step_result) returns true |
| :ask_user | Handler returned {:ask_user, _} or {:ask_user, _, _} |
| :tool_error | on_tool_error: :halt fired, or fun form returned :halt / raised |
| :manual_tool_calls | mode: :manual and step surfaces tool calls |
| atom() (user) | Handler returned {:halt, reason, result} |
Adapter pre-flight errors surface as {:error, struct} from the FIRST
step's step/3 call. Mid-loop adapter errors fold into the step's
response and surface as halted_reason: :error on the ChatResult.
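The loop shape (each step's thread feeds the next step, with a halt check after every step) can be sketched as below. This is a reduced illustration: step_fun is a hypothetical stand-in for step/3, plain maps stand in for the result structs, only four of the halt reasons are modelled, and the order of the checks is an assumption (the authoritative order lives in terminal_condition/4).

```elixir
defmodule ChatLoopSketch do
  # `step_fun` is a hypothetical stand-in for step/3:
  #   thread -> {:ok, %{thread: new_thread, finish_reason: atom}} | {:error, term}
  def run(step_fun, thread, max_turns, halt_when \\ fn _step -> false end) do
    loop(step_fun, thread, 0, max_turns, halt_when, [])
  end

  defp loop(step_fun, thread, index, max_turns, halt_when, steps) do
    case step_fun.(thread) do
      {:error, _reason} = error ->
        error

      {:ok, step} ->
        steps = [step | steps]

        # Check order is an ASSUMPTION made for this sketch.
        cond do
          step.finish_reason in [:stop, :length, :content_filter] ->
            finish(:completed, step, steps)

          step.finish_reason == :error ->
            finish(:error, step, steps)

          halt_when.(step) ->
            finish(:halt_when, step, steps)

          index + 1 >= max_turns ->
            finish(:max_turns, step, steps)

          true ->
            # The step's thread becomes the next step's input thread.
            loop(step_fun, step.thread, index + 1, max_turns, halt_when, steps)
        end
    end
  end

  defp finish(reason, step, steps) do
    {:ok, %{halted_reason: reason, thread: step.thread, steps: Enum.reverse(steps)}}
  end
end
```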
structured_finalize semantics (Phase 10.4 — see spec §5.4)
When called with opts[:structured_finalize] == true AND
opts[:response_format] != nil, run/3 runs a two-pass orchestration
per design Decision #7:
- Pass 1 runs the tool loop with response_format cleared (tools
  preserved). Halts naturally per the table above.
- Pass 2 fires only when pass 1 halted on
  :completed | :max_turns | :halt_when. Other halts skip pass 2; the
  pass-1 result is returned with
  metadata.structured_finalize.pass_1_halted == <reason>.
- Pass 2 issues a single tools-disabled adapter call carrying the
  original response_format, after appending a user-nudge message to the
  thread (override via opts[:structured_finalize_nudge] >
  Application.get_env(:allm, :structured_finalize_nudge) > library
  default "Now provide your final structured response."; empty-string
  nudge skips the append).
- The merged %ChatResult{} carries :steps from BOTH passes,
  :final_response from pass 2, :halted_reason from pass 2, :thread from
  pass 2, and metadata.structured_finalize.pass_1_halted.
Per Invariant #4: pass 1 consumes the max_turns budget; pass 2's
single call does NOT decrement it.
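The pass-2 gate and the nudge handling can be sketched in isolation. FinalizeSketch and its function names are hypothetical, and the messages are plain maps rather than %ALLM.Message{} structs; only the gating set, the precedence chain and the empty-string rule come from the description above.

```elixir
defmodule FinalizeSketch do
  @default_nudge "Now provide your final structured response."

  # Pass 2 fires only for these pass-1 halt reasons.
  def pass_2?(halted_reason),
    do: halted_reason in [:completed, :max_turns, :halt_when]

  # Precedence: call opts > application env > library default.
  def nudge(opts) do
    case Keyword.fetch(opts, :structured_finalize_nudge) do
      {:ok, nudge} -> nudge
      :error -> Application.get_env(:allm, :structured_finalize_nudge, @default_nudge)
    end
  end

  # An empty-string nudge skips the append entirely.
  def append_nudge(messages, ""), do: messages
  def append_nudge(messages, nudge), do: messages ++ [%{role: :user, content: nudge}]
end
```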
Examples
iex> engine = ALLM.Engine.new(
...> adapter: ALLM.Providers.Fake,
...> adapter_opts: [
...> scripts: [
...> [{:tool_call, id: "c0", name: "echo", arguments: %{"x" => 1}},
...> {:finish, :tool_calls}],
...> [{:text, "done"}, {:finish, :stop}]
...> ]
...> ],
...> tools: [ALLM.tool(
...> name: "echo",
...> description: "",
...> schema: %{},
...> handler: fn args -> {:ok, args} end
...> )]
...> )
iex> thread = ALLM.Thread.from_messages([ALLM.user("echo please")])
iex> {:ok, %ALLM.ChatResult{} = r} = ALLM.Chat.run(engine, thread)
iex> r.halted_reason
:completed
iex> length(r.steps)
2
@spec step(ALLM.Engine.t(), ALLM.Thread.t() | [ALLM.Message.t()], step_opts()) :: {:ok, ALLM.StepResult.t()} | {:error, ALLM.Error.EngineError.t() | ALLM.Error.AdapterError.t() | ALLM.Error.ValidationError.t()}
Execute a single step (one adapter call plus any auto-executed tool
calls) and return a %ALLM.StepResult{}.
Normalises thread_or_messages — a list of %Message{} is wrapped via
ALLM.Thread.from_messages/1. Validates the thread via
ALLM.Validate.thread/1 before the adapter call. Dispatches to
ALLM.Runner.run/3 for the adapter round-trip, then branches on
:mode and response.finish_reason:
- mode: :manual with finish_reason: :tool_calls returns the tool calls
  surfaced on response.tool_calls; tool_results: [], done?: false,
  metadata.mode: :manual. Handler is NOT invoked.
- mode: :auto with finish_reason: :tool_calls dispatches to
  ALLM.ToolRunner.run_tool_calls/3, appends tool-role messages to the
  thread, and returns the composed step result.
- Anything else (:stop, :length, :content_filter, :error) yields
  done?: true, tool_results: [].
Error reason table
| Error | Recovery |
|---|---|
| %EngineError{reason: :missing_adapter} | Construct engine with :adapter. |
| %EngineError{reason: :missing_stream_adapter} | Adapter must implement ALLM.StreamAdapter. |
| %EngineError{reason: :unknown_tool, metadata: %{tool_name: name}} | Register the tool or filter the adapter's emitted tool calls. |
| %ValidationError{reason: :invalid_thread} | Fix the thread (e.g. missing tool_call_id on a :tool message). |
| %ValidationError{reason: :invalid_request} | Fix the request shape. |
| %AdapterError{reason: _} | Adapter pre-flight error. |
Examples
iex> engine = ALLM.Engine.new(
...> adapter: ALLM.Providers.Fake,
...> adapter_opts: [
...> script: [
...> {:tool_call, id: "c0", name: "echo", arguments: %{"x" => 1}},
...> {:finish, :tool_calls}
...> ]
...> ],
...> tools: [ALLM.tool(
...> name: "echo",
...> description: "",
...> schema: %{},
...> handler: fn args -> {:ok, args} end
...> )]
...> )
iex> thread = ALLM.Thread.from_messages([ALLM.user("echo please")])
iex> {:ok, %ALLM.StepResult{} = sr} = ALLM.Chat.step(engine, thread)
iex> sr.done?
false
iex> length(sr.tool_results)
1
@spec stream(ALLM.Engine.t(), ALLM.Thread.t() | [ALLM.Message.t()], chat_opts()) :: {:ok, Enumerable.t()} | {:error, ALLM.Error.EngineError.t() | ALLM.Error.AdapterError.t() | ALLM.Error.ValidationError.t()}
Stream a multi-turn chat loop and return a lazy stream of ALLM.Event
values terminating in exactly one :chat_completed event.
Composes stream_step/3 sub-streams sequentially: the outer
Stream.resource/3 drives the current step's reducer one event at a
time (mirroring Phase 6's stream_step/3 continuation idiom one layer
up). When a step completes, terminal_condition/5 decides whether to
start a new step (with the augmented thread) or transition to the
terminal :chat_completed emission.
Multi-turn stream composition
Two-phase state machine (see Phase 7 design Non-obvious Decision #1):
- Phase S (:step): drives the current stream_step/3 enumerable via its
  reducer continuation. Each next_fun pulls one event, folds it into the
  outer StreamCollector, and emits it. On :step_completed, computes a
  %StepResult{} from the PRE-fold collector state, folds the event, then
  invokes terminal_condition/5. On :continue, starts the next step. On
  {:halt, reason, _}, builds the final %ChatResult{} and transitions to
  Phase F.
- Phase F (:final): emits exactly one
  {:chat_completed, %{result: chat_result}} event and halts.
Cleanup chain
Chat.stream/3 after_fun
  → halt step_cont
    → Chat.stream_step/3 after_fun
      → halt adapter_cont OR tool_cont (whichever is active)
Consumer halt produces NO :chat_completed event (per spec §30
cancellation contract). Callers needing a final %ChatResult{} for a
cancelled stream collect events and call
ALLM.StreamCollector.to_chat_result/1 on the partial state.
Ask-user thread asymmetry
When a step's handler returns {:ask_user, _}, the streamed
:step_completed.thread does NOT include the assistant question
message — only the :chat_completed.result.thread does (Phase 7
Invariant 8). Consumers persisting thread state across turns should
read ChatResult.thread, not :step_completed.thread.
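A consumer following this advice might extract the thread as sketched below. ThreadFromEventsSketch is a hypothetical helper, and the chat result is matched as a plain map with a :thread key; the {:chat_completed, %{result: chat_result}} event shape is the one given for Phase F.

```elixir
defmodule ThreadFromEventsSketch do
  # Returns the thread to persist, read from the terminal :chat_completed
  # event and never from a :step_completed event. Returns nil when the
  # stream was cancelled before :chat_completed was emitted.
  def final_thread(events) do
    Enum.find_value(events, fn
      {:chat_completed, %{result: %{thread: thread}}} -> thread
      _other -> nil
    end)
  end
end
```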
Examples
iex> engine = ALLM.Engine.new(
...> adapter: ALLM.Providers.Fake,
...> adapter_opts: [
...> scripts: [
...> [{:tool_call, id: "c0", name: "echo", arguments: %{"x" => 1}},
...> {:finish, :tool_calls}],
...> [{:text, "done"}, {:finish, :stop}]
...> ]
...> ],
...> tools: [ALLM.tool(
...> name: "echo",
...> description: "",
...> schema: %{},
...> handler: fn args -> {:ok, args} end
...> )]
...> )
iex> thread = ALLM.Thread.from_messages([ALLM.user("echo please")])
iex> {:ok, stream} = ALLM.Chat.stream(engine, thread)
iex> events = Enum.to_list(stream)
iex> Enum.count(events, &match?({:chat_completed, _}, &1))
1
@spec stream_step(ALLM.Engine.t(), ALLM.Thread.t() | [ALLM.Message.t()], step_opts()) :: {:ok, Enumerable.t()} | {:error, ALLM.Error.EngineError.t() | ALLM.Error.AdapterError.t() | ALLM.Error.ValidationError.t()}
Execute a single step and return a lazy stream of ALLM.Event values.
The stream is open — no events fire until the caller reduces. Events are
emitted in this order: all adapter events (pass-through from
stream_generate/3), then zero-to-N tool-execution event groups (one
per tool: :tool_execution_started → :tool_execution_completed →
:tool_result_encoded / :ask_user_requested / :tool_halt), then
exactly one terminal :step_completed event.
Consumer halt (via Enum.take/2, Stream.take_while/2, etc.) propagates
to whichever phase is currently active — the adapter stream in Phase A
or the tool-execution stream in Phase B — triggering that sub-resource's
own cleanup exactly once.
Event sequence
See the module doc's "Event sequence" section. No new :message_completed
is synthesised after tool execution (Non-obvious Decision #12).
Unknown tools (Phase B pre-flight)
When the adapter requests a tool that is not registered on the engine,
stream_step/3 still returns {:ok, stream} — the error does NOT
surface on the outer tuple. Instead, after the adapter phase completes,
the stream emits a single {:error, %ALLM.EngineError{reason: :unknown_tool}} event followed by the terminal :step_completed
event. Consumers that need to short-circuit on unknown tools should
pattern-match on {:error, _} elements during reduction. This differs
from the non-streaming step/3 which returns {:error, %EngineError{}}
on the outer tuple; the asymmetry exists because once a stream has been
constructed the consumer has already committed to reducing it, and
late-surfacing the error as a stream element keeps the open-stream
contract intact. See Non-obvious Decision #1 for the underlying
three-phase state machine.
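One way to short-circuit on {:error, _} elements during reduction is Enum.reduce_while/3, sketched here. ShortCircuitSketch is a hypothetical helper; it works on any enumerable of events and makes no assumption about the error payload.

```elixir
defmodule ShortCircuitSketch do
  # Collects events until the first {:error, reason} element, then stops
  # reducing, which also halts the underlying stream.
  def collect(stream) do
    result =
      Enum.reduce_while(stream, [], fn
        {:error, reason}, _events -> {:halt, {:error, reason}}
        event, events -> {:cont, [event | events]}
      end)

    case result do
      {:error, reason} -> {:error, reason}
      events -> {:ok, Enum.reverse(events)}
    end
  end
end
```

Because the reduction halts at the error element, the terminal :step_completed event that follows it is never pulled.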
Examples
iex> engine = ALLM.Engine.new(
...> adapter: ALLM.Providers.Fake,
...> adapter_opts: [
...> script: [
...> {:tool_call, id: "c0", name: "echo", arguments: %{"x" => 1}},
...> {:finish, :tool_calls}
...> ]
...> ],
...> tools: [ALLM.tool(
...> name: "echo",
...> description: "",
...> schema: %{},
...> handler: fn args -> {:ok, args} end
...> )]
...> )
iex> thread = ALLM.Thread.from_messages([ALLM.user("echo please")])
iex> {:ok, stream} = ALLM.Chat.stream_step(engine, thread)
iex> events = Enum.to_list(stream)
iex> Enum.any?(events, &match?({:step_completed, _}, &1))
true