Pipeline Composition Reference


Table of Contents

  1. Overview
  2. Pipeline Step Type
  3. Input Mapping
  4. Output Extraction
  5. Context Management
  6. Safety Features
  7. Execution Configuration
  8. Design Patterns
  9. Best Practices
  10. Troubleshooting

Overview

Pipeline composition enables building complex workflows from smaller, reusable pipeline components. This modular approach provides:

  • Reusability: Share common workflows across projects
  • Maintainability: Update shared logic in one place
  • Testability: Test pipeline components in isolation
  • Scalability: Build arbitrarily complex workflows
  • Organization: Separate concerns into logical units

Pipeline Step Type

The pipeline step type executes another complete pipeline as a single step:

- name: "data_processing"
  type: "pipeline"
  
  # Pipeline source (exactly one of the following is required)
  pipeline_file: "./pipelines/data_processor.yaml"    # External file
  pipeline_ref: "common/data_processor"               # Registry (future)
  pipeline:                                           # Inline definition
    name: "inline_processor"
    steps:
      - name: "process"
        type: "gemini"
        prompt:
          - type: "static"
            content: "Process data"

External Pipeline Files

The most common approach is to reference external YAML files:

steps:
  - name: "security_scan"
    type: "pipeline"
    pipeline_file: "./pipelines/security/vulnerability_scanner.yaml"
    inputs:
      target_directory: "./src"
      scan_depth: "comprehensive"
    outputs:
      - "vulnerabilities"
      - "security_score"

File Organization:

pipelines/
├── components/           # Reusable components
│   ├── data/
│   │   ├── validator.yaml
│   │   ├── transformer.yaml
│   │   └── aggregator.yaml
│   ├── analysis/
│   │   ├── code_review.yaml
│   │   ├── security_scan.yaml
│   │   └── performance_check.yaml
│   └── generation/
│       ├── test_generator.yaml
│       └── doc_generator.yaml
├── workflows/            # Complete workflows
│   ├── full_analysis.yaml
│   └── deployment.yaml
└── templates/            # Pipeline templates
    └── standard_review.yaml

Inline Pipeline Definition

Define pipelines directly within the parent:

- name: "quick_validation"
  type: "pipeline"
  pipeline:
    name: "inline_validator"
    steps:
      - name: "syntax_check"
        type: "gemini"
        prompt:
          - type: "static"
            content: "Check syntax validity"
          - type: "static"
            content: "{{inputs.code}}"
      
      - name: "semantic_check"
        type: "gemini"
        prompt:
          - type: "static"
            content: "Verify semantic correctness"
          - type: "previous_response"
            step: "syntax_check"

Pipeline Registry (Future)

Reference pipelines from a central registry:

- name: "standard_security_scan"
  type: "pipeline"
  pipeline_ref: "security/owasp_top_10_scan"
  version: "2.1.0"
  inputs:
    target: "{{workspace.path}}"

Input Mapping

Pass data from parent to child pipeline:

- name: "analyze_code"
  type: "pipeline"
  pipeline_file: "./pipelines/code_analyzer.yaml"
  
  inputs:
    # Direct values
    language: "python"
    framework: "django"
    
    # From previous steps
    source_code: "{{steps.extract.code}}"
    requirements: "{{steps.parse.requirements}}"
    
    # From workflow context
    project_name: "{{workflow.project_name}}"
    environment: "{{workflow.environment}}"
    
    # From state variables
    analysis_config: "{{state.config}}"
    
    # Complex expressions
    threshold: "{{state.base_threshold * 1.5}}"
    
    # Arrays and objects
    files_to_analyze: "{{steps.scan.python_files}}"
    options:
      depth: "comprehensive"
      include_tests: true
      metrics: ["complexity", "coverage", "quality"]

Type Preservation

Single template references preserve their original type:

inputs:
  # Preserves integer type
  max_iterations: "{{config.iterations}}"        # → 10
  
  # Preserves object type  
  settings: "{{steps.load.config}}"              # → {"timeout": 30, "retry": 3}
  
  # Preserves array type
  items: "{{steps.gather.results}}"              # → ["a", "b", "c"]
  
  # String concatenation forces string type
  message: "Count: {{config.iterations}}"        # → "Count: 10"

Input Validation

Child pipelines can validate inputs:

# child_pipeline.yaml
workflow:
  name: "validated_processor"
  
  # Input schema (future)
  input_schema:
    type: "object"
    required: ["data", "format"]
    properties:
      data:
        type: "array"
        minItems: 1
      format:
        type: "string"
        enum: ["json", "csv", "xml"]
  
  steps:
    - name: "process"
      type: "gemini"
      prompt:
        - type: "static"
          content: "Process {{inputs.data}} in {{inputs.format}} format"

Output Extraction

Extract specific results from nested pipelines:

- name: "run_analysis"
  type: "pipeline"
  pipeline_file: "./analyzer.yaml"
  
  outputs:
    # Simple extraction - gets steps.{name}.result
    - "final_report"
    
    # Path-based extraction
    - path: "metrics.security.score"
      as: "security_score"
    
    - path: "analysis.vulnerabilities"
      as: "vulnerabilities_list"
    
    - path: "summary.recommendations[0]"
      as: "top_recommendation"

Output Patterns

Simple Extraction:

outputs:
  - "step_name"    # Extracts steps.step_name.result

Path Extraction:

outputs:
  - path: "data.items[0].value"
    as: "first_value"
  
  - path: "metrics.performance.response_time"
    as: "response_time"

Multiple Extractions:

outputs:
  - path: "analysis.score"
    as: "quality_score"
  
  - path: "analysis.issues"
    as: "found_issues"
  
  - path: "recommendations"
    as: "improvement_suggestions"

Using Extracted Outputs

steps:
  - name: "security_pipeline"
    type: "pipeline"
    pipeline_file: "./security_scan.yaml"
    outputs:
      - path: "scan.vulnerabilities"
        as: "vulns"
      - path: "scan.risk_score"
        as: "risk"
  
  - name: "process_results"
    type: "gemini"
    prompt:
      - type: "static"
        content: |
          Risk Score: {{steps.security_pipeline.risk}}
          Vulnerabilities Found: {{length(steps.security_pipeline.vulns)}}
          
          Details:
      - type: "static"
        content: "{{steps.security_pipeline.vulns}}"

Context Management

Context Inheritance

Control what context is passed to child pipelines:

- name: "child_pipeline"
  type: "pipeline"
  pipeline_file: "./child.yaml"
  
  config:
    inherit_context: true        # Pass all parent context
    inherit_providers: true      # Inherit API configurations
    inherit_functions: false     # Don't inherit function definitions

Inheritance Options:

  • inherit_context: Share variables and state (see the sketch after this list)
  • inherit_providers: Share API keys and settings
  • inherit_functions: Share Gemini function definitions
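
For example, a minimal sketch of inherited context in action, assuming inherited variables resolve through the same {{variables.*}} syntax the parent uses:

# parent.yaml
workflow:
  name: "parent"
  variables:
    project_name: "demo_project"
  
  steps:
    - name: "child_inherited"
      type: "pipeline"
      pipeline_file: "./child.yaml"
      config:
        inherit_context: true        # child can read parent variables and state

# child.yaml
workflow:
  name: "child"
  steps:
    - name: "report"
      type: "gemini"
      prompt:
        - type: "static"
          content: "Project: {{variables.project_name}}"  # resolved from the inherited parent context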

Context Isolation

Each pipeline maintains its own context:

# Parent pipeline
workflow:
  name: "parent"
  variables:
    shared_var: "parent_value"
    private_var: "parent_only"
  
  steps:
    - name: "child_isolated"
      type: "pipeline"
      pipeline_file: "./child.yaml"
      config:
        inherit_context: false    # Complete isolation
      inputs:
        # Explicitly pass needed values
        needed_var: "{{variables.shared_var}}"

Variable Resolution

Variable resolution follows a hierarchical order:

  1. Child pipeline's local variables
  2. Child pipeline's inputs
  3. Inherited parent context (if enabled)
  4. Inherited parent context (if enabled)

# Variable resolution example
- name: "nested_pipeline"
  type: "pipeline"
  pipeline_file: "./nested.yaml"
  config:
    inherit_context: true
  inputs:
    # These override any inherited values
    override_var: "child_specific_value"

Safety Features

Recursion Protection

Prevent infinite recursion and circular dependencies:

- name: "recursive_pipeline"
  type: "pipeline"
  pipeline_file: "./processor.yaml"
  
  config:
    max_depth: 5                 # Maximum nesting depth

Circular Dependency Detection:

# pipeline_a.yaml
- name: "call_b"
  type: "pipeline"
  pipeline_file: "./pipeline_b.yaml"

# pipeline_b.yaml
- name: "call_a"
  type: "pipeline"
  pipeline_file: "./pipeline_a.yaml"  # ERROR: Circular dependency!

Resource Limits

Control resource usage:

- name: "resource_limited"
  type: "pipeline"
  pipeline_file: "./heavy_processor.yaml"
  
  config:
    memory_limit_mb: 1024        # 1GB memory limit
    timeout_seconds: 300         # 5 minute timeout
    max_total_steps: 100         # Step count limit

Error Boundaries

Isolate errors within child pipelines:

- name: "error_isolated"
  type: "pipeline"
  pipeline_file: "./risky_operation.yaml"
  
  config:
    continue_on_error: true      # Don't fail parent
    capture_errors: true         # Store error details
  
  # Handle errors in parent
  error_handler:
    - name: "handle_child_error"
      type: "gemini"
      condition: "steps.error_isolated.error != null"
      prompt:
        - type: "static"
          content: "Child pipeline failed: {{steps.error_isolated.error}}"

Execution Configuration

Workspace Management

Each pipeline can have its own workspace:

- name: "isolated_workspace"
  type: "pipeline"
  pipeline_file: "./file_processor.yaml"
  
  config:
    workspace_dir: "./nested/{{step.name}}"
    cleanup_on_success: true     # Remove after completion
    cleanup_on_error: false      # Keep for debugging

Checkpointing

Nested checkpoint configuration:

- name: "checkpointed_pipeline"
  type: "pipeline"
  pipeline_file: "./long_running.yaml"
  
  config:
    checkpoint_enabled: true
    checkpoint_frequency: 10     # Every 10 steps
    checkpoint_dir: "./checkpoints/nested"

Execution Modes

- name: "execution_modes"
  type: "pipeline"
  pipeline_file: "./processor.yaml"
  
  config:
    # Tracing and debugging
    enable_tracing: true
    trace_metadata:
      request_id: "{{inputs.request_id}}"
      user_id: "{{inputs.user_id}}"
    
    # Performance
    cache_enabled: true          # Cache pipeline definition
    lazy_load: true              # Load only when needed
    
    # Retry configuration
    max_retries: 2
    retry_on: ["timeout", "resource_limit"]

Design Patterns

Component Library Pattern

Build a library of reusable components:

# components/validation/schema_validator.yaml
workflow:
  name: "schema_validator"
  description: "Validates data against JSON schema"
  
  steps:
    - name: "validate"
      type: "gemini"
      prompt:
        - type: "static"
          content: |
            Validate this data against the schema:
            Data: {{inputs.data}}
            Schema: {{inputs.schema}}

Usage:

steps:
  - name: "validate_user_data"
    type: "pipeline"
    pipeline_file: "./components/validation/schema_validator.yaml"
    inputs:
      data: "{{steps.parse.user_data}}"
      schema: "{{config.user_schema}}"

Pipeline Template Pattern

Create parameterized pipeline templates:

# templates/standard_analysis.yaml
workflow:
  name: "standard_analysis_template"
  
  steps:
    - name: "analyze"
      type: "{{inputs.analyzer_type}}"
      prompt:
        - type: "file"
          path: "{{inputs.prompt_path}}"
        - type: "file"
          path: "{{inputs.target_path}}"

Hierarchical Composition

Build complex workflows from layers:

# Top-level workflow
workflow:
  name: "complete_system"
  
  steps:
    - name: "data_pipeline"
      type: "pipeline"
      pipeline_file: "./pipelines/data_pipeline.yaml"
      
    - name: "analysis_pipeline"
      type: "pipeline"
      pipeline_file: "./pipelines/analysis_pipeline.yaml"
      inputs:
        data: "{{steps.data_pipeline.processed_data}}"
    
    - name: "reporting_pipeline"
      type: "pipeline"
      pipeline_file: "./pipelines/reporting_pipeline.yaml"
      inputs:
        analysis: "{{steps.analysis_pipeline.results}}"

Factory Pattern

Dynamically select pipelines:

steps:
  - name: "detect_type"
    type: "gemini"
    prompt:
      - type: "static"
        content: "Detect the project type"
  
  - name: "run_appropriate_pipeline"
    type: "pipeline"
    pipeline_file: "./pipelines/{{steps.detect_type.project_type}}_analyzer.yaml"
    inputs:
      project_path: "./src"

Recursive Processing Pattern

Process hierarchical data structures:

workflow:
  name: "tree_processor"
  
  steps:
    - name: "process_node"
      type: "gemini"
      prompt:
        - type: "static"
          content: "Process node: {{inputs.node}}"
    
    - name: "process_children"
      type: "for_loop"
      iterator: "child"
      data_source: "steps.process_node.children"
      
      steps:
        - name: "recurse"
          type: "pipeline"
          pipeline_ref: "tree_processor"
          inputs:
            node: "{{loop.child}}"
          config:
            max_depth: "{{inputs.max_depth - 1}}"

Best Practices

1. Pipeline Organization

Single Responsibility:

  • Each pipeline should have one clear purpose
  • Keep pipelines focused and cohesive
  • Avoid mixing unrelated functionality

Naming Conventions:

  • Use descriptive names: validate_user_input.yaml
  • Include action verbs: generate_, analyze_, transform_
  • Group by domain: security/scan_vulnerabilities.yaml

Directory Structure:

pipelines/
├── components/      # Small, reusable units
├── workflows/       # Complete business workflows
├── templates/       # Parameterized templates
└── experimental/    # Work in progress

2. Input/Output Design

Clear Interfaces:

# Document expected inputs
workflow:
  name: "data_processor"
  description: "Processes raw data into structured format"
  
  # Future: Input documentation
  inputs:
    raw_data:
      type: "array"
      description: "Raw data records to process"
    format:
      type: "string"
      enum: ["json", "csv", "xml"]
      description: "Output format"

Minimal Coupling:

  • Pass only required data (see the sketch after this list)
  • Avoid tight coupling between pipelines
  • Use clear, documented interfaces
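
A minimal-coupling sketch, reusing the context-isolation options shown earlier, where the child receives only the two values it needs (the upstream step name is hypothetical):

- name: "loosely_coupled"
  type: "pipeline"
  pipeline_file: "./components/data/transformer.yaml"
  config:
    inherit_context: false             # no implicit sharing of parent state
  inputs:
    data: "{{steps.parse.records}}"    # hypothetical upstream step output
    format: "json"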

3. Error Handling

Graceful Degradation:

- name: "optional_enhancement"
  type: "pipeline"
  pipeline_file: "./enhance_data.yaml"
  config:
    continue_on_error: true
    fallback_value: "{{inputs.original_data}}"

Error Context:

- name: "critical_operation"
  type: "pipeline"
  pipeline_file: "./critical.yaml"
  
  on_error:
    - name: "log_error"
      type: "set_variable"
      variables:
        error_context:
          pipeline: "critical_operation"
          inputs: "{{inputs}}"
          error: "{{error}}"
          timestamp: "{{now()}}"

4. Performance Optimization

Pipeline Caching:

config:
  cache_pipeline: true           # Cache parsed pipeline
  cache_ttl: 3600               # 1 hour TTL

Lazy Loading:

- name: "conditional_pipeline"
  type: "pipeline"
  pipeline_file: "./expensive_operation.yaml"
  condition: "steps.check.requires_processing"
  config:
    lazy_load: true             # Only load if condition met

Resource Pooling:

config:
  reuse_workspace: true         # Reuse workspace between runs
  pool_size: 5                  # Connection pool size

5. Testing Strategies

Isolated Testing:

# Test individual pipeline components
mix pipeline.run pipelines/components/validator.yaml \
  --inputs '{"data": "test_data", "schema": "test_schema"}'

Mock Pipelines:

# Use mock pipelines for testing
- name: "test_with_mock"
  type: "pipeline"
  pipeline_file: "{{TEST_MODE ? './mocks/processor.yaml' : './processor.yaml'}}"

Troubleshooting

Common Issues

1. Circular Dependencies

Error: Circular dependency detected: A -> B -> C -> A

Solution: Review pipeline dependencies and remove cycles
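
One way to break the cycle (a sketch, assuming the two pipelines do not genuinely need to call each other) is to have a parent orchestrate them sequentially, so neither file references the other:

# parent.yaml
workflow:
  name: "orchestrator"
  steps:
    - name: "run_a"
      type: "pipeline"
      pipeline_file: "./pipeline_a.yaml"
    
    - name: "run_b"
      type: "pipeline"
      pipeline_file: "./pipeline_b.yaml"
      inputs:
        a_result: "{{steps.run_a.result}}"   # assumes pipeline_b only needs A's result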

2. Maximum Depth Exceeded

Error: Maximum nesting depth (10) exceeded

Solutions:

  • Increase the max_depth configuration (see the sketch after this list)
  • Refactor to reduce nesting
  • Use iterative instead of recursive approach
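
A minimal sketch of raising the limit, reusing the max_depth option shown under Recursion Protection:

- name: "deeply_nested"
  type: "pipeline"
  pipeline_file: "./processor.yaml"
  config:
    max_depth: 15                # raised above the limit reported in the error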

3. Input Type Mismatch

Error: Expected array for 'items', got string

Solution: Ensure input types match expected types
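
A frequent cause is string interpolation around a template reference. As described under Type Preservation, a single template reference keeps the original type, while concatenation forces a string; a minimal sketch:

# Wrong: concatenation turns the array into a string
inputs:
  items: "Files: {{steps.scan.python_files}}"

# Right: a single template reference preserves the array type
inputs:
  items: "{{steps.scan.python_files}}"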

4. Output Path Not Found

Error: Path 'results.data.value' not found in pipeline output

Solution: Verify the output structure matches extraction paths
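
One way to debug this (a sketch) is to start with a simple extraction, which returns the whole step result, inspect its structure, and only then switch to a path-based extraction:

outputs:
  # Step 1: extract the full result of the child step to inspect its shape
  - "scan"
  
  # Step 2: once the structure is confirmed, narrow to a path
  # - path: "scan.risk_score"
  #   as: "risk"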

Debugging Tools

Execution Tracing:

config:
  enable_tracing: true
  trace_level: "detailed"       # Show all operations

Debug Output:

PIPELINE_DEBUG=true mix pipeline.run workflow.yaml

Execution Visualization:

Execution Tree:
└── main_workflow (0ms)
    ├── data_prep (15ms)
    │   └── validator (8ms)
    │       └── schema_check: valid
    ├── processor (125ms)
    │   ├── transform (45ms)
    │   └── aggregate (80ms)
    └── reporter (22ms)

Performance Analysis

Metrics Collection:

config:
  collect_metrics: true
  metrics:
    - execution_time
    - memory_usage
    - step_count

Bottleneck Identification:

Performance Report:
- Total Duration: 289ms
- Slowest Pipeline: data_processor (125ms)
- Memory Peak: 234MB
- Total Steps: 15
- Nesting Depth: 3

This reference provides comprehensive documentation for pipeline composition in Pipeline YAML v2 format, enabling powerful modular workflow design.