Safety Features Guide for Recursive Pipelines

Table of Contents

  1. Overview
  2. Recursion Protection
  3. Circular Dependency Detection
  4. Resource Management
  5. Memory Monitoring
  6. Timeout Protection
  7. Step Count Limits
  8. Workspace Isolation
  9. Configuration Management
  10. Monitoring and Alerts
  11. Best Practices
  12. Troubleshooting
  13. API Reference

Overview

The recursive pipeline system includes comprehensive safety mechanisms designed to prevent common failure modes in nested execution scenarios. These safety features ensure system stability, prevent resource exhaustion, and provide clear feedback when limits are reached.

Safety Architecture

graph TD
    A[Pipeline Execution Request] --> B[Safety Manager]
    B --> C{Safety Checks}
    
    C --> D[Recursion Guard]
    D --> D1[Depth Check]
    D --> D2[Circular Dependency]
    
    C --> E[Resource Monitor]
    E --> E1[Memory Limits]
    E --> E2[Step Count]
    E --> E3[Timeout Check]
    
    C --> F[Workspace Manager]
    F --> F1[Isolation]
    F --> F2[Cleanup]
    
    D1 & D2 & E1 & E2 & E3 --> G{All Checks Pass?}
    G -->|Yes| H[Execute Pipeline]
    G -->|No| I[Safety Error]
    
    style B fill:#f9f,stroke:#333,stroke-width:4px
    style G fill:#ffa,stroke:#333,stroke-width:2px

Recursion Protection

Depth Limiting

Control maximum nesting depth to prevent stack overflow and resource exhaustion:

Configuration

# config/dev.exs - Development environment
config :pipeline,
  max_nesting_depth: 15,      # More permissive for experimentation
  depth_warning_threshold: 10  # Warn at 10 levels

# config/test.exs - Test environment  
config :pipeline,
  max_nesting_depth: 5,       # Restricted for fast tests
  depth_warning_threshold: 3   # Early warnings

# config/prod.exs - Production environment
config :pipeline,
  max_nesting_depth: 8,       # Conservative for stability
  depth_warning_threshold: 5   # Warn at 5 levels

Per-Pipeline Override

# Override global depth limit for specific pipeline
- name: "deep_analysis"
  type: "pipeline"
  pipeline_file: "./analyzer.yaml"
  config:
    max_depth: 10  # Override default limit
    warn_at_depth: 7

Depth Tracking

The system maintains execution depth across the entire call chain:

# Execution context tracks depth
%Context{
  depth: 3,
  execution_chain: [
    "main_workflow",
    "data_processor", 
    "validation_pipeline",
    "schema_checker"  # Current
  ]
}
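
A minimal sketch of how the depth check against the configured limit might look (the function name matches the API reference below; the body is an assumption):

# Sketch: reject execution once the tracked depth exceeds the limit
def check_depth(%Context{depth: depth} = context, max_depth)
    when depth > max_depth do
  {:error, {:max_depth_exceeded, context.execution_chain}}
end

def check_depth(_context, _max_depth), do: :ok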

Error Messages

Clear error reporting when depth limits are exceeded:

Error: Maximum nesting depth (8) exceeded

Execution chain (9 levels):
  1. main_workflow (depth: 0)
  2.   data_preparation (depth: 1)
  3.     validation_suite (depth: 2)
  4.       schema_validation (depth: 3)
  5.         deep_validator (depth: 4)
  6.           field_checker (depth: 5)
  7.             type_validator (depth: 6)
  8.               complex_rules (depth: 7)
  9.                 nested_check (depth: 8)  ← Limit exceeded

Configuration:
  - Maximum depth: 8
  - Current depth: 9
  - Environment: production

Suggestions:
  - Refactor deep nesting into flatter structure
  - Increase max_depth in configuration if needed
  - Consider breaking into multiple sequential pipelines

Circular Dependency Detection

How It Works

The system tracks the execution chain to detect when a pipeline attempts to call itself:

# RecursionGuard tracks pipeline paths
defmodule Pipeline.Safety.RecursionGuard do
  def check_circular_dependency(pipeline_id, context) do
    # Collect the ids of every pipeline in the current call chain
    execution_chain = get_execution_chain(context)

    if pipeline_id in execution_chain do
      # Produces the "Circular path" errors shown below
      {:error, format_circular_error(pipeline_id, execution_chain)}
    else
      :ok
    end
  end
end
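
A sketch of how the guard might be invoked before dispatching a nested pipeline (execute_nested/2 is a hypothetical caller, not part of the library):

case Pipeline.Safety.RecursionGuard.check_circular_dependency("pipeline_a", context) do
  :ok -> execute_nested(pipeline, context)
  {:error, message} -> {:error, message}
end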

Detection Examples

Direct Circular Reference

# pipeline_a.yaml
steps:
  - name: "process"
    type: "claude"
    prompt: "Process data"
  
  - name: "recurse"
    type: "pipeline"
    pipeline_file: "./pipeline_a.yaml"  # ERROR: Self-reference

Error:

Error: Circular dependency detected
  Pipeline 'pipeline_a' is already in the execution chain
  
  Circular path:
    pipeline_a → pipeline_a

Indirect Circular Reference

# pipeline_a.yaml
- name: "call_b"
  type: "pipeline"
  pipeline_file: "./pipeline_b.yaml"

# pipeline_b.yaml
- name: "call_c"
  type: "pipeline"
  pipeline_file: "./pipeline_c.yaml"

# pipeline_c.yaml
- name: "call_a"
  type: "pipeline"
  pipeline_file: "./pipeline_a.yaml"  # ERROR: Circular through chain

Error:

Error: Circular dependency detected in pipeline execution

Execution creates a cycle:
  main → pipeline_a → pipeline_b → pipeline_c → pipeline_a
              ↑                                      │
              └──────────────────────────────────────┘

Execution chain:
  1. main (depth: 0)
  2. pipeline_a (depth: 1)  ← First occurrence
  3. pipeline_b (depth: 2)
  4. pipeline_c (depth: 3)
  5. pipeline_a (depth: 4)  ← Circular reference detected

Resolution:
  - Review pipeline dependencies
  - Break circular reference
  - Consider extracting shared logic

Allowed Patterns

Self-reference is allowed with proper termination conditions:

# Recursive directory processor (allowed)
workflow:
  name: "directory_scanner"
  steps:
    - name: "scan_current"
      type: "claude"
      prompt: "List items in: {{inputs.path}}"
    
    - name: "process_subdirs"
      type: "for_loop"
      over: "{{steps.scan_current.subdirectories}}"
      as: "subdir"
      steps:
        - name: "recurse"
          type: "pipeline"
          pipeline_ref: "directory_scanner"  # Self-reference
          condition: "{{context.depth < 5}}"  # Termination condition
          inputs:
            path: "{{subdir.path}}"

Resource Management

Memory Monitoring

Real-time memory usage tracking with configurable limits:

Configuration

# Global configuration
config :pipeline,
  default_memory_limit_mb: 1024,      # 1GB default
  memory_check_interval_ms: 1000,     # Check every second
  memory_warning_percentage: 75       # Warn at 75% usage

Per-Pipeline Limits

- name: "memory_intensive"
  type: "pipeline"
  pipeline_file: "./processor.yaml"
  config:
    memory_limit_mb: 2048        # 2GB limit
    memory_warning_mb: 1536      # Warn at 1.5GB
    memory_check_enabled: true

Memory Pressure Handling

The system monitors memory usage and provides warnings:

Warning: High memory usage detected
  Pipeline: data_processor (depth: 2)
  Current usage: 850MB / 1024MB (83%)
  Parent pipeline: main_workflow
  
  Memory breakdown:
    - Context data: 450MB
    - Step results: 300MB  
    - Workspace files: 100MB
    
  Recommendations:
    - Consider streaming large datasets
    - Clear intermediate results after use
    - Increase memory limit if needed

Automatic Cleanup

Configure automatic cleanup of large data:

config:
  auto_cleanup:
    enabled: true
    threshold_mb: 100           # Clean results > 100MB
    keep_final_result: true     # Preserve final output
    cleanup_on_success: true    # Clean intermediate data

Resource Pooling

Efficient resource management through pooling:

# Resource pool configuration
config :pipeline,
  resource_pools: [
    context_pool: [
      size: 50,
      overflow: 10,
      strategy: :lifo
    ],
    workspace_pool: [
      size: 20,
      overflow: 5,
      cleanup_on_return: true
    ]
  ]

Memory Monitoring

Memory Usage Tracking

The ResourceMonitor module tracks memory usage across all nested pipelines:

# Memory tracking structure
%MemoryUsage{
  current_mb: 456.7,
  peak_mb: 512.3,
  limit_mb: 1024,
  breakdown: %{
    contexts: 234.5,
    results: 189.2,
    workspaces: 33.0
  },
  history: [
    {~U[2025-01-03 12:00:00Z], 123.4},
    {~U[2025-01-03 12:00:01Z], 234.5},
    # ...
  ]
}
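
Given such a snapshot, classifying usage reduces to simple arithmetic. A sketch, assuming the struct fields shown above and the default 75% warning threshold:

# Sketch: classify a MemoryUsage snapshot against its limit
def memory_status(%MemoryUsage{current_mb: current, limit_mb: limit}) do
  percent = current / limit * 100

  cond do
    percent >= 100 -> {:error, :memory_limit_exceeded}
    percent >= 75 -> {:warning, percent}
    true -> {:ok, percent}
  end
end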

Memory Limits and Actions

Define actions when memory limits are approached:

config:
  memory_limits:
    soft_limit_mb: 768         # Warning threshold
    hard_limit_mb: 1024        # Execution stops
    
  memory_actions:
    at_50_percent:
      - log_warning
    at_75_percent:
      - log_warning
      - trigger_gc
      - clear_caches
    at_90_percent:
      - log_error
      - pause_execution
      - await_cleanup
    at_100_percent:
      - terminate_execution
      - cleanup_all
      - return_error
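
A sketch of how such thresholds might map to actions at runtime. The action names follow the YAML above; the dispatch function itself is an assumption:

# Sketch: pick the action list for the current usage percentage
def actions_for(percent) do
  cond do
    percent >= 100 -> [:terminate_execution, :cleanup_all, :return_error]
    percent >= 90 -> [:log_error, :pause_execution, :await_cleanup]
    percent >= 75 -> [:log_warning, :trigger_gc, :clear_caches]
    percent >= 50 -> [:log_warning]
    true -> []
  end
end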

Memory Optimization Strategies

# 1. Streaming Results
- name: "process_large_data"
  type: "pipeline"
  pipeline_file: "./stream_processor.yaml"
  config:
    stream_results: true
    chunk_size_mb: 10
    
# 2. Result Pruning
- name: "analyze"
  type: "pipeline"
  pipeline_file: "./analyzer.yaml"
  config:
    prune_results:
      keep_fields: ["summary", "score"]
      remove_fields: ["raw_data", "intermediate"]
      
# 3. Garbage Collection
- name: "memory_intensive"
  type: "pipeline"
  pipeline_file: "./processor.yaml"
  config:
    gc_after_steps: ["large_transform", "aggregation"]
    gc_threshold_mb: 500

Timeout Protection

Execution Time Limits

Prevent runaway pipelines with configurable timeouts:

Global Configuration

config :pipeline,
  default_timeout_seconds: 300,        # 5 minutes default
  timeout_check_interval_ms: 1000,     # Check every second
  timeout_grace_period_seconds: 10     # Grace period for cleanup
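
A sketch of the underlying check, assuming the start time was captured with System.monotonic_time/1 when the pipeline began:

# Sketch: compare elapsed time against the configured limit
def check_timeout(start_time, timeout_seconds) do
  elapsed = System.monotonic_time(:second) - start_time

  if elapsed > timeout_seconds do
    {:error, {:timeout_exceeded, elapsed}}
  else
    :ok
  end
end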

Pipeline-Specific Timeouts

- name: "long_running_analysis"
  type: "pipeline"
  pipeline_file: "./analysis.yaml"
  config:
    timeout_seconds: 600        # 10 minutes
    timeout_warning_seconds: 480 # Warn at 8 minutes
    on_timeout: "graceful"      # graceful | immediate | retry

Timeout Handling Strategies

Graceful Timeout

config:
  timeout_seconds: 300
  on_timeout: "graceful"
  graceful_timeout:
    save_checkpoint: true
    cleanup_resources: true
    return_partial_results: true

Immediate Timeout

config:
  timeout_seconds: 300
  on_timeout: "immediate"
  immediate_timeout:
    force_stop: true
    skip_cleanup: false  # Still attempt cleanup

Retry on Timeout

config:
  timeout_seconds: 300
  on_timeout: "retry"
  retry_config:
    max_attempts: 3
    backoff_factor: 2
    increase_timeout: true  # Double timeout on retry
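
With increase_timeout set, each retry runs with a longer budget. A sketch of the arithmetic, assuming the backoff_factor from the config above:

# Sketch: effective timeout for a given retry attempt (0-indexed)
# With base 300s and factor 2: attempt 0 → 300s, 1 → 600s, 2 → 1200s
def effective_timeout(base_seconds, backoff_factor, attempt) do
  round(base_seconds * :math.pow(backoff_factor, attempt))
end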

Timeout Error Messages

Error: Pipeline execution timeout

Details:
  Pipeline: complex_analysis (depth: 2)
  Elapsed time: 301.5 seconds
  Timeout limit: 300 seconds
  
Execution progress:
  - Completed steps: 45/60 (75%)
  - Current step: deep_learning_analysis
  - Step duration: 125.3 seconds
  
Partial results saved: true
Checkpoint available: true
  
Recovery options:
  1. Resume from checkpoint with --resume flag
  2. Increase timeout in configuration
  3. Optimize slow steps identified above

Step Count Limits

Preventing Exponential Growth

Limit total steps across all nested pipelines:

config :pipeline,
  max_total_steps: 1000,              # Total across all pipelines
  step_warning_threshold: 750,         # Warn at 75%
  count_nested_steps: true            # Include nested pipeline steps

Step Counting Logic

# Steps are counted across all nesting levels
def count_total_steps(pipeline, context) do
  direct_steps = length(pipeline.steps)

  # count_nested_pipeline_steps/2 (not shown) loads the referenced
  # pipeline definition and recurses with the same logic
  nested_steps = Enum.sum(
    for step <- pipeline.steps,
        step.type == "pipeline" do
      count_nested_pipeline_steps(step, context)
    end
  )

  direct_steps + nested_steps
end

Step Count Monitoring

# Monitor step explosion in complex pipelines
- name: "analyze_all_files"
  type: "for_loop"
  over: "{{file_list}}"  # 100 files
  steps:
    - name: "process_file"
      type: "pipeline"
      pipeline_file: "./file_processor.yaml"  # 20 steps each
      config:
        max_total_steps: 2500  # Override for this use case

Warning output:

Warning: Approaching step count limit
  Current steps: 1,750 / 2,000 (87.5%)
  
  Step breakdown by pipeline:
    - main_workflow: 5 steps
    - file_processor (x87): 1,740 steps
    - cleanup_pipeline: 5 steps
    
  Largest contributors:
    1. file_processor.deep_analysis: 522 steps (30%)
    2. file_processor.validation: 348 steps (20%)
    3. file_processor.transformation: 261 steps (15%)

Workspace Isolation

Isolated Execution Environments

Each nested pipeline gets its own workspace:

config:
  workspace_isolation:
    enabled: true
    base_dir: "./workspaces"
    naming_pattern: "${pipeline_id}_${timestamp}"
    permissions: "0750"
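
A sketch of how the naming pattern above might be realized (the helper name is an assumption; the example directories below use Unix timestamps):

# Sketch: build an isolated workspace path as "${pipeline_id}_${timestamp}"
def workspace_path(base_dir, pipeline_id) do
  timestamp = System.os_time(:second)
  Path.join(base_dir, "#{pipeline_id}_#{timestamp}")
end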

Workspace Structure

workspaces/
├── main_workflow_1704288000/
│   ├── .metadata.json
│   ├── inputs/
│   ├── outputs/
│   └── temp/
└── nested_processor_1704288030/
    ├── .metadata.json
    ├── inputs/
    ├── outputs/
    └── temp/

Workspace Lifecycle

# Workspace configuration
- name: "isolated_execution"
  type: "pipeline"
  pipeline_file: "./processor.yaml"
  config:
    workspace:
      create_on_start: true
      cleanup_on_success: true
      cleanup_on_error: false  # Keep for debugging
      preserve_outputs: true   # Copy outputs before cleanup
      max_size_mb: 500        # Limit workspace size

Workspace Security

# Security measures for workspace isolation
config :pipeline,
  workspace_security:
    enforce_isolation: true,
    prevent_traversal: true,
    sandbox_commands: true,
    allowed_paths: [
      "./workspaces/",
      "./shared/resources/"
    ],
    blocked_paths: [
      "/etc/",
      "/usr/",
      "~/"
    ]
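
A simplified sketch of the allow/block check implied by this configuration. The helper name is hypothetical, prefixes are assumed to be pre-normalized absolute paths, and real traversal prevention would also canonicalize symlinks:

# Sketch: require an allowed prefix and reject any blocked prefix
# (prefixes assumed already normalized; "~" expansion omitted)
def path_allowed?(path, allowed_prefixes, blocked_prefixes) do
  expanded = Path.expand(path)

  Enum.any?(allowed_prefixes, &String.starts_with?(expanded, &1)) and
    not Enum.any?(blocked_prefixes, &String.starts_with?(expanded, &1))
end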

Configuration Management

Environment-Specific Configuration

# config/dev.exs
config :pipeline, :safety,
  recursion: [
    max_depth: 15,
    warning_depth: 10
  ],
  memory: [
    limit_mb: 2048,
    warning_percent: 70
  ],
  timeout: [
    default_seconds: 600,
    warning_seconds: 480
  ],
  development_mode: true,
  relaxed_limits: true

# config/prod.exs
config :pipeline, :safety,
  recursion: [
    max_depth: 8,
    warning_depth: 5
  ],
  memory: [
    limit_mb: 1024,
    warning_percent: 75
  ],
  timeout: [
    default_seconds: 300,
    warning_seconds: 240
  ],
  production_mode: true,
  strict_enforcement: true

Runtime Configuration

Override safety settings at runtime:

# Via environment variables
PIPELINE_MAX_DEPTH=10 mix pipeline.run workflow.yaml

# Via CLI flags
mix pipeline.run workflow.yaml \
  --max-depth 10 \
  --memory-limit 2048 \
  --timeout 600

Configuration Validation

The system validates safety configurations:

# Invalid configuration detected
Error: Invalid safety configuration
  
  Issues found:
    - max_depth (3) is less than minimum (5)
    - memory_limit (100MB) is below minimum (256MB)
    - timeout (30s) is below minimum (60s)
    
  Please adjust configuration to meet minimum safety requirements
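
A sketch of how such minimum-bound validation might look. The minimums mirror the error message above and are assumptions, as is the keyword-list shape of the config:

# Sketch: check each configured value against an assumed minimum
def validate_config(config) do
  minimums = [max_depth: 5, memory_limit_mb: 256, timeout_seconds: 60]

  issues =
    for {key, min} <- minimums,
        value = config[key],
        value < min do
      "#{key} (#{value}) is below minimum (#{min})"
    end

  if issues == [], do: :ok, else: {:error, issues}
end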

Monitoring and Alerts

Telemetry Integration

# Attach to safety-related telemetry events
:telemetry.attach_many(
  "pipeline-safety-monitoring",
  [
    [:pipeline, :safety, :limit_warning],
    [:pipeline, :safety, :limit_exceeded],
    [:pipeline, :safety, :resource_pressure],
    [:pipeline, :safety, :check_performed]
  ],
  &handle_safety_event/4,
  nil
)
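
A sketch of the handler referenced above; the event-to-log mapping is an assumption:

# Sketch: route safety events to the logger by severity
require Logger

def handle_safety_event([:pipeline, :safety, :limit_exceeded], _measurements, metadata, _config) do
  Logger.error("Safety limit exceeded: #{inspect(metadata)}")
end

def handle_safety_event([:pipeline, :safety, :limit_warning], _measurements, metadata, _config) do
  Logger.warning("Safety limit approaching: #{inspect(metadata)}")
end

def handle_safety_event(_event, _measurements, _metadata, _config), do: :ok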

Safety Metrics

Export safety metrics for monitoring:

# Prometheus metrics
pipeline_safety_checks_total{type="recursion", result="pass"} 1523
pipeline_safety_checks_total{type="recursion", result="fail"} 12
pipeline_safety_limit_warnings{type="memory", level="75_percent"} 45
pipeline_safety_limit_exceeded{type="timeout"} 3
pipeline_max_depth_reached{pipeline="analyzer"} 7
pipeline_memory_usage_bytes{pipeline="processor", percentile="95"} 892739584

Alert Configuration

# Alert when safety limits are approached
alerts:
  - name: "high_memory_usage"
    condition: "memory_usage_percent > 80"
    severity: "warning"
    actions:
      - log_warning
      - send_notification
      
  - name: "depth_limit_approached"
    condition: "current_depth > max_depth * 0.8"
    severity: "warning"
    actions:
      - log_warning
      - add_metric
      
  - name: "timeout_imminent"
    condition: "elapsed_time > timeout * 0.9"
    severity: "critical"
    actions:
      - log_error
      - trigger_graceful_shutdown
      - notify_operators

Best Practices

1. Configure Appropriate Limits

Set limits based on your use case:

# Data processing pipeline - higher memory, longer timeout
config:
  memory_limit_mb: 4096
  timeout_seconds: 1800  # 30 minutes
  max_depth: 5          # Keep shallow

# Quick validation pipeline - lower limits
config:
  memory_limit_mb: 512
  timeout_seconds: 60   # 1 minute
  max_depth: 3         # Very shallow

2. Design for Safety

Structure pipelines with safety in mind:

# Good: Flat structure with sequential processing
workflow:
  steps:
    - name: "validate"
      type: "pipeline"
      pipeline_file: "./validate.yaml"
    - name: "process"
      type: "pipeline"
      pipeline_file: "./process.yaml"
    - name: "report"
      type: "pipeline"
      pipeline_file: "./report.yaml"

# Avoid: Deep nesting without clear need
workflow:
  steps:
    - name: "level1"
      type: "pipeline"
      pipeline:
        steps:
          - name: "level2"
            type: "pipeline"
            pipeline:
              steps:
                - name: "level3"
                  type: "pipeline"
                  # ... continues deeper

3. Monitor Resource Usage

Add monitoring steps:

- name: "check_resources"
  type: "claude"
  prompt: |
    Check current resource usage:
    - Memory: {{system.memory_usage_mb}}MB / {{config.memory_limit_mb}}MB
    - Steps: {{system.total_steps}} / {{config.max_total_steps}}
    - Depth: {{context.depth}} / {{config.max_depth}}
    - Time: {{system.elapsed_seconds}}s / {{config.timeout_seconds}}s

4. Implement Graceful Degradation

# Primary processing with fallback
- name: "primary_analysis"
  type: "pipeline"
  pipeline_file: "./full_analysis.yaml"
  config:
    memory_limit_mb: 1024
    continue_on_error: true
    
- name: "fallback_analysis"
  type: "pipeline"
  pipeline_file: "./lightweight_analysis.yaml"
  condition: "{{steps.primary_analysis.error == 'memory_limit_exceeded'}}"
  config:
    memory_limit_mb: 256  # Much lower requirement

5. Test Safety Limits

Include safety limit tests:

# test_safety_limits.yaml
tests:
  - name: "test_max_depth"
    pipeline: "./deep_pipeline.yaml"
    expect_error: "max_depth_exceeded"
    
  - name: "test_memory_limit"
    pipeline: "./memory_intensive.yaml"
    config:
      memory_limit_mb: 100  # Artificially low
    expect_error: "memory_limit_exceeded"
    
  - name: "test_timeout"
    pipeline: "./slow_pipeline.yaml"
    config:
      timeout_seconds: 1
    expect_error: "timeout_exceeded"

Troubleshooting

Common Safety Issues

1. Unexpected Depth Limit Errors

Problem: Pipeline fails with a depth-limit error when none is expected

Diagnosis:

- name: "debug_depth"
  type: "claude"  
  prompt: |
    Current execution chain:
    {{json(context.execution_chain)}}
    Current depth: {{context.depth}}
    Max depth: {{config.max_depth}}

Solutions:

  • Review execution chain for unexpected nesting
  • Check for accidental recursive calls
  • Increase depth limit if legitimate

2. Memory Pressure Issues

Problem: Frequent memory warnings or failures

Diagnosis:

# Enable detailed memory profiling
config :pipeline,
  memory_profiling: [
    enabled: true,
    sample_interval_ms: 100,
    track_allocations: true
  ]

Solutions:

  • Identify large data structures
  • Implement streaming for large datasets
  • Clear intermediate results
  • Increase memory limits

3. Timeout Issues

Problem: Pipelines timing out unexpectedly

Diagnosis:

# Add timing instrumentation
- name: "measure_step_time"
  type: "claude"
  prompt: "Starting expensive operation"
  metadata:
    timer_start: "expensive_op"
    
- name: "expensive_operation"
  type: "pipeline"
  pipeline_file: "./processor.yaml"
  
- name: "report_time"
  type: "claude"
  prompt: |
    Operation took: {{timer.expensive_op.elapsed_ms}}ms

Solutions:

  • Profile slow steps
  • Implement caching
  • Parallelize where possible
  • Increase timeout appropriately

Debug Mode

Enable comprehensive safety debugging:

# Run with safety debug mode
PIPELINE_SAFETY_DEBUG=true mix pipeline.run workflow.yaml

# Debug output includes:
# - All safety checks performed
# - Resource usage at each step
# - Limit approaching warnings
# - Configuration validation
# - Execution chain visualization

API Reference

Safety Manager API

# Main safety check interface
Pipeline.Safety.SafetyManager.check_safety(pipeline_id, context, config)
# Returns: :ok | {:error, reason}

# Individual safety checks
Pipeline.Safety.RecursionGuard.check_depth(context, max_depth)
Pipeline.Safety.RecursionGuard.check_circular_dependency(pipeline_id, context)
Pipeline.Safety.ResourceMonitor.check_memory(current_usage, limit)
Pipeline.Safety.ResourceMonitor.check_timeout(start_time, timeout)
Pipeline.Safety.ResourceMonitor.check_step_count(count, limit)

# Resource tracking
Pipeline.Safety.ResourceMonitor.start_monitoring(context)
Pipeline.Safety.ResourceMonitor.stop_monitoring(context)
Pipeline.Safety.ResourceMonitor.get_current_usage(context)

# Configuration
Pipeline.Safety.Config.get_limits(context)
Pipeline.Safety.Config.validate_config(config)

Configuration Schema

# Safety configuration structure
%{
  recursion: %{
    max_depth: integer(),
    warning_depth: integer(),
    track_chain: boolean()
  },
  memory: %{
    limit_mb: integer(),
    warning_percent: integer(),
    check_interval_ms: integer(),
    auto_cleanup: boolean()
  },
  timeout: %{
    default_seconds: integer(),
    warning_seconds: integer(),
    grace_period_seconds: integer(),
    on_timeout: :graceful | :immediate | :retry
  },
  steps: %{
    max_total: integer(),
    warning_threshold: integer(),
    count_nested: boolean()
  },
  workspace: %{
    isolation_enabled: boolean(),
    cleanup_on_success: boolean(),
    cleanup_on_error: boolean(),
    max_size_mb: integer()
  }
}

Telemetry Events

# Safety-related telemetry events
[:pipeline, :safety, :check_started]
# Metadata: %{check_type: atom(), pipeline_id: string(), context: map()}

[:pipeline, :safety, :check_completed]  
# Metadata: %{check_type: atom(), result: :ok | :error, duration_μs: integer()}

[:pipeline, :safety, :limit_warning]
# Metadata: %{limit_type: atom(), current: number(), limit: number(), percent: float()}

[:pipeline, :safety, :limit_exceeded]
# Metadata: %{limit_type: atom(), current: number(), limit: number(), pipeline_id: string()}

[:pipeline, :safety, :resource_cleaned]
# Metadata: %{resource_type: atom(), size_bytes: integer(), pipeline_id: string()}

This guide provides comprehensive documentation for the safety features in recursive pipelines. These mechanisms ensure stable, predictable execution while providing clear feedback when limits are approached or exceeded.