Pipelines
View Source
The Pipeline system provides composable RAG workflows with parallel execution, caching, and error handling.
Overview
Pipelines consist of:
- Steps - Individual processing units
- Context - Shared state between steps
- Executor - Runs steps with caching/retry/telemetry
Creating a Pipeline
alias Rag.Pipeline
alias Rag.Pipeline.Step
pipeline = Pipeline.new(:rag_pipeline, description: "Complete RAG workflow")
Adding Steps
pipeline = Pipeline.add_step(pipeline,
name: :embed_query,
module: Steps,
function: :embed_query,
args: [model: "gemini"],
timeout: 10_000,
on_error: {:retry, 2},
cache: true
)
Step Options
| Option | Default | Description |
|---|---|---|
| name | required | Step identifier (atom) |
| module | required | Module containing function |
| function | required | Function to call |
| args | [] | Arguments passed to function |
| inputs | nil | Dependencies on previous steps |
| parallel | false | Run concurrently |
| on_error | :halt | Error handling strategy |
| cache | false | Cache results with ETS |
| timeout | nil | Timeout in milliseconds |
Step Functions
Every step function must have this signature:
def step_name(input, context, opts) do
# input: Result from previous step(s)
# context: Pipeline.Context struct
# opts: Keyword list from step's :args
# Return one of:
{:ok, result}
{:ok, result, updated_context}
{:error, reason}
end
Example Step
defmodule MySteps do
alias Rag.Pipeline.Context
@doc """
Example pipeline step that embeds the incoming query.

Reads the router from the step's `:args` (`opts[:router]`) and asks it for an
`:embeddings` result for `[query]`.

Returns `{:ok, embedding, updated_context}` with the embedding also stored in
the context's `:query_embedding` field, or `{:error, reason}` unchanged when
the router call fails.
"""
def embed_query(query, context, opts) do
  router = opts[:router]

  # This module only aliases Rag.Pipeline.Context, so a bare `Router` would
  # resolve to a top-level `Router` module; qualify the call instead.
  case Rag.Router.execute(router, :embeddings, [query], []) do
    {:ok, [embedding], _} ->
      {:ok, embedding, %{context | query_embedding: embedding}}

    {:error, reason} ->
      {:error, reason}
  end
end
end
Context
The context holds state throughout pipeline execution:
alias Rag.Pipeline.Context
# Create context
context = Context.new("What is Elixir?")
# Context structure
%Context{
input: any(), # Original input
query: String.t() | nil,
query_embedding: [float()] | nil,
retrieval_results: list(),
reranked_results: list(),
context_text: String.t() | nil,
response: String.t() | nil,
metadata: %{step_results: %{}},
errors: []
}
Context API
# Store step result
context = Context.put_step_result(context, :embed, embedding)
# Get step result
embedding = Context.get_step_result(context, :embed)
# Store metadata
context = Context.put_metadata(context, :user_id, 123)
# Add error (for :continue strategy)
context = Context.add_error(context, {:step_failed, :rerank})
Input Dependencies
No Dependencies (Default)
Step receives output from previous step:
Pipeline.add_step(pipeline, name: :step2, ...)
# step2 receives output from step1
Single Dependency
Pipeline.add_step(pipeline,
name: :generate,
inputs: [:retrieve],
...
)
# generate receives the :retrieve step's result
Multiple Dependencies
Pipeline.add_step(pipeline,
name: :combine,
inputs: [:semantic_search, :fulltext_search],
...
)
# combine receives: %{semantic_search: [...], fulltext_search: [...]}
Error Handling
:halt (Default)
Stop pipeline immediately on error:
Pipeline.add_step(pipeline,
name: :critical,
on_error: :halt # Pipeline stops if this fails
)
:continue
Log error but continue execution:
Pipeline.add_step(pipeline,
name: :optional_rerank,
on_error: :continue # Skip if fails, continue pipeline
)
{:retry, n}
Retry up to n times:
Pipeline.add_step(pipeline,
name: :api_call,
on_error: {:retry, 3} # Retry up to 3 times
)
Parallel Execution
Independent steps can run concurrently:
Pipeline.new(:hybrid_search)
|> Pipeline.add_step(name: :embed, ...)
|> Pipeline.add_step(
name: :semantic_search,
inputs: [:embed],
parallel: true # Runs in parallel
)
|> Pipeline.add_step(
name: :fulltext_search,
inputs: [:embed],
parallel: true # Runs in parallel
)
|> Pipeline.add_step(
name: :combine,
inputs: [:semantic_search, :fulltext_search] # Waits for both
)
Caching
Cache expensive operations with ETS:
Pipeline.add_step(pipeline,
name: :embed,
cache: true # Results cached in ETS
)
Benefits:
- Same input skips execution, uses cache
- Persists across pipeline runs
- Useful for embeddings, expensive computations
Performance:
First run: 3500ms (embedding computed)
Second run: 2100ms (embedding cached)
Speedup: ~40%
Timeouts
Prevent hanging on slow steps:
Pipeline.add_step(pipeline,
name: :llm_generate,
timeout: 30_000 # 30 second timeout
)
Telemetry
Pipeline emits telemetry events:
# Start event
[:rag, :pipeline, :step, :start]
# Metadata: %{pipeline: :name, step: :step_name, attempt: 0}
# Stop event
[:rag, :pipeline, :step, :stop]
# Measurements: %{duration: microseconds}
# Exception event
[:rag, :pipeline, :step, :exception]
# Metadata: %{pipeline: :name, step: :step_name, error: reason}
Attaching Handlers
:telemetry.attach(
"pipeline-logger",
[:rag, :pipeline, :step, :stop],
fn _event, measurements, metadata, _config ->
IO.puts("Step #{metadata.step} completed in #{measurements.duration}μs")
end,
nil
)
Complete Example
defmodule MyApp.RAGPipeline do
  @moduledoc """
  Example RAG pipeline with four chained steps: `:embed_query`, `:retrieve`,
  `:rerank`, and `:generate`.

  `build/2` assembles the pipeline; the remaining public functions are the
  step implementations. Each one follows the `(input, context, opts)` step
  signature, where `opts` is the keyword list given as the step's `:args`.
  """

  # Router and Retriever are called unqualified below, so they must be aliased
  # here; without these aliases the calls resolve to non-existent top-level
  # modules.
  alias Rag.Pipeline
  alias Rag.Retriever
  alias Rag.Router

  @doc """
  Builds the `:rag_pipeline` pipeline.

  `router` is passed to the embed, rerank, and generate steps; `retriever` is
  passed to the retrieve step. Every step after the first declares the
  previous step as its single input.
  """
  def build(router, retriever) do
    Pipeline.new(:rag_pipeline, description: "Complete RAG")
    |> Pipeline.add_step(
      name: :embed_query,
      module: __MODULE__,
      function: :embed_query,
      args: [router: router],
      timeout: 10_000,
      on_error: {:retry, 2},
      cache: true
    )
    |> Pipeline.add_step(
      name: :retrieve,
      module: __MODULE__,
      function: :retrieve,
      args: [retriever: retriever],
      inputs: [:embed_query],
      timeout: 5_000
    )
    |> Pipeline.add_step(
      name: :rerank,
      module: __MODULE__,
      function: :rerank,
      args: [router: router],
      inputs: [:retrieve],
      # Optional step
      on_error: :continue
    )
    |> Pipeline.add_step(
      name: :generate,
      module: __MODULE__,
      function: :generate,
      args: [router: router],
      inputs: [:rerank],
      timeout: 30_000
    )
  end

  @doc """
  Embeds `query` via the router in `opts[:router]`.

  On success returns `{:ok, embedding, context}` with both `:query` and
  `:query_embedding` set on the context; router errors are returned as-is.
  """
  def embed_query(query, context, opts) do
    router = opts[:router]

    case Router.execute(router, :embeddings, [query], []) do
      {:ok, [embedding], _} ->
        {:ok, embedding, %{context | query: query, query_embedding: embedding}}

      {:error, reason} ->
        {:error, reason}
    end
  end

  @doc """
  Retrieves up to 10 results for `{embedding, context.query}` using the
  retriever in `opts[:retriever]`.

  On success returns `{:ok, results, context}` with `:retrieval_results` set;
  retriever errors are returned as-is.
  """
  def retrieve(embedding, context, opts) do
    retriever = opts[:retriever]

    case Retriever.retrieve(retriever, {embedding, context.query}, limit: 10) do
      {:ok, results} ->
        {:ok, results, %{context | retrieval_results: results}}

      {:error, reason} ->
        {:error, reason}
    end
  end

  @doc """
  Reranks `results` against `context.query` with an LLM reranker, keeping the
  top 5.

  On success returns `{:ok, reranked, context}` with `:reranked_results` set.
  A reranker error is not propagated: the original `results` are returned as
  `{:ok, results}` and the context is left untouched.
  """
  def rerank(results, context, opts) do
    router = opts[:router]
    reranker = Rag.Reranker.LLM.new(router: router)

    case Rag.Reranker.rerank(reranker, context.query, results, top_k: 5) do
      {:ok, reranked} ->
        {:ok, reranked, %{context | reranked_results: reranked}}

      {:error, _reason} ->
        # Return original results if reranking fails
        {:ok, results}
    end
  end

  @doc """
  Generates the answer from `results`.

  Joins each result's `content` with blank lines into the context text, builds
  the prompt from it and `context.query`, and sends it to the router in
  `opts[:router]`. On success returns `{:ok, response, context}` with
  `:response` and `:context_text` set; router errors are returned as-is.
  """
  def generate(results, context, opts) do
    router = opts[:router]
    context_text = Enum.map_join(results, "\n\n", & &1.content)

    prompt = """
    Answer based on context:
    #{context_text}
    Question: #{context.query}
    """

    case Router.execute(router, :text, prompt, []) do
      {:ok, response, _} ->
        {:ok, response, %{context | response: response, context_text: context_text}}

      {:error, reason} ->
        {:error, reason}
    end
  end
end
# Usage
{:ok, router} = Router.new(providers: [:gemini])
retriever = %Rag.Retriever.Hybrid{repo: Repo}
pipeline = MyApp.RAGPipeline.build(router, retriever)
context = Context.new("What is GenServer?")
case Pipeline.execute(pipeline, context.input) do
{:ok, response, final_context} ->
IO.puts("Answer: #{response}")
IO.puts("Used #{length(final_context.retrieval_results)} documents")
{:error, reason} ->
IO.puts("Pipeline failed: #{inspect(reason)}")
end
Configuration Best Practices
Timeouts
| Step Type | Suggested Timeout |
|---|---|
| Embedding | 10-15 seconds |
| Database query | 5 seconds |
| LLM generation | 30-60 seconds |
| Overall pipeline | Sum + buffer |
Caching
Enable for:
- Embeddings (deterministic)
- Expensive computations
- Repeated queries
Disable for:
- User-specific results
- Time-sensitive data
Error Handling
| Step Type | Strategy |
|---|---|
| Critical (embedding, retrieval) | :halt or {:retry, 2} |
| Enhancement (reranking) | :continue |
| External APIs | {:retry, 3} |
Next Steps
- Retrievers - Retrieval strategies for pipelines
- Rerankers - Improve retrieval quality
- Agent Framework - Integrate agents in pipelines