Pipeline Composition Reference
Table of Contents
- Overview
- Pipeline Step Type
- Input Mapping
- Output Extraction
- Context Management
- Safety Features
- Execution Configuration
- Design Patterns
- Best Practices
- Troubleshooting
Overview
Pipeline composition enables building complex workflows from smaller, reusable pipeline components. This modular approach provides:
- Reusability: Share common workflows across projects
- Maintainability: Update shared logic in one place
- Testability: Test pipeline components in isolation
- Scalability: Build arbitrarily complex workflows
- Organization: Separate concerns into logical units
Pipeline Step Type
The pipeline step type executes another complete pipeline as a single step:
- name: "data_processing"
type: "pipeline"
# Pipeline source (one required)
pipeline_file: "./pipelines/data_processor.yaml" # External file
pipeline_ref: "common/data_processor" # Registry (future)
pipeline: # Inline definition
name: "inline_processor"
steps:
- name: "process"
type: "gemini"
prompt:
- type: "static"
content: "Process data"
External Pipeline Files
The most common approach is to reference external YAML files:
steps:
- name: "security_scan"
type: "pipeline"
pipeline_file: "./pipelines/security/vulnerability_scanner.yaml"
inputs:
target_directory: "./src"
scan_depth: "comprehensive"
outputs:
- "vulnerabilities"
- "security_score"
File Organization:
pipelines/
├── components/ # Reusable components
│ ├── data/
│ │ ├── validator.yaml
│ │ ├── transformer.yaml
│ │ └── aggregator.yaml
│ ├── analysis/
│ │ ├── code_review.yaml
│ │ ├── security_scan.yaml
│ │ └── performance_check.yaml
│ └── generation/
│ ├── test_generator.yaml
│ └── doc_generator.yaml
├── workflows/ # Complete workflows
│ ├── full_analysis.yaml
│ └── deployment.yaml
└── templates/ # Pipeline templates
└── standard_review.yaml
Inline Pipeline Definition
Define pipelines directly within the parent:
- name: "quick_validation"
type: "pipeline"
pipeline:
name: "inline_validator"
steps:
- name: "syntax_check"
type: "gemini"
prompt:
- type: "static"
content: "Check syntax validity"
- type: "static"
content: "{{inputs.code}}"
- name: "semantic_check"
type: "gemini"
prompt:
- type: "static"
content: "Verify semantic correctness"
- type: "previous_response"
step: "syntax_check"
Pipeline Registry (Future)
Reference pipelines from a central registry:
- name: "standard_security_scan"
type: "pipeline"
pipeline_ref: "security/owasp_top_10_scan"
version: "2.1.0"
inputs:
target: "{{workspace.path}}"
Input Mapping
Pass data from parent to child pipeline:
- name: "analyze_code"
type: "pipeline"
pipeline_file: "./pipelines/code_analyzer.yaml"
inputs:
# Direct values
language: "python"
framework: "django"
# From previous steps
source_code: "{{steps.extract.code}}"
requirements: "{{steps.parse.requirements}}"
# From workflow context
project_name: "{{workflow.project_name}}"
environment: "{{workflow.environment}}"
# From state variables
analysis_config: "{{state.config}}"
# Complex expressions
threshold: "{{state.base_threshold * 1.5}}"
# Arrays and objects
files_to_analyze: "{{steps.scan.python_files}}"
options:
depth: "comprehensive"
include_tests: true
metrics: ["complexity", "coverage", "quality"]
Type Preservation
Single template references preserve their original type:
inputs:
# Preserves integer type
max_iterations: "{{config.iterations}}" # → 10
# Preserves object type
settings: "{{steps.load.config}}" # → {"timeout": 30, "retry": 3}
# Preserves array type
items: "{{steps.gather.results}}" # → ["a", "b", "c"]
# String concatenation forces string type
message: "Count: {{config.iterations}}" # → "Count: 10"
Input Validation
Child pipelines can validate inputs:
# child_pipeline.yaml
workflow:
name: "validated_processor"
# Input schema (future)
input_schema:
type: "object"
required: ["data", "format"]
properties:
data:
type: "array"
minItems: 1
format:
type: "string"
enum: ["json", "csv", "xml"]
steps:
- name: "process"
type: "gemini"
prompt:
- type: "static"
content: "Process {{inputs.data}} in {{inputs.format}} format"
Output Extraction
Extract specific results from nested pipelines:
- name: "run_analysis"
type: "pipeline"
pipeline_file: "./analyzer.yaml"
outputs:
# Simple extraction - gets steps.{name}.result
- "final_report"
# Path-based extraction
- path: "metrics.security.score"
as: "security_score"
- path: "analysis.vulnerabilities"
as: "vulnerabilities_list"
- path: "summary.recommendations[0]"
as: "top_recommendation"
Output Patterns
Simple Extraction:
outputs:
- "step_name" # Extracts steps.step_name.result
Path Extraction:
outputs:
- path: "data.items[0].value"
as: "first_value"
- path: "metrics.performance.response_time"
as: "response_time"
Multiple Extractions:
outputs:
- path: "analysis.score"
as: "quality_score"
- path: "analysis.issues"
as: "found_issues"
- path: "recommendations"
as: "improvement_suggestions"
Using Extracted Outputs
steps:
- name: "security_pipeline"
type: "pipeline"
pipeline_file: "./security_scan.yaml"
outputs:
- path: "scan.vulnerabilities"
as: "vulns"
- path: "scan.risk_score"
as: "risk"
- name: "process_results"
type: "gemini"
prompt:
- type: "static"
content: |
Risk Score: {{steps.security_pipeline.risk}}
Vulnerabilities Found: {{length(steps.security_pipeline.vulns)}}
Details:
- type: "static"
content: "{{steps.security_pipeline.vulns}}"
Context Management
Context Inheritance
Control what context is passed to child pipelines:
- name: "child_pipeline"
type: "pipeline"
pipeline_file: "./child.yaml"
config:
inherit_context: true # Pass all parent context
inherit_providers: true # Inherit API configurations
inherit_functions: false # Don't inherit function definitions
Inheritance Options:
- inherit_context: Share variables and state
- inherit_providers: Share API keys and settings
- inherit_functions: Share Gemini function definitions
Context Isolation
Each pipeline maintains its own context:
# Parent pipeline
workflow:
name: "parent"
variables:
shared_var: "parent_value"
private_var: "parent_only"
steps:
- name: "child_isolated"
type: "pipeline"
pipeline_file: "./child.yaml"
config:
inherit_context: false # Complete isolation
inputs:
# Explicitly pass needed values
needed_var: "{{variables.shared_var}}"
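With inheritance disabled, the child sees only what arrives through its inputs. A sketch of the matching child.yaml (names are illustrative):
# child.yaml
workflow:
  name: "child"
  steps:
    - name: "use_input"
      type: "gemini"
      prompt:
        - type: "static"
          # inputs.needed_var was passed explicitly and is available;
          # the parent's private_var is not, because inherit_context is false
          content: "Received: {{inputs.needed_var}}"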
Variable Resolution
Variable resolution follows a hierarchical order, from highest to lowest precedence:
- Child pipeline's local variables
- Child pipeline's inputs
- Inherited parent context (if enabled)
- Parent's parent context (recursive)
# Variable resolution example
- name: "nested_pipeline"
type: "pipeline"
pipeline_file: "./nested.yaml"
config:
inherit_context: true
inputs:
# These override any inherited values
override_var: "child_specific_value"
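As an illustration of that order, a hypothetical nested.yaml might resolve values like this:
# nested.yaml (hypothetical)
workflow:
  name: "nested"
  variables:
    # Highest precedence: the child's own variables
    local_var: "child_value"
  steps:
    - name: "show_resolution"
      type: "gemini"
      prompt:
        - type: "static"
          # override_var comes from the parent's inputs block;
          # shared_var is visible only because inherit_context is true
          content: "{{variables.local_var}} / {{inputs.override_var}} / {{variables.shared_var}}"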
Safety Features
Recursion Protection
Prevent infinite recursion and circular dependencies:
- name: "recursive_pipeline"
type: "pipeline"
pipeline_file: "./processor.yaml"
config:
max_depth: 5 # Maximum nesting depth
Circular Dependency Detection:
# pipeline_a.yaml
- name: "call_b"
type: "pipeline"
pipeline_file: "./pipeline_b.yaml"
# pipeline_b.yaml
- name: "call_a"
type: "pipeline"
pipeline_file: "./pipeline_a.yaml" # ERROR: Circular dependency!
Resource Limits
Control resource usage:
- name: "resource_limited"
type: "pipeline"
pipeline_file: "./heavy_processor.yaml"
config:
memory_limit_mb: 1024 # 1GB memory limit
timeout_seconds: 300 # 5 minute timeout
max_total_steps: 100 # Step count limit
Error Boundaries
Isolate errors within child pipelines:
- name: "error_isolated"
type: "pipeline"
pipeline_file: "./risky_operation.yaml"
config:
continue_on_error: true # Don't fail parent
capture_errors: true # Store error details
# Handle errors in parent
error_handler:
- name: "handle_child_error"
type: "gemini"
condition: "steps.error_isolated.error != null"
prompt:
- type: "static"
content: "Child pipeline failed: {{steps.error_isolated.error}}"
Execution Configuration
Workspace Management
Each pipeline can have its own workspace:
- name: "isolated_workspace"
type: "pipeline"
pipeline_file: "./file_processor.yaml"
config:
workspace_dir: "./nested/{{step.name}}"
cleanup_on_success: true # Remove after completion
cleanup_on_error: false # Keep for debugging
Checkpointing
Nested checkpoint configuration:
- name: "checkpointed_pipeline"
type: "pipeline"
pipeline_file: "./long_running.yaml"
config:
checkpoint_enabled: true
checkpoint_frequency: 10 # Every 10 steps
checkpoint_dir: "./checkpoints/nested"
Execution Modes
- name: "execution_modes"
type: "pipeline"
pipeline_file: "./processor.yaml"
config:
# Tracing and debugging
enable_tracing: true
trace_metadata:
request_id: "{{inputs.request_id}}"
user_id: "{{inputs.user_id}}"
# Performance
cache_enabled: true # Cache pipeline definition
lazy_load: true # Load only when needed
# Retry configuration
max_retries: 2
retry_on: ["timeout", "resource_limit"]
Design Patterns
Component Library Pattern
Build a library of reusable components:
# components/validation/schema_validator.yaml
workflow:
name: "schema_validator"
description: "Validates data against JSON schema"
steps:
- name: "validate"
type: "gemini"
prompt:
- type: "static"
content: |
Validate this data against the schema:
Data: {{inputs.data}}
Schema: {{inputs.schema}}
Usage:
steps:
- name: "validate_user_data"
type: "pipeline"
pipeline_file: "./components/validation/schema_validator.yaml"
inputs:
data: "{{steps.parse.user_data}}"
schema: "{{config.user_schema}}"
Pipeline Template Pattern
Create parameterized pipeline templates:
# templates/standard_analysis.yaml
workflow:
name: "standard_analysis_template"
steps:
- name: "analyze"
type: "{{inputs.analyzer_type}}"
prompt:
- type: "file"
path: "{{inputs.prompt_path}}"
- type: "file"
path: "{{inputs.target_path}}"
Hierarchical Composition
Build complex workflows from layers:
# Top-level workflow
workflow:
name: "complete_system"
steps:
- name: "data_pipeline"
type: "pipeline"
pipeline_file: "./pipelines/data_pipeline.yaml"
- name: "analysis_pipeline"
type: "pipeline"
pipeline_file: "./pipelines/analysis_pipeline.yaml"
inputs:
data: "{{steps.data_pipeline.processed_data}}"
- name: "reporting_pipeline"
type: "pipeline"
pipeline_file: "./pipelines/reporting_pipeline.yaml"
inputs:
analysis: "{{steps.analysis_pipeline.results}}"
Factory Pattern
Dynamically select pipelines:
steps:
- name: "detect_type"
type: "gemini"
prompt:
- type: "static"
content: "Detect the project type"
- name: "run_appropriate_pipeline"
type: "pipeline"
pipeline_file: "./pipelines/{{steps.detect_type.project_type}}_analyzer.yaml"
inputs:
project_path: "./src"
Recursive Processing Pattern
Process hierarchical data structures:
workflow:
name: "tree_processor"
steps:
- name: "process_node"
type: "gemini"
prompt:
- type: "static"
content: "Process node: {{inputs.node}}"
- name: "process_children"
type: "for_loop"
iterator: "child"
data_source: "steps.process_node.children"
steps:
- name: "recurse"
type: "pipeline"
pipeline_ref: "tree_processor"
inputs:
node: "{{loop.child}}"
config:
max_depth: "{{inputs.max_depth - 1}}"
Best Practices
1. Pipeline Organization
Single Responsibility:
- Each pipeline should have one clear purpose
- Keep pipelines focused and cohesive
- Avoid mixing unrelated functionality
Naming Conventions:
- Use descriptive names: validate_user_input.yaml
- Include action verbs: generate_, analyze_, transform_
- Group by domain: security/scan_vulnerabilities.yaml
Directory Structure:
pipelines/
├── components/ # Small, reusable units
├── workflows/ # Complete business workflows
├── templates/ # Parameterized templates
└── experimental/ # Work in progress
2. Input/Output Design
Clear Interfaces:
# Document expected inputs
workflow:
name: "data_processor"
description: "Processes raw data into structured format"
# Future: Input documentation
inputs:
raw_data:
type: "array"
description: "Raw data records to process"
format:
type: "string"
enum: ["json", "csv", "xml"]
description: "Output format"
Minimal Coupling:
- Pass only required data (see the sketch below)
- Avoid tight coupling between pipelines
- Use clear, documented interfaces
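For example, rather than forwarding an entire state object, pass only the fields the child actually reads (the field and file names below are illustrative):
- name: "summarize"
  type: "pipeline"
  pipeline_file: "./components/reporting/summarizer.yaml"
  inputs:
    # Only the two fields the child needs, not the whole state.analysis object
    score: "{{state.analysis.score}}"
    issues: "{{state.analysis.issues}}"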
3. Error Handling
Graceful Degradation:
- name: "optional_enhancement"
type: "pipeline"
pipeline_file: "./enhance_data.yaml"
config:
continue_on_error: true
fallback_value: "{{inputs.original_data}}"
Error Context:
- name: "critical_operation"
type: "pipeline"
pipeline_file: "./critical.yaml"
on_error:
- name: "log_error"
type: "set_variable"
variables:
error_context:
pipeline: "critical_operation"
inputs: "{{inputs}}"
error: "{{error}}"
timestamp: "{{now()}}"
4. Performance Optimization
Pipeline Caching:
config:
cache_pipeline: true # Cache parsed pipeline
cache_ttl: 3600 # 1 hour TTL
Lazy Loading:
- name: "conditional_pipeline"
type: "pipeline"
pipeline_file: "./expensive_operation.yaml"
condition: "steps.check.requires_processing"
config:
lazy_load: true # Only load if condition met
Resource Pooling:
config:
reuse_workspace: true # Reuse workspace between runs
pool_size: 5 # Connection pool size
5. Testing Strategies
Isolated Testing:
# Test individual pipeline components
mix pipeline.run pipelines/components/validator.yaml \
--inputs '{"data": "test_data", "schema": "test_schema"}'
Mock Pipelines:
# Use mock pipelines for testing
- name: "test_with_mock"
type: "pipeline"
pipeline_file: "{{TEST_MODE ? './mocks/processor.yaml' : './processor.yaml'}}"
Troubleshooting
Common Issues
1. Circular Dependencies
Error: Circular dependency detected: A -> B -> C -> A
Solution: Review pipeline dependencies and remove cycles
2. Maximum Depth Exceeded
Error: Maximum nesting depth (10) exceeded
Solutions:
- Increase the max_depth configuration
- Refactor to reduce nesting
- Use an iterative instead of a recursive approach
3. Input Type Mismatch
Error: Expected array for 'items', got string
Solution: Ensure input types match expected types
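This often comes from wrapping a template reference in extra text; a single reference preserves the original type (see Type Preservation above):
inputs:
  # Fails: interpolation forces the array into a string
  # items: "Files: {{steps.scan.python_files}}"
  # Works: a single template reference keeps the array type
  items: "{{steps.scan.python_files}}"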
4. Output Path Not Found
Error: Path 'results.data.value' not found in pipeline output
Solution: Verify the output structure matches extraction paths
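If the structure is unclear, a simple extraction of the whole step result can help confirm its shape before narrowing the path (a sketch):
outputs:
  # While debugging, extract the full step result...
  - "scan"
  # ...then switch to a narrow path once the structure is confirmed:
  # - path: "scan.vulnerabilities"
  #   as: "vulns"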
Debugging Tools
Execution Tracing:
config:
enable_tracing: true
trace_level: "detailed" # Show all operations
Debug Output:
PIPELINE_DEBUG=true mix pipeline.run workflow.yaml
Execution Visualization:
Execution Tree:
├─ main_workflow (0ms)
│ ├─ data_prep (15ms)
│ │ └─ validator (8ms)
│ │ └─ schema_check: valid
│ ├─ processor (125ms)
│ │ ├─ transform (45ms)
│ │ └─ aggregate (80ms)
│ └─ reporter (22ms)
Performance Analysis
Metrics Collection:
config:
collect_metrics: true
metrics:
- execution_time
- memory_usage
- step_count
Bottleneck Identification:
Performance Report:
- Total Duration: 289ms
- Slowest Pipeline: data_processor (125ms)
- Memory Peak: 234MB
- Total Steps: 15
- Nesting Depth: 3
This reference provides comprehensive documentation for pipeline composition in Pipeline YAML v2 format, enabling powerful modular workflow design.