Pipelines
PhoenixAI.Pipeline executes steps sequentially. Each step receives the previous
step's result. The pipeline halts on the first {:error, reason}.
This is the "railway-oriented programming" pattern: as long as each step returns
{:ok, value}, the value passes to the next step. An error stops the chain.
Ad-hoc Pipeline
For one-off sequences, pass a list of functions to Pipeline.run/3:
alias PhoenixAI.{Message, Response, Pipeline}
# Three-step ad-hoc pipeline: search, then summarize, then upcase.
# Each anonymous function receives the (unwrapped) value produced by the
# previous step; AI.chat/2 returns a tagged {:ok, %Response{}} tuple.
steps = [
  # Step 1: turn the raw query string into a chat request.
  fn query ->
    messages = [%Message{role: :user, content: "Search for: #{query}"}]
    AI.chat(messages, provider: :openai)
  end,
  # Step 2: summarize the text of the previous response.
  fn %Response{content: text} ->
    messages = [%Message{role: :user, content: "Summarize: #{text}"}]
    AI.chat(messages, provider: :openai)
  end,
  # Step 3: plain transformation; the bare string is auto-wrapped as {:ok, _}.
  fn %Response{content: summary} ->
    String.upcase(summary)
  end
]
{:ok, result} = Pipeline.run(steps, "Elixir language features")

Each step can return:
- {:ok, value} — continue to the next step with value
- {:error, reason} — halt the pipeline
- any_other_value — auto-wrapped as {:ok, any_other_value} and passed forward
DSL Pipeline
For reusable pipelines, define a module with use PhoenixAI.Pipeline:
defmodule MyApp.ResearchPipeline do
  @moduledoc """
  Reusable three-step research pipeline built with the `PhoenixAI.Pipeline`
  DSL: research a topic, extract key facts, then format them as Markdown.
  """

  use PhoenixAI.Pipeline

  # Step 1: research the input topic with GPT-4o.
  step :search do
    fn query ->
      messages = [%PhoenixAI.Message{role: :user, content: "Research: #{query}"}]
      AI.chat(messages, provider: :openai, model: "gpt-4o")
    end
  end

  # Step 2: condense the previous response's text into bullet-point facts.
  step :extract_facts do
    fn %PhoenixAI.Response{content: text} ->
      messages = [
        %PhoenixAI.Message{role: :system, content: "Extract key facts as bullet points."},
        %PhoenixAI.Message{role: :user, content: text}
      ]
      AI.chat(messages, provider: :openai)
    end
  end

  # Step 3: wrap the facts under a Markdown heading; the bare string is
  # auto-wrapped as {:ok, _} by the pipeline.
  step :format do
    fn %PhoenixAI.Response{content: facts} ->
      "## Research Results\n\n#{facts}"
    end
  end
end
{:ok, result} = MyApp.ResearchPipeline.run("quantum computing breakthroughs 2024")

The generated run/1 function is equivalent to:
Pipeline.run(MyApp.ResearchPipeline.steps(), input, opts)

You can also inspect the pipeline structure:
MyApp.ResearchPipeline.step_names() # => [:search, :extract_facts, :format]
MyApp.ResearchPipeline.steps() # => [fn, fn, fn]

Error Handling in Pipelines
The pipeline halts on the first error:
# The pipeline short-circuits: once a step returns {:error, _},
# the remaining steps never run.
steps = [
  # Step 1: normalize a lookup miss into a pipeline-level error tag.
  fn input ->
    case lookup_data(input) do
      {:ok, data} -> {:ok, data}
      {:error, :not_found} -> {:error, :search_failed}
    end
  end,
  fn data ->
    # This step is skipped if the previous one errored
    AI.chat([%Message{role: :user, content: "Analyze: #{data}"}], provider: :openai)
  end
]
# Pattern-match the pipeline result: success, the specific search failure,
# or any other error reason.
case Pipeline.run(steps, "query") do
  {:ok, result} -> IO.puts(result)
  {:error, :search_failed} -> IO.puts("Nothing found")
  {:error, reason} -> IO.inspect(reason)
end

Teams
PhoenixAI.Team runs multiple agent specs in parallel (fan-out) and merges results
(fan-in). Useful for calling multiple models or running independent research tasks
concurrently.
Ad-hoc Team
alias PhoenixAI.{Message, Team}
# Fan-out: two independent agent specs (zero-arity functions) run in parallel.
specs = [
  fn ->
    AI.chat(
      [%Message{role: :user, content: "Analyze from a technical perspective: Elixir"}],
      provider: :openai
    )
  end,
  fn ->
    AI.chat(
      [%Message{role: :user, content: "Analyze from a business perspective: Elixir"}],
      provider: :anthropic
    )
  end
]

# Fan-in: receives the list of tagged results ({:ok, _} | {:error, _}) and
# joins the response bodies; a failed agent becomes a placeholder string.
merge_fn = fn results ->
  contents = Enum.map(results, fn
    {:ok, response} -> response.content
    {:error, _} -> "(agent failed)"
  end)

  Enum.join(contents, "\n\n---\n\n")
end
{:ok, combined} = Team.run(specs, merge_fn, max_concurrency: 2, timeout: 30_000)

DSL Team
defmodule MyApp.ResearchTeam do
  @moduledoc """
  Two-agent team built with the `PhoenixAI.Team` DSL: a technical and a
  business analysis run concurrently, then their outputs are joined.
  """

  use PhoenixAI.Team

  # Fan-out: each agent is a zero-arity function executed as its own task.
  agent :technical do
    fn ->
      AI.chat(
        [%PhoenixAI.Message{role: :user, content: "Technical analysis of Elixir"}],
        provider: :openai,
        model: "gpt-4o"
      )
    end
  end

  agent :business do
    fn ->
      AI.chat(
        [%PhoenixAI.Message{role: :user, content: "Business case for Elixir"}],
        provider: :anthropic
      )
    end
  end

  # Fan-in: combine the tagged results into one report string; errors are
  # rendered inline rather than crashing the merge.
  merge do
    fn results ->
      results
      |> Enum.map(fn
        {:ok, response} -> response.content
        {:error, reason} -> "Error: #{inspect(reason)}"
      end)
      |> Enum.join("\n\n---\n\n")
    end
  end
end
{:ok, report} = MyApp.ResearchTeam.run()
{:ok, report} = MyApp.ResearchTeam.run(max_concurrency: 3, timeout: 60_000)

Team Options
| Option | Default | Description |
|---|---|---|
| :max_concurrency | 5 | Maximum parallel tasks |
| :timeout | :infinity | Per-task timeout in milliseconds |
| :ordered | true | Preserve input order in results |
Fault Isolation
If an agent spec crashes, the error is isolated and appears as {:error, {:task_failed, reason}}
in the results — the other agents and the merge function are not affected:
# Merge that tolerates agent failures: warn about them, join the successes.
merge do
  fn results ->
    # Separate the {:ok, _} results from the error tuples.
    {successes, failures} = Enum.split_with(results, &match?({:ok, _}, &1))

    if failures != [] do
      IO.warn("#{length(failures)} agent(s) failed")
    end

    successes
    |> Enum.map(fn {:ok, r} -> r.content end)
    |> Enum.join("\n\n")
  end
end

Composition
Pipeline Calling a Team
defmodule MyApp.ResearchPipeline do
  @moduledoc """
  Pipeline whose first step fans out to `MyApp.ResearchTeam` and whose
  second step summarizes the combined team output.
  """

  use PhoenixAI.Pipeline

  step :gather do
    # The team takes no input, so the step's argument is intentionally
    # unused; the underscore prefix avoids an "unused variable" warning.
    fn _topic ->
      # Run a team in parallel, get combined results
      MyApp.ResearchTeam.run()
    end
  end

  # Receives the merged team output from :gather.
  step :summarize do
    fn combined_text ->
      AI.chat(
        [%PhoenixAI.Message{role: :user, content: "Final summary:\n#{combined_text}"}],
        provider: :openai
      )
    end
  end
end

Team Using a Pipeline
defmodule MyApp.DeepDiveTeam do
  @moduledoc """
  Team whose agents each run `MyApp.TranslatePipeline` on a different input,
  demonstrating a team composed of pipelines.
  """

  use PhoenixAI.Team

  agent :english_pipeline do
    fn ->
      MyApp.TranslatePipeline.run("Hello world")
    end
  end

  agent :french_pipeline do
    fn ->
      MyApp.TranslatePipeline.run("Bonjour le monde")
    end
  end

  # NOTE(review): this merge assumes every pipeline succeeded — an
  # {:error, _} result would not match the fn clause and would raise.
  merge do
    fn results ->
      {:ok, Enum.map(results, fn {:ok, r} -> r end)}
    end
  end
end

Ad-hoc Composition
# Pipeline step that fans out to a team
steps = [
  # Step 1: pass the query through unchanged (explicitly wrapped as {:ok, _}).
  fn query -> {:ok, query} end,
  # Step 2: fan out — two agents analyze pros and cons concurrently.
  fn query ->
    Team.run(
      [
        fn -> AI.chat([%Message{role: :user, content: "Pros of #{query}"}], provider: :openai) end,
        fn -> AI.chat([%Message{role: :user, content: "Cons of #{query}"}], provider: :openai) end
      ],
      # Merge: collect the two response bodies into a [pros, cons] list.
      # NOTE(review): assumes both agents succeed; an error tuple would
      # not match the fn clause.
      fn results ->
        Enum.map(results, fn {:ok, r} -> r.content end)
      end
    )
  end,
  # Step 3: fan in — destructure the merged list and ask for a verdict.
  fn [pros, cons] ->
    AI.chat(
      [%Message{role: :user, content: "Pros: #{pros}\n\nCons: #{cons}\n\nFinal verdict?"}],
      provider: :openai
    )
  end
]

{:ok, verdict} = Pipeline.run(steps, "using microservices")