Pre-started worker pool with round-robin task dispatch. Unlike Supervisor where workers are spawned per-sub-task and halt after one turn, pool workers stay alive across many turns.
When to reach for this
You have a stream of independent tasks (not a planned-in-advance batch), you want to throttle concurrency to a fixed number of workers, and the startup cost of creating an agent is non-negligible compared to the cost of a turn. Examples: a question-answering bot pool, a scraping pool, a per-role expert panel where you send each incoming question to the "next free" expert.
The defining move is that handle_response/3 returns
{:noreply, state} so the worker stays idle for the next task
instead of halting. Combined with GenAgent.tell/2's natural
mailbox queueing (a busy worker buffers incoming work), you get
backpressure for free.
What it exercises in gen_agent
- Worker lifecycle reuse across many turns:
{:noreply, state}fromhandle_response/3sends the worker back to idle, accumulating results on state. - Per-worker mailbox queueing via
tell/2:GenAgent.tell/2queues when a worker is busy, so the dispatcher can fire-and- forget without checking for availability. - Round-robin dispatch via an atomic counter (
:counters). - Pool-wide quiescence detection: "all workers are idle and
their mailboxes are empty" -- a small loop over
status/1.
The pattern
One worker module (short), one pool dispatcher module (short).
Pool.Worker
defmodule Pool.Worker do
@moduledoc """
A pool worker that stays alive across many turns.
handle_response returns {:noreply, state}, accumulating results
in state. A task submitted to a busy worker waits in its
mailbox (GenAgent.tell/2's natural queueing behavior).
"""
use GenAgent
defmodule State do
defstruct [:name, :role, results: []]
end
@impl true
def init_agent(opts) do
state = %State{
name: Keyword.fetch!(opts, :worker_name),
role: Keyword.get(opts, :role, "research assistant")
}
system = """
You are a #{state.role}. Answer each question concisely in
1-2 sentences. No preamble.
"""
{:ok, [system: system, max_tokens: Keyword.get(opts, :max_tokens, 150)], state}
end
@impl true
def handle_response(_ref, response, %State{} = state) do
entry = %{
text: String.trim(response.text),
usage: response.usage,
duration_ms: response.duration_ms,
completed_at: System.system_time(:millisecond)
}
# NOT :halt -- the worker stays alive for the next task.
{:noreply, %{state | results: state.results ++ [entry]}}
end
endPool dispatcher
defmodule Pool do
alias Pool.Worker
@type handle :: %{workers: [String.t()], counter: :counters.counters_ref()}
def start(size, opts \\ []) when is_integer(size) and size > 0 do
role = Keyword.get(opts, :role, "research assistant")
backend = Keyword.get(opts, :backend, GenAgent.Backends.Anthropic)
id = System.unique_integer([:positive])
workers =
1..size
|> Enum.map(fn i ->
name = "pool-#{id}-#{i}"
{:ok, _pid} = GenAgent.start_agent(Worker,
name: name,
backend: backend,
worker_name: name,
role: role
)
name
end)
counter = :counters.new(1, [:atomics])
{:ok, %{workers: workers, counter: counter}}
end
@doc """
Submit a task. Round-robins across workers and returns the ref
so you can poll if you want.
"""
def submit(%{workers: workers, counter: counter}, task) when is_binary(task) do
idx = :counters.get(counter, 1)
:counters.add(counter, 1, 1)
worker = Enum.at(workers, rem(idx, length(workers)))
# tell/2 naturally queues when the target is busy.
{:ok, ref} = GenAgent.tell(worker, task)
{:ok, {worker, ref}}
end
def submit_many(pool, tasks) when is_list(tasks) do
Enum.map(tasks, fn t ->
{:ok, tuple} = submit(pool, t)
tuple
end)
end
@doc """
Block until every worker is idle with an empty mailbox.
"""
def wait_for_all(pool, timeout \\ 120_000) do
deadline = System.monotonic_time(:millisecond) + timeout
do_wait(pool, deadline)
end
@doc """
Return per-worker results.
"""
def results(%{workers: workers}) do
Enum.map(workers, fn name ->
%{agent_state: %Worker.State{results: r}} = GenAgent.status(name)
%{worker: name, count: length(r), results: r}
end)
end
def stop(%{workers: workers}) do
Enum.each(workers, &GenAgent.stop/1)
end
defp do_wait(%{workers: workers} = pool, deadline) do
any_busy =
Enum.any?(workers, fn w ->
case GenAgent.status(w) do
%{state: :processing} -> true
%{queued: q} when q > 0 -> true
_ -> false
end
end)
cond do
not any_busy -> :ok
System.monotonic_time(:millisecond) >= deadline -> {:error, :timeout}
true ->
Process.sleep(200)
do_wait(pool, deadline)
end
end
endUsing it
{:ok, pool} = Pool.start(3, role: "trivia expert")
Pool.submit_many(pool, [
"capital of France?",
"who wrote Hamlet?",
"first president of the US?",
"speed of light in vacuum?",
"chemical symbol for gold?",
"tallest mountain on Earth?",
"largest ocean?",
"year WWII ended?",
"composer of the Ninth Symphony?"
])
Pool.wait_for_all(pool)
Pool.results(pool)
# => [%{worker: "pool-1-1", count: 3, results: [...]},
# %{worker: "pool-1-2", count: 3, results: [...]},
# %{worker: "pool-1-3", count: 3, results: [...]}]
Pool.stop(pool)Variations
- Work-stealing instead of round-robin. Instead of assigning
the next task to
next_idx, ask each worker's status and pick the one with the smallest queue. More balanced under uneven task durations but adds N status calls per submit. - Typed workers. Not every worker needs the same role. Start
the pool with a map of
%{role => count}and dispatch based on task metadata. - Rate-limited submission. Wrap
submit/2with a token bucket so you can't outpace the backend. Alternative: rely ongen_agent's watchdog and let slow turns time out. - Auto-scaling. Watch pool-wide queue depth via
[:gen_agent, :mailbox, :queued]telemetry; when it grows past a threshold, spawn more workers; when it shrinks, halt the extras.