Normandy.Agents.BaseAgent (normandy v0.2.0)

View Source

Core agent implementation providing conversational AI capabilities.

BaseAgent manages agent state, memory, and LLM interactions through a stateful configuration approach.

Summary

Functions

Gets a tool from the agent's tool registry by name.

Checks if the agent has any tools registered.

Lists all tools available to the agent.

Process multiple inputs through the agent concurrently.

Process a batch and return detailed statistics.

Registers a tool in the agent's tool registry.

Run the agent with optional streaming support.

Run the agent with tool calling support.

Stream a response from the LLM with real-time callbacks.

Stream responses with tool calling support.

Types

config_input()

@type config_input() :: %{
  :client => struct(),
  :model => String.t(),
  :temperature => float(),
  optional(:input_schema) => struct(),
  optional(:output_schema) => struct(),
  optional(:memory) => map(),
  optional(:prompt_specification) =>
    Normandy.Components.PromptSpecification.t(),
  optional(:max_tokens) => pos_integer() | nil,
  optional(:tool_registry) => Normandy.Tools.Registry.t(),
  optional(:max_tool_iterations) => pos_integer(),
  optional(:retry_options) => keyword(),
  optional(:enable_circuit_breaker) => boolean(),
  optional(:circuit_breaker_options) => keyword(),
  optional(:enable_json_retry) => boolean(),
  optional(:json_retry_max_attempts) => pos_integer()
}

Functions

delete_context_provider(config, provider_name)

get_context_provider(base_agent_config, provider_name)

@spec get_context_provider(Normandy.Agents.BaseAgentConfig.t(), atom()) :: struct()

get_response(config, response_model \\ nil)

@spec get_response(Normandy.Agents.BaseAgentConfig.t(), struct() | nil) :: struct()

get_tool(base_agent_config, tool_name)

@spec get_tool(Normandy.Agents.BaseAgentConfig.t(), String.t()) ::
  {:ok, struct()} | :error

Gets a tool from the agent's tool registry by name.

Examples

iex> agent = BaseAgent.register_tool(agent, %Calculator{})
iex> BaseAgent.get_tool(agent, "calculator")
{:ok, %Calculator{}}

has_tools?(base_agent_config)

@spec has_tools?(Normandy.Agents.BaseAgentConfig.t()) :: boolean()

Checks if the agent has any tools registered.

Examples

iex> BaseAgent.has_tools?(agent)
true

init(config)

list_tools(base_agent_config)

@spec list_tools(Normandy.Agents.BaseAgentConfig.t()) :: [struct()]

Lists all tools available to the agent.

Examples

iex> BaseAgent.list_tools(agent)
[%Calculator{}, %StringManipulator{}]

process_batch(agent, inputs, opts \\ [])

@spec process_batch(Normandy.Agents.BaseAgentConfig.t(), [term()], keyword()) ::
  {:ok, [term()] | map()}

Process multiple inputs through the agent concurrently.

Provides efficient batch processing with configurable concurrency. Delegates to Normandy.Batch.Processor.process_batch/3.

Options

  • :max_concurrency - Maximum concurrent tasks (default: 10)
  • :ordered - Preserve input order in results (default: true)
  • :timeout - Timeout per task in milliseconds (default: 300_000, i.e. 5 minutes)
  • :on_progress - Callback function called after each completion
  • :on_error - Callback function called on each error

Examples

# Simple batch
inputs = [
  %{chat_message: "Hello"},
  %{chat_message: "How are you?"}
]
{:ok, results} = BaseAgent.process_batch(agent, inputs)

# With options
{:ok, results} = BaseAgent.process_batch(
  agent,
  inputs,
  max_concurrency: 5
)

process_batch_with_stats(agent, inputs, opts \\ [])

@spec process_batch_with_stats(
  Normandy.Agents.BaseAgentConfig.t(),
  [term()],
  keyword()
) :: {:ok, map()}

Process a batch and return detailed statistics.

Returns success/error breakdown with counts.

Examples

{:ok, stats} = BaseAgent.process_batch_with_stats(agent, inputs)
#=> %{
  success: [result1, result2],
  errors: [{input3, error}],
  total: 3,
  success_count: 2,
  error_count: 1
}

register_context_provider(config, provider_name, provider)

@spec register_context_provider(Normandy.Agents.BaseAgentConfig.t(), atom(), struct()) ::
  Normandy.Agents.BaseAgentConfig.t()

register_tool(config, tool)

Registers a tool in the agent's tool registry.

Creates a new registry if one doesn't exist.

Examples

iex> agent = BaseAgent.init(config)
iex> tool = %Normandy.Tools.Examples.Calculator{}
iex> agent = BaseAgent.register_tool(agent, tool)

reset_memory(config)

run(config, user_input \\ nil)

run(config, user_input, opts)

Run the agent with optional streaming support.

Accepts a keyword list as the third argument with options:

  • :stream - Boolean, enables streaming mode
  • :on_chunk - Callback function for streaming chunks

Example

BaseAgent.run(agent, %{chat_message: "Hello"}, stream: true, on_chunk: fn chunk ->
  IO.write(chunk)
end)

run_with_tools(config, user_input \\ nil)

Run the agent with tool calling support.

This function handles the full tool execution loop:

  1. Send user input to LLM
  2. If LLM requests tool calls, execute them
  3. Send tool results back to LLM
  4. Repeat until LLM provides final response or max iterations reached

Parameters

  • config: Agent configuration
  • user_input: Optional user input to start the conversation

Returns

Tuple of {updated_config, final_response}

Examples

iex> {config, response} = BaseAgent.run_with_tools(agent, user_input)

stream_response(config, user_input \\ nil, callback)

Stream a response from the LLM with real-time callbacks.

This function enables streaming mode, allowing you to process LLM responses as they're generated. Callbacks are invoked for each event type.

Parameters

  • config: Agent configuration
  • user_input: Optional user input (can be nil to continue conversation)
  • callback: Function (event_type, data) -> :ok called for each event

Event Types

  • :text_delta - Incremental text content
  • :tool_use_start - Tool call beginning
  • :thinking_delta - Extended thinking content
  • :message_start - Stream beginning
  • :message_stop - Stream complete

Returns

{config, final_response} - Updated config and accumulated response

Example

callback = fn
  :text_delta, text -> IO.write(text)
  :tool_use_start, tool -> IO.puts("\nCalling tool: #{tool["name"]}")
  _, _ -> :ok
end

{agent, response} = BaseAgent.stream_response(agent, input, callback)

stream_with_tools(config, user_input \\ nil, callback)

Stream responses with tool calling support.

Combines streaming and tool execution - as the LLM streams its response, tool calls are detected and executed, with results fed back into the stream.

Parameters

  • config: Agent configuration
  • user_input: Optional user input to start the conversation
  • callback: Function (event_type, data) -> :ok called for each event

Event Types

  • :text_delta - Incremental text content
  • :tool_use_start - Tool call beginning
  • :tool_result - Tool execution result (custom event)
  • :thinking_delta - Extended thinking content
  • :message_start - Stream beginning
  • :message_stop - Stream complete

Returns

{config, final_response} - Updated config and accumulated response

Example

callback = fn
  :text_delta, text -> IO.write(text)
  :tool_use_start, tool -> IO.puts("\nCalling tool: #{tool["name"]}")
  :tool_result, result -> IO.puts("Tool result: #{inspect(result)}")
  _, _ -> :ok
end

{agent, response} = BaseAgent.stream_with_tools(agent, input, callback)