Linear N-stage transformation chain. Each stage is a distinct
single-turn agent with its own role, the output of stage N becomes
the input of stage N+1 via GenAgent.notify/2, and the last stage
halts with the final result on state.
When to reach for this
You have a transformation that decomposes cleanly into a fixed sequence of steps, each step is best modeled as its own LLM with its own system prompt and role, and the output of one step is exactly the input to the next. Brainstorm -> edit -> headline. Research question -> answer -> translate -> summarize. Spec -> design -> implementation notes.
The key difference from Research is that pipeline stages are distinct agents with distinct roles, not phases inside one agent. That matters when each step benefits from a different system prompt, different max tokens, or potentially a different backend.
What it exercises in gen_agent
- One-way cross-agent notify chain: each stage notifies the
next with
{:pipeline_input, text}and then halts. handle_event/2returning{:prompt, text, state}: the receiving stage turns the notify into its dispatch.- Per-stage self-halt after one turn: each stage is
intentionally one-and-done via
{:halt, new_state}fromhandle_response/3. - Nil-terminator for the last stage: the final stage has
next_stage: niland halts without notifying anyone. - Failure propagation:
handle_error/3forwards a{:pipeline_failed, reason}notify down the chain so no downstream stage sits waiting forever.
The pattern
One callback module (used for every stage; the role and instruction are per-stage config), plus a starter that wires up the chain.
Pipeline.Stage
defmodule Pipeline.Stage do
use GenAgent
defmodule State do
defstruct [
:name,
:next_stage,
:role,
:instruction,
:input,
:output,
:error,
index: 0
]
end
@impl true
def init_agent(opts) do
state = %State{
name: Keyword.fetch!(opts, :agent_name),
next_stage: Keyword.get(opts, :next_stage),
role: Keyword.fetch!(opts, :role),
instruction: Keyword.fetch!(opts, :instruction),
index: Keyword.get(opts, :index, 0)
}
system = "You are #{state.role}. #{state.instruction}"
{:ok, [system: system, max_tokens: Keyword.get(opts, :max_tokens, 400)], state}
end
@impl true
def handle_response(_ref, response, %State{} = state) do
output = String.trim(response.text)
new_state = %{state | output: output}
case state.next_stage do
nil ->
# End of pipeline. Final result is on state.output.
{:halt, new_state}
next ->
GenAgent.notify(next, {:pipeline_input, output})
{:halt, new_state}
end
end
@impl true
def handle_error(_ref, reason, %State{} = state) do
new_state = %{state | error: reason}
case state.next_stage do
nil ->
{:halt, new_state}
next ->
GenAgent.notify(next, {:pipeline_failed, reason})
{:halt, new_state}
end
end
@impl true
def handle_event({:pipeline_input, text}, %State{} = state) do
{:prompt, text, %{state | input: text}}
end
def handle_event({:pipeline_failed, reason}, %State{} = state) do
new_state = %{state | error: {:upstream_failed, reason}}
case state.next_stage do
nil -> {:halt, new_state}
next ->
GenAgent.notify(next, {:pipeline_failed, reason})
{:halt, new_state}
end
end
def handle_event(_other, state), do: {:noreply, state}
endStarter
defmodule Pipeline do
alias Pipeline.Stage
def run(initial_input, stages_config, opts \\ []) do
backend = Keyword.get(opts, :backend, GenAgent.Backends.Anthropic)
id = System.unique_integer([:positive])
# Assign unique names per stage and compute the next-stage pointer.
stage_names =
stages_config
|> Enum.with_index(1)
|> Enum.map(fn {cfg, i} -> "pipe-#{id}-#{i}-#{cfg.name}" end)
# next_map: stage_name -> name_of_next_stage_or_nil
next_map =
stage_names
|> Enum.zip(Enum.drop(stage_names, 1) ++ [nil])
|> Map.new()
stages_config
|> Enum.with_index(1)
|> Enum.each(fn {cfg, i} ->
name = Enum.at(stage_names, i - 1)
{:ok, _pid} = GenAgent.start_agent(Stage,
name: name,
agent_name: name,
backend: backend,
next_stage: Map.fetch!(next_map, name),
role: cfg.role,
instruction: cfg.instruction,
index: i
)
end)
# Kick off the first stage with the initial input.
[first | _] = stage_names
{:ok, _ref} = GenAgent.tell(first, initial_input)
{:ok, %{stages: stage_names}}
end
endUsing it
{:ok, handle} = Pipeline.run(
"the octopus has three hearts and blue blood",
[
%{
name: "brainstorm",
role: "a creative brainstormer",
instruction: "Given a fact, list 3 distinct angles to write about it. One per line."
},
%{
name: "editor",
role: "a sharp editor",
instruction: "Given a list of angles, pick the most interesting and develop it into a tight paragraph."
},
%{
name: "headline",
role: "a headline writer",
instruction: "Given a paragraph, write ONE compelling title. Output only the title."
}
]
)
# Each stage notifies the next as it completes. Wait for the
# last stage to halt:
last = List.last(handle.stages)
# In practice, a small wait loop checking:
%{agent_state: %{output: output}} = GenAgent.status(last)
IO.puts(output)
# Read the trace across all stages:
Enum.map(handle.stages, fn name ->
%{agent_state: %{input: i, output: o, role: r}} = GenAgent.status(name)
%{stage: name, role: r, in: i, out: o}
end)
# Clean up:
Enum.each(handle.stages, &GenAgent.stop/1)Variations
- Per-stage backend selection. Nothing in the pattern requires
every stage to use the same backend. Pass a
:backendin each stage config and let cheap stages (brainstorm) use a faster model than expensive stages (synthesis). - Branching pipelines. Instead of a linear chain, have one stage notify multiple "next" stages with the same output, then a later join stage collects them. You're now halfway to the Supervisor shape.
- Reusable stages. The same callback module can be instantiated many times in the same pipeline with different roles -- e.g. two "editor" stages in sequence with different instructions.
- Conditional routing.
handle_event({:pipeline_input, text}, state)can inspecttextbefore deciding what to do -- dispatch, transform, or{:halt, state}early if the upstream produced something bad.