Normandy.Coordination.AgentPool (normandy v0.2.0)

View Source

Pool manager for agent processes with automatic checkout/checkin.

Provides efficient pooling of identical agent processes with configurable size, overflow handling, and checkout strategies. Built on top of AgentSupervisor for fault tolerance.

Features

  • Fixed pool size with configurable overflow
  • Multiple checkout strategies: :lifo, :fifo
  • Automatic checkin on transaction completion
  • Built-in supervision and fault tolerance
  • Pool statistics and monitoring

Examples

# Start a pool of 10 agents
{:ok, pool} = AgentPool.start_link(
  name: :my_pool,
  agent_config: my_agent_config,
  size: 10,
  overflow: 5,
  strategy: :fifo
)

# Use transaction for automatic checkout/checkin
{:ok, result} = AgentPool.transaction(pool, fn agent_pid ->
  AgentProcess.run(agent_pid, input)
end)

# Manual checkout/checkin
{:ok, agent_pid} = AgentPool.checkout(pool)
result = AgentProcess.run(agent_pid, input)
:ok = AgentPool.checkin(pool, agent_pid)

# Get pool statistics
stats = AgentPool.stats(pool)
#=> %{size: 10, available: 7, in_use: 3, overflow: 0}

Summary

Functions

Returns an agent to the pool.

Checks out an agent from the pool.

Returns a specification to start this module under a supervisor.

Starts an agent pool.

Returns pool statistics.

Stops the agent pool gracefully.

Executes a function with an agent from the pool.

Types

pool_stat()

@type pool_stat() :: %{
  size: non_neg_integer(),
  overflow: non_neg_integer(),
  available: non_neg_integer(),
  in_use: non_neg_integer(),
  max_overflow: non_neg_integer()
}

strategy()

@type strategy() :: :lifo | :fifo

Functions

checkin(pool, agent_pid)

@spec checkin(GenServer.server(), pid()) :: :ok

Returns an agent to the pool.

Examples

{:ok, agent_pid} = AgentPool.checkout(pool)
# ... use agent ...
:ok = AgentPool.checkin(pool, agent_pid)

checkout(pool, opts \\ [])

@spec checkout(
  GenServer.server(),
  keyword()
) :: {:ok, pid()} | {:error, term()}

Checks out an agent from the pool.

Options

  • :timeout - Maximum time to wait in ms (default: 5000)
  • :block - Whether to block if no agents available (default: true)

Examples

{:ok, agent_pid} = AgentPool.checkout(pool)
result = AgentProcess.run(agent_pid, input)
AgentPool.checkin(pool, agent_pid)

# Non-blocking checkout
case AgentPool.checkout(pool, block: false) do
  {:ok, agent_pid} -> # use agent
  {:error, :no_agents} -> # pool empty
end

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

start_link(opts)

@spec start_link(keyword()) :: GenServer.on_start()

Starts an agent pool.

Options

  • :name - Register the pool with a name (required for named access)
  • :agent_config - BaseAgent configuration map (required)
  • :size - Pool size (default: 10)
  • :max_overflow - Maximum overflow workers (default: 5)
  • :strategy - Checkout strategy :lifo or :fifo (default: :lifo)

Examples

{:ok, pool} = AgentPool.start_link(
  name: :research_pool,
  agent_config: %{
    client: client,
    model: "claude-3-5-sonnet-20241022",
    temperature: 0.7
  },
  size: 10,
  max_overflow: 5,
  strategy: :fifo
)

stats(pool)

@spec stats(GenServer.server()) :: pool_stat()

Returns pool statistics.

Examples

stats = AgentPool.stats(pool)
#=> %{
  size: 10,
  available: 7,
  in_use: 3,
  overflow: 0,
  max_overflow: 5,
  waiting: 2
}

stop(pool)

@spec stop(GenServer.server()) :: :ok

Stops the agent pool gracefully.

Terminates all agent processes and shuts down the pool.

Examples

:ok = AgentPool.stop(pool)

transaction(pool, opts \\ [], fun)

@spec transaction(GenServer.server(), keyword(), (pid() -> term())) ::
  {:ok, term()} | {:error, term()}

Executes a function with an agent from the pool.

Automatically checks out an agent, executes the function, and checks it back in. Handles errors gracefully and ensures the agent is always returned to the pool.

Options

  • :timeout - Checkout timeout in ms (default: 5000)

Examples

{:ok, result} = AgentPool.transaction(pool, fn agent_pid ->
  AgentProcess.run(agent_pid, "Analyze this data")
end)

# With timeout
{:ok, result} = AgentPool.transaction(pool, [timeout: 10_000], fn agent_pid ->
  AgentProcess.run(agent_pid, input)
end)

Returns

  • {:ok, result} - Function result
  • {:error, :timeout} - Checkout timeout
  • {:error, :pool_exhausted} - No agents available and max overflow reached
  • {:error, reason} - Function raised an error