AI.Agent.Composite behaviour (fnord v0.9.17)
Behaviour and execution engine for composite agents - agents that orchestrate work across multiple completion turns, optionally with tool use, structured output, and sub-agent delegation.
Steps as state
A composite agent's work is defined as a queue of steps. Each step is either a completion (a turn in this agent's conversation) or a delegation (spawning a sub-agent). Steps can be grouped in a nested list for parallel execution.
Example step queues:
# Reviewer: fixed pipeline with parallel specialist fan-out
[formulate, [pedantic, acceptance, state_flow], incorporate]
# Coder planner: fixed sequential pipeline
[research, visualize, plan]
# Coder orchestrator: dynamic - validate can push tasks back
[task_1, task_2, task_3, validate]
Step types
A completion step runs a prompt against this agent's conversation, accumulating
the response into the message history:
AI.Agent.Composite.completion(:research, "Investigate the code...",
  response_format: %{...}, keep_prompt?: false)
A delegate step spawns a sub-agent. The sub-agent runs its own independent
conversation; its response is injected into the parent's message history as a
user message with a header identifying the source:
AI.Agent.Composite.delegate(:pedantic, AI.Agent.Review.Pedantic,
  fn state -> %{prompt: ..., scope: state.request} end)
Parallel execution
When a step in the queue is a list, all steps in that list run concurrently. Results are collected and injected into the conversation in list order before the next sequential step begins.
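The ordering guarantee described above can be illustrated with plain Elixir tasks. This is a minimal sketch, not fnord's implementation: `ParallelSketch`, `run_group/2`, and the `run_step` function are hypothetical names, and steps are represented as bare atoms for brevity.

```elixir
defmodule ParallelSketch do
  # Hypothetical helper, not part of fnord: runs every step in a group
  # concurrently and returns the results in the group's original order.
  def run_group(steps, run_step) when is_list(steps) and is_function(run_step, 1) do
    steps
    |> Enum.map(fn step -> Task.async(fn -> run_step.(step) end) end)
    # Task.await_many/2 returns results in the order the tasks were given,
    # regardless of which task finished first.
    |> Task.await_many(5_000)
  end
end

# The first step is the slowest, yet its result still comes back first.
results =
  ParallelSketch.run_group([:pedantic, :acceptance, :state_flow], fn name ->
    if name == :pedantic, do: Process.sleep(50)
    {name, "response from #{name}"}
  end)
```

Because results are matched to the list position rather than to completion time, the conversation history stays deterministic even when sub-agents finish out of order.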
Lifecycle
The execution engine calls implementation callbacks at each stage:
1. init/1 - Build the initial state and step queue.
2. Pop the next item from the step queue.
3. on_step_start/2 - Pre-execution hook (logging, UI).
4. Execute the step (completion or delegation).
5. on_step_complete/2 - Post-execution hook (parse response, update state).
6. get_next_steps/2 - Return steps to prepend to the queue, enabling dynamic control flow (retry, task generation, validation loops).
7. Go to 2.
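The lifecycle can be sketched as a small recursive loop. This is a simplified illustration under stated assumptions, not the engine's actual code: `LoopSketch`, `EchoAgent`, and the `execute` function are hypothetical, steps are bare atoms, the callback argument order `(step, state)` is assumed, and error handling and parallel groups are left out.

```elixir
defmodule LoopSketch do
  # Hypothetical, simplified engine loop. `impl` is a module exporting the
  # callbacks; `execute` stands in for running a completion or delegation.
  def run(impl, args, execute) do
    loop(impl, impl.init(args), execute)
  end

  # Queue is empty: finish with the last response.
  defp loop(_impl, %{steps: []} = state, _execute), do: {:ok, state.response}

  defp loop(impl, %{steps: [step | rest]} = state, execute) do
    state = %{state | steps: rest}                 # pop the next step
    state = impl.on_step_start(step, state)        # pre-execution hook
    response = execute.(step, state)               # run the step
    state = %{state | response: response, messages: state.messages ++ [response]}
    state = impl.on_step_complete(step, state)     # post-execution hook
    next = impl.get_next_steps(step, state)        # dynamic control flow
    loop(impl, %{state | steps: next ++ state.steps}, execute)
  end
end

defmodule EchoAgent do
  # Hypothetical implementation module using a plain map as state.
  def init(args), do: %{steps: args.steps, messages: [], response: nil, internal: %{}}
  def on_step_start(_step, state), do: state
  def on_step_complete(_step, state), do: state
  # After :plan, push one follow-up task to the front of the queue.
  def get_next_steps(:plan, _state), do: [:task_1]
  def get_next_steps(_step, _state), do: []
end

execute = fn step, _state ->
  send(self(), {:ran, step})
  "done #{step}"
end

{:ok, final} = LoopSketch.run(EchoAgent, %{steps: [:research, :plan, :validate]}, execute)
```

In this run the steps execute as `:research`, `:plan`, `:task_1`, `:validate`: the step returned from `get_next_steps/2` is prepended, so it runs before the remaining `:validate` step.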
Types
@type completion_step() :: %{
  type: :completion,
  name: step_name(),
  prompt: binary(),
  model: AI.Model.t() | nil,
  toolbox: AI.Tools.toolbox() | nil,
  response_format: map() | nil,
  keep_prompt?: boolean()
}
@type step() :: completion_step() | delegate_step()
@type step_name() :: atom()
@type t() :: %AI.Agent.Composite{
  agent: AI.Agent.t(),
  error: any(),
  internal: map(),
  messages: AI.Util.msg_list(),
  model: AI.Model.t(),
  request: binary(),
  response: binary() | nil,
  steps: step_queue(),
  toolbox: AI.Tools.toolbox()
}
Callbacks
get_next_steps/2
Called after on_step_complete/2. Returns a list of steps to prepend to the
front of the queue. Return [] to continue with the existing queue.
This is the primary mechanism for dynamic control flow:
- Retry: return [the_same_step]
- Task generation: return [task_1, task_2, ..., validate]
- Conditional branching: inspect state.internal and return different steps
For the reviewer, this always returns [] since the pipeline is fixed.
For the coder, the plan step returns task steps, and the validate step can
return more task steps on failure.
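The coder-style flow described above might look like the following. This is a hedged sketch using plain maps instead of the real step and state structs: `NextStepsSketch`, the `:tasks` and `:failed` keys in `internal`, and the `(step, state)` argument order are all assumptions for illustration.

```elixir
defmodule NextStepsSketch do
  # After planning, turn each planned task into a step and end with validate.
  def get_next_steps(%{name: :plan}, %{internal: %{tasks: tasks}}) do
    Enum.map(tasks, fn task -> %{name: task} end) ++ [%{name: :validate}]
  end

  # After a failed validation, queue the failed tasks again, then re-validate.
  def get_next_steps(%{name: :validate}, %{internal: %{failed: [_ | _] = failed}}) do
    Enum.map(failed, fn task -> %{name: task} end) ++ [%{name: :validate}]
  end

  # Everything else: continue with the existing queue.
  def get_next_steps(_step, _state), do: []
end

planned =
  NextStepsSketch.get_next_steps(%{name: :plan}, %{internal: %{tasks: [:task_1, :task_2]}})

after_clean_validate =
  NextStepsSketch.get_next_steps(%{name: :validate}, %{internal: %{failed: []}})
```

A fixed pipeline such as the reviewer would need only the final catch-all clause.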
init/1
Initialize the composite agent from the caller-provided args map (which
includes :agent injected by AI.Agent.get_response/2). Must return a
fully populated %AI.Agent.Composite{} with the initial step queue.
@callback on_error(step :: step(), error :: any(), state :: t()) :: {:retry, t()} | {:skip, t()} | {:halt, t()}
Called when a step fails. state.error contains the error. Return one of:
- {:retry, state} - re-execute the same step
- {:skip, state} - clear the error and continue to the next step
- {:halt, state} - stop execution with the error
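One way to combine these return values is a bounded retry policy. The sketch below is an illustration only: `OnErrorSketch`, the retry limit, and the `:retries` bookkeeping in `state.internal` are hypothetical, and plain maps stand in for the real step and state structs.

```elixir
defmodule OnErrorSketch do
  @max_retries 2

  # Hypothetical policy: retry a failing step up to @max_retries times,
  # counting attempts per step name in state.internal, then halt.
  def on_error(step, _error, state) do
    # get_in/2 returns nil when a key on the path is missing.
    retries = get_in(state.internal, [:retries, step.name]) || 0

    if retries < @max_retries do
      counts =
        state.internal
        |> Map.get(:retries, %{})
        |> Map.put(step.name, retries + 1)

      {:retry, %{state | internal: Map.put(state.internal, :retries, counts)}}
    else
      {:halt, state}
    end
  end
end

step = %{name: :research}
state = %{internal: %{}, error: :timeout}

{:retry, s1} = OnErrorSketch.on_error(step, :timeout, state)
{:retry, s2} = OnErrorSketch.on_error(step, :timeout, s1)
third = OnErrorSketch.on_error(step, :timeout, s2)
```

Keeping the counter in the state rather than in a process dictionary means the policy travels with whatever the engine passes to the next callback.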
on_step_complete/2
Called after a step completes successfully. The step's response is in
state.response and has been appended to state.messages. Use this to
parse structured output and update state.internal.
Must return the updated state.
on_step_start/2
Called immediately before a step executes. Typically used for UI reporting
(UI.report_from/2). Must return the (possibly modified) state.
Functions
Append steps to the end of the queue.
@spec completion(step_name(), binary(), keyword()) :: completion_step()
Creates a completion step - a turn in this agent's conversation.
Options:
- :model - override the agent's default model for this step
- :toolbox - override the agent's default toolbox for this step
- :response_format - JSON schema to constrain output
- :keep_prompt? - if true, the prompt remains in message history (default false)
@spec delegate(step_name(), module(), (t() -> map())) :: delegate_step()
Creates a delegate step - spawns a sub-agent and injects its response into
the parent conversation. The args_builder function receives the current
state and must return the args map passed to the sub-agent's get_response/1.
Retrieves a value from the internal map. key may be a single atom or a
list of atoms (nested path per get_in/2 semantics). Returns
{:error, :not_found} when any key in the path is missing.
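The difference from a bare get_in/2, which returns nil for a missing key, can be sketched as follows. `InternalGetSketch.fetch/2` is a hypothetical stand-in, not the module's function, and the `{:ok, value}` success shape is an assumption since only the error tuple is documented.

```elixir
defmodule InternalGetSketch do
  # A single atom is treated as a one-element path.
  def fetch(internal, key) when is_atom(key), do: fetch(internal, [key])

  # Walk the path one key at a time, stopping at the first missing key.
  def fetch(internal, path) when is_list(path) do
    Enum.reduce_while(path, {:ok, internal}, fn key, {:ok, acc} ->
      case acc do
        %{^key => value} -> {:cont, {:ok, value}}
        _ -> {:halt, {:error, :not_found}}
      end
    end)
  end
end

internal = %{plan: %{tasks: [:task_1]}}

found = InternalGetSketch.fetch(internal, [:plan, :tasks])
missing = InternalGetSketch.fetch(internal, [:review, :notes])
plain = get_in(internal, [:review, :notes])
```

Here `found` is `{:ok, [:task_1]}`, `missing` is `{:error, :not_found}`, and `plain` is `nil`, which shows why an explicit error tuple is useful when a stored value may itself be nil.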
Prepend steps to the front of the queue (next to execute).
Sets a value in the internal map. key may be a single atom or a list of
atoms (nested path per put_in/3 semantics). When passing a list, all
intermediate keys must already exist.
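The intermediate-key requirement comes from Kernel.put_in/3 itself, as this short sketch with a plain map shows. The key names are made up for illustration, and the module's own setter is not called here.

```elixir
internal = %{plan: %{}}

# The intermediate key :plan exists, so the nested write succeeds.
updated = put_in(internal, [:plan, :tasks], [:task_1])

# The intermediate key :review is missing, so put_in/3 raises.
result =
  try do
    put_in(internal, [:review, :notes], "check naming")
  rescue
    _ -> :raised
  end

# Creating the intermediate map first makes the same write succeed.
fixed =
  internal
  |> Map.put_new(:review, %{})
  |> put_in([:review, :notes], "check naming")
```

Initializing the nested maps in init/1 is one way to avoid the failing case altogether.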
Runs the composite agent to completion. Calls init/1 on the implementation
module, then processes steps from the queue until it's empty or an
unrecoverable error occurs.
Returns {:ok, final_response} or {:error, reason}.