Coordinator + dynamic worker pool. One coordinator plans sub-tasks
via its own LLM turn, spawns N worker agents from inside its own
handle_response/3 callback, notifies each worker with its
sub-task, collects results as they arrive via notify, and
self-chains a synthesis turn.
When to reach for this
A task decomposes into N independent sub-tasks where N is decided by the LLM (or by runtime conditions), each sub-task gets its own agent, and the coordinator has to aggregate their outputs into a final answer. You want fan-out for parallelism and fan-in for the synthesis step. Classic map/reduce, but every worker is an agent.
This is the richest cross-agent pattern in this collection. Everything else composes: the coordinator is a Research-style self-chaining agent whose planning turn spawns a pool of one-shot workers, each of whom is essentially a single-item Pipeline stage.
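Stripped of the agent machinery, the fan-out/fan-in shape described above can be sketched with plain BEAM processes. This is only an analogue under stated assumptions: FanSketch, run/3, and the work_fun argument are hypothetical names, plain send/receive stands in for GenAgent.notify, and an ordinary function stands in for each worker's LLM turn.

```elixir
defmodule FanSketch do
  # Hypothetical plain-process analogue of the pattern; no GenAgent involved.
  def run(sub_tasks, work_fun, timeout \\ 5_000) do
    coordinator = self()
    tag = make_ref()

    # Fan-out: start one process per sub-task, then hand it the task by message.
    names =
      sub_tasks
      |> Enum.with_index(1)
      |> Enum.map(fn {task, i} ->
        name = "worker-#{i}"

        pid =
          spawn(fn ->
            # The worker sits idle until its sub-task arrives, does one unit
            # of work, reports back, and exits (the "self-halt" step).
            receive do
              {:sub_task, received} ->
                send(coordinator, {tag, :worker_result, name, work_fun.(received)})
            end
          end)

        send(pid, {:sub_task, task})
        name
      end)

    # Fan-in: accumulate one result per worker into a map keyed by name.
    collect(tag, length(names), %{}, timeout)
  end

  defp collect(_tag, 0, results, _timeout), do: results

  defp collect(tag, remaining, results, timeout) do
    receive do
      {^tag, :worker_result, name, result} ->
        collect(tag, remaining - 1, Map.put(results, name, result), timeout)
    after
      # Give up on stragglers and return whatever has arrived so far.
      timeout -> results
    end
  end
end
```

Calling FanSketch.run(["a", "b"], &String.upcase/1) returns a map with "worker-1" => "A" and "worker-2" => "B". The agent version differs mainly in that each unit of work is an LLM turn and the final reduce step is itself another LLM turn.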
What it exercises in gen_agent
- Dynamic GenAgent.start_agent/2 called from inside a running callback. The coordinator's planning-phase handle_response spawns workers on the fly. They join the shared GenAgent supervision tree and are live from the moment they start.
- Fan-out via notify: the coordinator notifies each worker with its sub-task immediately after spawning. Workers sit idle until they receive the notify.
- Fan-in via notify: each worker notifies the coordinator with its result (or failure). The coordinator's handle_event/2 accumulates results into a map.
- Multi-phase coordinator state machine with an LLM turn at each end (planning -> dispatch -> collect -> synthesize).
- Self-halt workers: each worker halts via {:halt, state} after its single turn so the coordinator doesn't have to track or stop them explicitly.
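The phase machine in the list above can be modelled as a small pure function. This is a hedged sketch, not part of gen_agent: PhaseSketch and its event tuples are hypothetical names, while the phase atoms mirror the ones the coordinator stores in its state. Note that "dispatch" is not a stored phase; it happens inside the planning callback before the coordinator moves to :collecting.

```elixir
defmodule PhaseSketch do
  # Hypothetical, framework-free model of the coordinator's phase machine.
  # Event names are invented for this sketch.

  # A backend error in any phase is terminal.
  def next(_phase, {:error, reason}), do: {:failed, reason}

  # Planning turn finished: an empty plan fails; otherwise dispatch happens
  # in the same callback and the coordinator starts collecting.
  def next(:planning, {:planned, []}), do: {:failed, :no_sub_tasks}
  def next(:planning, {:planned, [_ | _]}), do: :collecting

  # Every worker has reported: synthesize only if at least one succeeded.
  def next(:collecting, {:all_reported, 0}), do: {:failed, :all_workers_failed}
  def next(:collecting, {:all_reported, ok}) when ok > 0, do: :synthesizing

  # The response to the synthesis turn ends the run.
  def next(:synthesizing, :response), do: :done

  # Anything else leaves the phase unchanged.
  def next(phase, _event), do: phase
end
```

Keeping the transitions in one place like this makes it easy to see that only two LLM turns exist on the coordinator side: one leaving :planning and one leaving :synthesizing.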
The pattern
Two callback modules: a Coordinator that owns the phase state
machine and spawns workers, and a Worker that is one-shot and
notifies the coordinator with its result.
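The two modules only need to agree on three message shapes, all of which appear in the listings that follow. As a quick reference, the sketch below labels each shape and its direction. ProtocolSketch and classify/1 are hypothetical helpers written for illustration, and the is_binary guards assume that names, tasks, and results are strings, as they are in this example.

```elixir
defmodule ProtocolSketch do
  # Hypothetical helper: labels each message shape exchanged via notify.

  # Coordinator -> worker: the single sub-task the worker should answer.
  def classify({:sub_task, task}) when is_binary(task),
    do: {:to_worker, :sub_task}

  # Worker -> coordinator: a successful answer, tagged with the worker's name.
  def classify({:worker_result, name, text}) when is_binary(name) and is_binary(text),
    do: {:to_coordinator, :result}

  # Worker -> coordinator: the worker's turn failed for some reason.
  def classify({:worker_failed, name, _reason}) when is_binary(name),
    do: {:to_coordinator, :failure}

  # Both modules ignore anything else, like their catch-all handle_event clause.
  def classify(_other), do: :ignored
end
```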
Supervisor.Coordinator
defmodule Supervisor.Coordinator do
  use GenAgent

  alias Supervisor.Worker

  defmodule State do
    defstruct [
      :topic,
      :max_workers,
      :coordinator_name,
      :final_output,
      :error,
      phase: :planning,
      sub_tasks: [],
      workers: [],
      results: %{},
      failures: %{}
    ]
  end

  @impl true
  def init_agent(opts) do
    state = %State{
      topic: Keyword.fetch!(opts, :topic),
      max_workers: Keyword.get(opts, :max_workers, 3),
      coordinator_name: Keyword.fetch!(opts, :coordinator_name)
    }

    system = """
    You are a research coordinator.
    When asked to plan sub-tasks, output them one per line, no
    numbering or bullets -- just plain sub-task text, one per line.
    When asked to synthesize worker results, write a coherent
    2-3 paragraph answer that weaves together the findings.
    """

    {:ok, [system: system, max_tokens: Keyword.get(opts, :max_tokens, 600)], state}
  end

  # Phase :planning -> spawn workers, notify each, transition to :collecting.
  @impl true
  def handle_response(_ref, response, %State{phase: :planning} = state) do
    # One sub-task per non-blank line, capped at max_workers.
    sub_tasks =
      response.text
      |> String.split("\n")
      |> Enum.map(&String.trim/1)
      |> Enum.reject(&(&1 == ""))
      |> Enum.take(state.max_workers)

    workers = spawn_workers(state.coordinator_name, sub_tasks)

    Enum.zip(workers, sub_tasks)
    |> Enum.each(fn {worker, task} ->
      GenAgent.notify(worker, {:sub_task, task})
    end)

    new_state = %{state | sub_tasks: sub_tasks, workers: workers, phase: :collecting}

    # An empty plan spawned no workers, so there is nothing to wait for.
    case sub_tasks do
      [] -> {:halt, %{new_state | phase: :failed, error: :no_sub_tasks}}
      _ -> {:noreply, new_state}
    end
  end

  # Phase :synthesizing -> terminal halt with the final answer.
  def handle_response(_ref, response, %State{phase: :synthesizing} = state) do
    {:halt, %{state | final_output: String.trim(response.text), phase: :done}}
  end

  # Phase :collecting -> accumulate worker results, self-chain synthesis
  # once everyone has reported.
  @impl true
  def handle_event({:worker_result, worker_name, text}, %State{phase: :collecting} = state) do
    maybe_synthesize(%{state | results: Map.put(state.results, worker_name, text)})
  end

  def handle_event({:worker_failed, worker_name, reason}, %State{phase: :collecting} = state) do
    maybe_synthesize(%{state | failures: Map.put(state.failures, worker_name, reason)})
  end

  def handle_event(_other, state), do: {:noreply, state}

  @impl true
  def handle_error(_ref, reason, %State{} = state) do
    {:halt, %{state | error: reason, phase: :failed}}
  end

  defp maybe_synthesize(%State{} = state) do
    received = map_size(state.results) + map_size(state.failures)

    cond do
      # Still waiting on at least one worker.
      received < length(state.workers) ->
        {:noreply, state}

      # Everyone reported, but nobody succeeded.
      state.results == %{} ->
        {:halt, %{state | phase: :failed, error: :all_workers_failed}}

      true ->
        {:prompt, synthesis_prompt(state), %{state | phase: :synthesizing}}
    end
  end

  defp synthesis_prompt(%State{} = state) do
    # Pair each sub-task with its worker's result by position.
    sections =
      state.sub_tasks
      |> Enum.with_index()
      |> Enum.map_join("\n\n", fn {task, i} ->
        worker = Enum.at(state.workers, i)
        result = Map.get(state.results, worker, "(worker failed)")
        "Sub-task: #{task}\nResult: #{result}"
      end)

    """
    Your workers have reported on all sub-tasks for the topic:
    #{state.topic}
    Here is what each worker returned:
    #{sections}
    Synthesize these into a cohesive 2-paragraph answer.
    """
  end

  defp spawn_workers(coordinator_name, sub_tasks) do
    sub_tasks
    |> Enum.with_index(1)
    |> Enum.map(fn {_task, i} ->
      # Worker names are derived from the coordinator's name and a 1-based index.
      worker_name = "#{coordinator_name}-worker-#{i}"

      {:ok, _pid} =
        GenAgent.start_agent(Worker,
          name: worker_name,
          backend: GenAgent.Backends.Anthropic,
          worker_name: worker_name,
          supervisor: coordinator_name
        )

      worker_name
    end)
  end
end

Supervisor.Worker
defmodule Supervisor.Worker do
  use GenAgent

  defmodule State do
    defstruct [:name, :supervisor, :task, :result, :error]
  end

  @impl true
  def init_agent(opts) do
    state = %State{
      name: Keyword.fetch!(opts, :worker_name),
      supervisor: Keyword.fetch!(opts, :supervisor)
    }

    system = """
    You are a research worker. You will be given exactly one
    sub-task. Answer it in 2-3 concise sentences. No preamble.
    """

    {:ok, [system: system, max_tokens: 300], state}
  end

  # The coordinator's notify carries the sub-task; use it as the prompt.
  @impl true
  def handle_event({:sub_task, task}, %State{} = state) do
    {:prompt, task, %{state | task: task}}
  end

  def handle_event(_other, state), do: {:noreply, state}

  # One-shot: report the result to the coordinator, then halt.
  @impl true
  def handle_response(_ref, response, %State{} = state) do
    result = String.trim(response.text)
    GenAgent.notify(state.supervisor, {:worker_result, state.name, result})
    {:halt, %{state | result: result}}
  end

  # Failures are reported too, so the coordinator never waits forever.
  @impl true
  def handle_error(_ref, reason, %State{} = state) do
    GenAgent.notify(state.supervisor, {:worker_failed, state.name, reason})
    {:halt, %{state | error: reason}}
  end
end

Using it
name = "coord-#{System.unique_integer([:positive])}"

{:ok, _pid} =
  GenAgent.start_agent(Supervisor.Coordinator,
    name: name,
    backend: GenAgent.Backends.Anthropic,
    topic: "why do octopuses have three hearts?",
    max_workers: 3,
    coordinator_name: name
  )

# Kick off the planning turn.
{:ok, _ref} =
  GenAgent.tell(name, "Break the topic into 3 specific sub-questions. One per line.")

# The coordinator will plan, spawn 3 workers, dispatch their
# sub-tasks, wait for responses, synthesize, and halt. The manager
# just watches.

# When phase: :done, read the final output:
%{agent_state: %{final_output: output}} = GenAgent.status(name)
IO.puts(output)

GenAgent.stop(name)

Variations
- Bounded concurrency. For very large N, instead of spawning N workers, spawn K and use a work-stealing loop: when one worker halts, the coordinator notifies a new worker with the next sub-task. See Pool for a cleaner version of this shape.
- Heterogeneous workers. Different sub-tasks can get different worker modules. The coordinator's spawn_workers function decides which module to instantiate based on the sub-task content.
- Partial success. The current maybe_synthesize only proceeds if at least one worker succeeded. You could instead require a quorum (e.g. 2/3) or fail the whole run if any worker failed.
- Nested coordinators. Any worker could itself be a coordinator that fans out further. The shared supervision tree doesn't care -- each level just spawns agents into it.
- Streaming synthesis. Instead of waiting for all workers before synthesizing, the coordinator could start synthesis once the first K results are in, incorporate later results by editing state, and produce a final synthesis when everything is complete. Requires a more complex phase machine.
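As an illustration of the partial-success variation, the decision inside maybe_synthesize could be factored into a pure function that takes a policy. This is a sketch under assumptions, not part of the original modules: QuorumSketch, decide/4, the policy values (:any, :strict, {:quorum, num, den}), and the failure reasons are all hypothetical names.

```elixir
defmodule QuorumSketch do
  # Hypothetical policy check. `total` is the number of workers spawned,
  # `ok` and `failed` are the counts reported so far.
  # Returns :wait, :synthesize, or {:fail, reason}.
  def decide(total, ok, failed, policy) do
    cond do
      # Strict runs can fail fast on the first reported failure.
      policy == :strict and failed > 0 -> {:fail, :worker_failed}
      # Otherwise keep waiting until every worker has reported.
      ok + failed < total -> :wait
      enough?(policy, ok, total) -> :synthesize
      true -> {:fail, :quorum_not_met}
    end
  end

  # :any matches the current behaviour: one success is enough.
  defp enough?(:any, ok, _total), do: ok > 0
  # :strict requires every worker to succeed.
  defp enough?(:strict, ok, total), do: ok == total
  # {:quorum, 2, 3} requires at least two thirds of workers to succeed;
  # integer cross-multiplication avoids float rounding.
  defp enough?({:quorum, num, den}, ok, total), do: ok * den >= total * num
end
```

In the coordinator, maybe_synthesize would map :wait to {:noreply, state}, :synthesize to the {:prompt, ...} return, and {:fail, reason} to a {:halt, ...} with phase: :failed. The policy itself could live as an extra field on the coordinator's state.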