AI.Agent.Composite behaviour (fnord v0.9.17)

Behaviour and execution engine for composite agents - agents that orchestrate work across multiple completion turns, optionally with tool use, structured output, and sub-agent delegation.

Steps as state

A composite agent's work is defined as a queue of steps. Each step is either a completion (a turn in this agent's conversation) or a delegation (spawning a sub-agent). Steps wrapped together in a nested list run in parallel.

Example step queues:

# Reviewer: fixed pipeline with parallel specialist fan-out
[formulate, [pedantic, acceptance, state_flow], incorporate]

# Coder planner: fixed sequential pipeline
[research, visualize, plan]

# Coder orchestrator: dynamic - validate can push tasks back
[task_1, task_2, task_3, validate]

Step types

A completion step runs a prompt against this agent's conversation, accumulating the response into the message history:

AI.Agent.Composite.completion(:research, "Investigate the code...",
  response_format: %{...}, keep_prompt?: false)

A delegate step spawns a sub-agent. The sub-agent runs its own independent conversation; its response is injected into the parent's message history as a user message with a header identifying the source:

AI.Agent.Composite.delegate(:pedantic, AI.Agent.Review.Pedantic,
  fn state -> %{prompt: ..., scope: state.request} end)

Parallel execution

When a step in the queue is a list, all steps in that list run concurrently. Results are collected and injected into the conversation in list order before the next sequential step begins.
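The ordering guarantee can be illustrated with a minimal sketch. It assumes steps are modeled as hypothetical {name, fun} tuples rather than the maps built by completion/3 and delegate/3, and it uses Task.await_many/2 from the standard library, which returns results in the order the tasks were passed in. ParallelSketch is an illustrative name, not part of fnord.

```elixir
defmodule ParallelSketch do
  # Hypothetical stand-in: a "step" here is a {name, fun} tuple, not the
  # completion/delegate maps used by AI.Agent.Composite.

  # Runs a queue whose items are either a single step or a list of steps.
  # Returns the collected {name, result} pairs in queue order.
  def run(queue) do
    Enum.flat_map(queue, fn
      group when is_list(group) ->
        # Start every step in the group, then await them all.
        # Task.await_many/2 returns results in the order the tasks were
        # given, regardless of which task finishes first.
        group
        |> Enum.map(fn step -> Task.async(fn -> run_step(step) end) end)
        |> Task.await_many(5_000)

      step ->
        [run_step(step)]
    end)
  end

  defp run_step({name, fun}), do: {name, fun.()}
end

slow = fn ->
  Process.sleep(50)
  "slow"
end

fast = fn -> "fast" end

results =
  ParallelSketch.run([
    {:formulate, fast},
    [{:pedantic, slow}, {:acceptance, fast}],
    {:incorporate, fast}
  ])

# :pedantic finishes last but still precedes :acceptance in the results.
```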

Lifecycle

The execution engine calls implementation callbacks at each stage:

  1. init/1 - Build the initial state and step queue.
  2. Pop the next item from the step queue. If the queue is empty, stop and return the final response.
  3. on_step_start/2 - Pre-execution hook (logging, UI).
  4. Execute the step (completion or delegation). If the step fails, on_error/3 decides whether to retry, skip, or halt.
  5. on_step_complete/2 - Post-execution hook (parse response, update state).
  6. get_next_steps/2 - Return steps to prepend to the queue, enabling dynamic control flow (retry, task generation, validation loops).
  7. Go to 2.
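The loop above could be sketched roughly as follows. This is a simplified illustration, not the engine's actual code: state is a plain map, "executing" a step just calls a hypothetical :run function stored on the step, and error handling and parallel groups are left out. LoopSketch and CountdownAgent are made-up names.

```elixir
defmodule LoopSketch do
  # Hypothetical, simplified engine. The numbered comments refer to the
  # lifecycle stages listed above.

  def run(impl, args) do
    # 1. Build the initial state and step queue.
    with {:ok, state} <- impl.init(args) do
      loop(impl, state)
    end
  end

  # Queue empty: finish with the last response.
  defp loop(_impl, %{steps: []} = state), do: {:ok, state.response}

  defp loop(impl, %{steps: [step | rest]} = state) do
    # 2. Pop the next item.
    state = %{state | steps: rest}
    # 3. Pre-execution hook.
    state = impl.on_step_start(step, state)
    # 4. Execute (stubbed: call the step's own function).
    response = step.run.(state)
    state = %{state | response: response, messages: state.messages ++ [response]}
    # 5. Post-execution hook.
    state = impl.on_step_complete(step, state)
    # 6. Prepend whatever the implementation asks for, then 7. loop.
    next = impl.get_next_steps(step, state)
    loop(impl, %{state | steps: next ++ state.steps})
  end
end

defmodule CountdownAgent do
  # Hypothetical implementation: one step that re-queues itself until a
  # counter in state.internal reaches zero.

  def init(args) do
    {:ok, %{steps: [tick()], internal: %{left: args.count}, messages: [], response: nil}}
  end

  def on_step_start(_step, state), do: state

  def on_step_complete(_step, state) do
    update_in(state, [:internal, :left], &(&1 - 1))
  end

  def get_next_steps(_step, %{internal: %{left: 0}}), do: []
  def get_next_steps(_step, _state), do: [tick()]

  defp tick do
    %{name: :tick, run: fn state -> "left=#{state.internal.left}" end}
  end
end

{:ok, final} = LoopSketch.run(CountdownAgent, %{count: 3})
# final is "left=1": the third and last tick ran while the counter was 1.
```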

Types

completion_step()

@type completion_step() :: %{
  type: :completion,
  name: step_name(),
  prompt: binary(),
  model: AI.Model.t() | nil,
  toolbox: AI.Tools.toolbox() | nil,
  response_format: map() | nil,
  keep_prompt?: boolean()
}

delegate_step()

@type delegate_step() :: %{
  type: :delegate,
  name: step_name(),
  agent: module(),
  args_builder: (t() -> map())
}

step()

@type step() :: completion_step() | delegate_step()

step_name()

@type step_name() :: atom()

step_queue()

@type step_queue() :: [step() | [step()]]

t()

@type t() :: %AI.Agent.Composite{
  agent: AI.Agent.t(),
  error: any(),
  internal: map(),
  messages: AI.Util.msg_list(),
  model: AI.Model.t(),
  request: binary(),
  response: binary() | nil,
  steps: step_queue(),
  toolbox: AI.Tools.toolbox()
}

Callbacks

get_next_steps(step, state)

@callback get_next_steps(step :: step(), state :: t()) :: [step() | [step()]]

Called after on_step_complete/2. Returns a list of steps to prepend to the front of the queue. Return [] to continue with the existing queue.

This is the primary mechanism for dynamic control flow:

  • Retry: return [the_same_step]
  • Task generation: return [task_1, task_2, ..., validate]
  • Conditional branching: inspect state.internal and return different steps

For the reviewer, this always returns [] since the pipeline is fixed. For the coder, the plan step returns task steps, and the validate step can return more task steps on failure.
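As a rough sketch of the coder-style flow, the clauses below branch on the step name and on state.internal. Steps and state are plain maps here, and the :tasks, :failed, and :label fields are assumptions made for illustration; a real implementation would build steps with completion/3 or delegate/3.

```elixir
defmodule NextStepsSketch do
  # Hypothetical: steps and state are plain maps standing in for the
  # step() and t() types; only the fields used here are modeled.

  # After planning, turn each planned task into a step and end with validate.
  def get_next_steps(%{name: :plan}, %{internal: %{tasks: tasks}}) do
    Enum.map(tasks, &task_step/1) ++ [%{name: :validate}]
  end

  # After validation, re-queue failed tasks followed by another validate pass.
  def get_next_steps(%{name: :validate}, %{internal: %{failed: [_ | _] = failed}}) do
    Enum.map(failed, &task_step/1) ++ [%{name: :validate}]
  end

  # Everything else: continue with the existing queue.
  def get_next_steps(_step, _state), do: []

  defp task_step(label), do: %{name: :task, label: label}
end

planned = NextStepsSketch.get_next_steps(%{name: :plan}, %{internal: %{tasks: ["a", "b"]}})
retry = NextStepsSketch.get_next_steps(%{name: :validate}, %{internal: %{failed: ["b"]}})
done = NextStepsSketch.get_next_steps(%{name: :validate}, %{internal: %{failed: []}})
```

Because the returned steps are prepended, the regenerated tasks run before anything that was already waiting in the queue.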

init(args)

@callback init(args :: map()) :: {:ok, t()} | {:error, any()}

Initialize the composite agent from the caller-provided args map (which includes :agent injected by AI.Agent.get_response/2). On success, must return {:ok, state}, where state is a fully populated %AI.Agent.Composite{} with the initial step queue. Return {:error, reason} if initialization fails.

on_error(step, error, state)

@callback on_error(step :: step(), error :: any(), state :: t()) ::
  {:retry, t()} | {:skip, t()} | {:halt, t()}

Called when a step fails. The error is passed as the second argument and is also available in state.error. Return one of:

  • {:retry, state} - re-execute the same step
  • {:skip, state} - clear the error and continue to the next step
  • {:halt, state} - stop execution with the error
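One possible policy, sketched under stated assumptions: retry a bounded number of times, then skip optional steps and halt on required ones. The :optional? step field and the :retries counter kept under state.internal are hypothetical and exist only in this example; steps and state are plain maps.

```elixir
defmodule ErrorPolicySketch do
  # Hypothetical policy. Neither :optional? nor the :retries counter is
  # part of the documented step() or t() types.
  @max_retries 2

  def on_error(step, _error, state) do
    attempts = get_in(state, [:internal, :retries, step.name]) || 0

    cond do
      attempts < @max_retries ->
        # Record the attempt so the next failure sees an updated count.
        retries = Map.put(state.internal[:retries] || %{}, step.name, attempts + 1)
        {:retry, put_in(state, [:internal, :retries], retries)}

      Map.get(step, :optional?, false) ->
        {:skip, state}

      true ->
        {:halt, state}
    end
  end
end

step = %{name: :research}
state = %{internal: %{}, error: :timeout}

{:retry, s1} = ErrorPolicySketch.on_error(step, :timeout, state)
{:retry, s2} = ErrorPolicySketch.on_error(step, :timeout, s1)
{outcome, _} = ErrorPolicySketch.on_error(step, :timeout, s2)
{optional_outcome, _} = ErrorPolicySketch.on_error(%{name: :research, optional?: true}, :timeout, s2)
```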

on_step_complete(step, state)

@callback on_step_complete(step :: step(), state :: t()) :: t()

Called after a step completes successfully. The step's response is in state.response and has been appended to state.messages. Use this to parse structured output and update state.internal.

Must return the updated state.

on_step_start(step, state)

@callback on_step_start(step :: step(), state :: t()) :: t()

Called immediately before a step executes. Typically used for UI reporting (UI.report_from/2). Must return the (possibly modified) state.

Functions

append_steps(state, new_steps)

@spec append_steps(t(), [step() | [step()]]) :: t()

Append steps to the end of the queue.

completion(name, prompt, opts \\ [])

@spec completion(step_name(), binary(), keyword()) :: completion_step()

Creates a completion step - a turn in this agent's conversation.

Options:

  • :model - override the agent's default model for this step
  • :toolbox - override the agent's default toolbox for this step
  • :response_format - JSON schema to constrain output
  • :keep_prompt? - if true, the prompt remains in message history (default false)
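The effect of :keep_prompt? on the message history might look like the sketch below. The message shape (%{role: ..., content: ...}) and the stubbed model call are assumptions for illustration, and KeepPromptSketch is a made-up module; only the option name and its default come from the list above.

```elixir
defmodule KeepPromptSketch do
  # Hypothetical: the model call is replaced by a function passed in by
  # the caller, and messages are simple role/content maps.
  def run_completion(messages, prompt, complete_fun, opts \\ []) do
    keep? = Keyword.get(opts, :keep_prompt?, false)
    prompt_msg = %{role: "user", content: prompt}

    # The model always sees the prompt; the option only controls whether
    # the prompt stays in the history afterwards.
    response = complete_fun.(messages ++ [prompt_msg])
    reply_msg = %{role: "assistant", content: response}

    if keep? do
      messages ++ [prompt_msg, reply_msg]
    else
      messages ++ [reply_msg]
    end
  end
end

stub = fn msgs -> "saw #{length(msgs)} message(s)" end

dropped = KeepPromptSketch.run_completion([], "Investigate the code", stub)
kept = KeepPromptSketch.run_completion([], "Investigate the code", stub, keep_prompt?: true)
```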

delegate(name, agent_module, args_builder)

@spec delegate(step_name(), module(), (t() -> map())) :: delegate_step()

Creates a delegate step - spawns a sub-agent and injects its response into the parent conversation. The args_builder function receives the current state and must return the args map passed to the sub-agent's get_response/1.

get_state(state, key)

@spec get_state(state :: t(), key :: atom() | list()) ::
  {:ok, any()} | {:error, :not_found}

Retrieves a value from the internal map. key may be a single atom or a list of atoms (nested path per get_in/2 semantics). Returns {:error, :not_found} when any key in the path is missing.
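A minimal sketch of this lookup over a plain map, written as a hypothetical reimplementation rather than the library's code. It treats a key whose stored value is nil as found; whether the real function does the same is not stated here, so that behaviour is an assumption.

```elixir
defmodule GetStateSketch do
  # Hypothetical reimplementation over a plain map with an :internal key.
  def get_state(%{internal: internal}, key) do
    key
    |> List.wrap()
    |> Enum.reduce_while({:ok, internal}, fn k, {:ok, acc} ->
      case acc do
        # The key exists at this level: descend into its value.
        %{^k => value} -> {:cont, {:ok, value}}
        # Missing key, or the current value is not a map.
        _ -> {:halt, {:error, :not_found}}
      end
    end)
  end
end

state = %{internal: %{plan: %{tasks: ["a"]}, note: nil}}

nested = GetStateSketch.get_state(state, [:plan, :tasks])
single = GetStateSketch.get_state(state, :note)
missing = GetStateSketch.get_state(state, [:plan, :missing])
```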

push_steps(state, new_steps)

@spec push_steps(t(), [step() | [step()]]) :: t()

Prepend steps to the front of the queue (next to execute).
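The difference between push_steps/2 and append_steps/2 comes down to which end of the list receives the new steps. The sketch below uses hypothetical stand-ins over a plain map with a :steps list and atoms in place of real steps; the actual functions take and return %AI.Agent.Composite{}.

```elixir
defmodule QueueSketch do
  # Hypothetical stand-ins for push_steps/2 and append_steps/2.
  def push_steps(state, new_steps), do: %{state | steps: new_steps ++ state.steps}
  def append_steps(state, new_steps), do: %{state | steps: state.steps ++ new_steps}
end

state = %{steps: [:task_2, :validate]}

# Pushed steps run next.
pushed = QueueSketch.push_steps(state, [:task_1])

# Appended steps run last; a nested list is a parallel group.
appended = QueueSketch.append_steps(state, [[:lint, :format]])
```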

put_state(state, key, value)

@spec put_state(state :: t(), key :: atom() | list(), value :: any()) :: t()

Sets a value in the internal map. key may be a single atom or a list of atoms (nested path per put_in/3 semantics). When passing a list, all intermediate keys must already exist.
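The requirement about intermediate keys follows from put_in/3 itself. The sketch below calls put_in/3 directly on a plain map, on the assumption that put_state/3 behaves the same way for the internal map.

```elixir
state = %{internal: %{plan: %{}}}

# The intermediate key :plan exists, so the nested write succeeds.
updated = put_in(state, [:internal, :plan, :tasks], ["a"])

# :review does not exist under :internal, so put_in/3 raises instead of
# creating it.
result =
  try do
    put_in(state, [:internal, :review, :notes], "x")
  rescue
    _error -> :raised
  end
```

To write under a new branch, create the intermediate map first, for example by setting :review to %{} before writing [:review, :notes].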

run(impl, args)

@spec run(module(), map()) :: {:ok, binary()} | {:error, any()}

Runs the composite agent to completion. Calls init/1 on the implementation module, then processes steps from the queue until it's empty or an unrecoverable error occurs.

Returns {:ok, final_response} or {:error, reason}.