Safety Features Guide for Recursive Pipelines
Table of Contents
- Overview
- Recursion Protection
- Circular Dependency Detection
- Resource Management
- Memory Monitoring
- Timeout Protection
- Step Count Limits
- Workspace Isolation
- Configuration Management
- Monitoring and Alerts
- Best Practices
- Troubleshooting
- API Reference
Overview
The recursive pipeline system includes comprehensive safety mechanisms designed to prevent common failure modes in nested execution scenarios. These safety features ensure system stability, prevent resource exhaustion, and provide clear feedback when limits are reached.
Safety Architecture
graph TD
    A[Pipeline Execution Request] --> B[Safety Manager]
    B --> C{Safety Checks}
    C --> D[Recursion Guard]
    D --> D1[Depth Check]
    D --> D2[Circular Dependency]
    C --> E[Resource Monitor]
    E --> E1[Memory Limits]
    E --> E2[Step Count]
    E --> E3[Timeout Check]
    C --> F[Workspace Manager]
    F --> F1[Isolation]
    F --> F2[Cleanup]
    D1 & D2 & E1 & E2 & E3 --> G{All Checks Pass?}
    G -->|Yes| H[Execute Pipeline]
    G -->|No| I[Safety Error]

    style B fill:#f9f,stroke:#333,stroke-width:4px
    style G fill:#ffa,stroke:#333,stroke-width:2px
Recursion Protection
Depth Limiting
Control maximum nesting depth to prevent stack overflow and resource exhaustion:
Configuration
# config/dev.exs - Development environment
config :pipeline,
  max_nesting_depth: 15,       # More permissive for experimentation
  depth_warning_threshold: 10  # Warn at 10 levels

# config/test.exs - Test environment
config :pipeline,
  max_nesting_depth: 5,        # Restricted for fast tests
  depth_warning_threshold: 3   # Early warnings

# config/prod.exs - Production environment
config :pipeline,
  max_nesting_depth: 8,        # Conservative for stability
  depth_warning_threshold: 5   # Warn at 5 levels
Per-Pipeline Override
# Override global depth limit for specific pipeline
- name: "deep_analysis"
  type: "pipeline"
  pipeline_file: "./analyzer.yaml"
  config:
    max_depth: 10     # Override default limit
    warn_at_depth: 7
Depth Tracking
The system maintains execution depth across the entire call chain:
# Execution context tracks depth
%Context{
  depth: 3,
  execution_chain: [
    "main_workflow",
    "data_processor",
    "validation_pipeline",
    "schema_checker"  # Current
  ]
}
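Against this context, the depth check itself reduces to a comparison. A minimal sketch, assuming the `Context` struct above; the actual `RecursionGuard.check_depth/2` may report richer errors:

# Hypothetical sketch: a guard clause against the configured limit
def check_depth(%Context{depth: depth}, max_depth) when depth >= max_depth do
  {:error, :max_depth_exceeded}
end

def check_depth(_context, _max_depth), do: :ok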
Error Messages
Clear error reporting when depth limits are exceeded:
Error: Maximum nesting depth (8) exceeded

Execution chain (9 levels):
  1. main_workflow (depth: 0)
  2. ├─ data_preparation (depth: 1)
  3. │  └─ validation_suite (depth: 2)
  4. │     └─ schema_validation (depth: 3)
  5. │        └─ deep_validator (depth: 4)
  6. │           └─ field_checker (depth: 5)
  7. │              └─ type_validator (depth: 6)
  8. │                 └─ complex_rules (depth: 7)
  9. │                    └─ nested_check (depth: 8) ← Limit exceeded

Configuration:
- Maximum depth: 8
- Current depth: 9
- Environment: production

Suggestions:
- Refactor deep nesting into flatter structure
- Increase max_depth in configuration if needed
- Consider breaking into multiple sequential pipelines
Circular Dependency Detection
How It Works
The system tracks the execution chain to detect when a pipeline attempts to call itself:
# RecursionGuard tracks pipeline paths
defmodule Pipeline.Safety.RecursionGuard do
  def check_circular_dependency(pipeline_id, context) do
    # Collect the IDs of all pipelines already on the call chain
    execution_chain = get_execution_chain(context)

    if pipeline_id in execution_chain do
      {:error, format_circular_error(pipeline_id, execution_chain)}
    else
      :ok
    end
  end
end
Detection Examples
Direct Circular Reference
# pipeline_a.yaml
steps:
  - name: "process"
    type: "claude"
    prompt: "Process data"
  - name: "recurse"
    type: "pipeline"
    pipeline_file: "./pipeline_a.yaml"  # ERROR: Self-reference
Error:
Error: Circular dependency detected
Pipeline 'pipeline_a' is already in the execution chain
Circular path:
pipeline_a → pipeline_a
Indirect Circular Reference
# pipeline_a.yaml
- name: "call_b"
  type: "pipeline"
  pipeline_file: "./pipeline_b.yaml"

# pipeline_b.yaml
- name: "call_c"
  type: "pipeline"
  pipeline_file: "./pipeline_c.yaml"

# pipeline_c.yaml
- name: "call_a"
  type: "pipeline"
  pipeline_file: "./pipeline_a.yaml"  # ERROR: Circular through chain
Error:
Error: Circular dependency detected in pipeline execution

Execution creates a cycle:
  main → pipeline_a → pipeline_b → pipeline_c → pipeline_a
           ↑                                        │
           └────────────────────────────────────────┘

Execution chain:
  1. main (depth: 0)
  2. pipeline_a (depth: 1)  ← First occurrence
  3. pipeline_b (depth: 2)
  4. pipeline_c (depth: 3)
  5. pipeline_a (depth: 4)  ← Circular reference detected
Resolution:
- Review pipeline dependencies
- Break circular reference
- Consider extracting shared logic
Allowed Patterns
Self-reference is allowed with proper termination conditions:
# Recursive directory processor (allowed)
workflow:
  name: "directory_scanner"
  steps:
    - name: "scan_current"
      type: "claude"
      prompt: "List items in: {{inputs.path}}"

    - name: "process_subdirs"
      type: "for_loop"
      over: "{{steps.scan_current.subdirectories}}"
      as: "subdir"
      steps:
        - name: "recurse"
          type: "pipeline"
          pipeline_ref: "directory_scanner"   # Self-reference
          condition: "{{context.depth < 5}}"  # Termination condition
          inputs:
            path: "{{subdir.path}}"
Resource Management
Memory Monitoring
Real-time memory usage tracking with configurable limits:
Configuration
# Global configuration
config :pipeline,
  default_memory_limit_mb: 1024,   # 1GB default
  memory_check_interval_ms: 1000,  # Check every second
  memory_warning_percentage: 75    # Warn at 75% usage
Per-Pipeline Limits
- name: "memory_intensive"
type: "pipeline"
pipeline_file: "./processor.yaml"
config:
memory_limit_mb: 2048 # 2GB limit
memory_warning_mb: 1536 # Warn at 1.5GB
memory_check_enabled: true
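A check of this shape can be built on the BEAM's own memory introspection. A minimal sketch, assuming usage is sampled with `:erlang.memory/1`; the real `ResourceMonitor` likely attributes per-pipeline usage with finer granularity:

# Sketch: sample total BEAM memory and compare it against the limit.
# Per-pipeline attribution in the real monitor is an assumption here.
def check_memory(limit_mb) do
  current_mb = :erlang.memory(:total) / (1024 * 1024)

  cond do
    current_mb >= limit_mb -> {:error, :memory_limit_exceeded}
    current_mb >= limit_mb * 0.75 -> {:warning, :memory_pressure}
    true -> :ok
  end
end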
Memory Pressure Handling
The system monitors memory usage and provides warnings:
Warning: High memory usage detected
Pipeline: data_processor (depth: 2)
Current usage: 850MB / 1024MB (83%)
Parent pipeline: main_workflow
Memory breakdown:
- Context data: 450MB
- Step results: 300MB
- Workspace files: 100MB
Recommendations:
- Consider streaming large datasets
- Clear intermediate results after use
- Increase memory limit if needed
Automatic Cleanup
Configure automatic cleanup of large data:
config:
  auto_cleanup:
    enabled: true
    threshold_mb: 100         # Clean results > 100MB
    keep_final_result: true   # Preserve final output
    cleanup_on_success: true  # Clean intermediate data
Resource Pooling
Efficient resource management through pooling:
# Resource pool configuration
config :pipeline,
  resource_pools: [
    context_pool: [
      size: 50,
      overflow: 10,
      strategy: :lifo
    ],
    workspace_pool: [
      size: 20,
      overflow: 5,
      cleanup_on_return: true
    ]
  ]
Memory Monitoring
Memory Usage Tracking
The ResourceMonitor module tracks memory usage across all nested pipelines:
# Memory tracking structure
%MemoryUsage{
  current_mb: 456.7,
  peak_mb: 512.3,
  limit_mb: 1024,
  breakdown: %{
    contexts: 234.5,
    results: 189.2,
    workspaces: 33.0
  },
  history: [
    {~U[2025-01-03 12:00:00Z], 123.4},
    {~U[2025-01-03 12:00:01Z], 234.5}
    # ...
  ]
}
Memory Limits and Actions
Define actions when memory limits are approached:
config:
  memory_limits:
    soft_limit_mb: 768   # Warning threshold
    hard_limit_mb: 1024  # Execution stops
  memory_actions:
    at_50_percent:
      - log_warning
    at_75_percent:
      - log_warning
      - trigger_gc
      - clear_caches
    at_90_percent:
      - log_error
      - pause_execution
      - await_cleanup
    at_100_percent:
      - terminate_execution
      - cleanup_all
      - return_error
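One way to implement this escalation is a plain threshold-to-actions lookup. A hedged sketch: the action atoms mirror the configuration above, while the dispatch mechanism is an assumption:

# Sketch: map memory usage percentage to the configured action list.
# Thresholds mirror the YAML above; how actions are executed is assumed.
def actions_for(percent) do
  cond do
    percent >= 100 -> [:terminate_execution, :cleanup_all, :return_error]
    percent >= 90 -> [:log_error, :pause_execution, :await_cleanup]
    percent >= 75 -> [:log_warning, :trigger_gc, :clear_caches]
    percent >= 50 -> [:log_warning]
    true -> []
  end
end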
Memory Optimization Strategies
# 1. Streaming Results
- name: "process_large_data"
  type: "pipeline"
  pipeline_file: "./stream_processor.yaml"
  config:
    stream_results: true
    chunk_size_mb: 10

# 2. Result Pruning
- name: "analyze"
  type: "pipeline"
  pipeline_file: "./analyzer.yaml"
  config:
    prune_results:
      keep_fields: ["summary", "score"]
      remove_fields: ["raw_data", "intermediate"]

# 3. Garbage Collection
- name: "memory_intensive"
  type: "pipeline"
  pipeline_file: "./processor.yaml"
  config:
    gc_after_steps: ["large_transform", "aggregation"]
    gc_threshold_mb: 500
Timeout Protection
Execution Time Limits
Prevent runaway pipelines with configurable timeouts:
Global Configuration
config :pipeline,
  default_timeout_seconds: 300,     # 5 minutes default
  timeout_check_interval_ms: 1000,  # Check every second
  timeout_grace_period_seconds: 10  # Grace period for cleanup
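Internally, a timeout check of this kind reduces to comparing clock readings at each check interval. A minimal sketch of `check_timeout/2`, assuming the start time was captured with `System.monotonic_time(:second)`:

# Sketch: compare elapsed monotonic time against the configured timeout.
# Assumes start_time was recorded with System.monotonic_time(:second).
def check_timeout(start_time, timeout_seconds) do
  elapsed = System.monotonic_time(:second) - start_time

  cond do
    elapsed >= timeout_seconds -> {:error, :timeout_exceeded}
    elapsed >= timeout_seconds * 0.8 -> {:warning, timeout_seconds - elapsed}
    true -> :ok
  end
end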
Pipeline-Specific Timeouts
- name: "long_running_analysis"
type: "pipeline"
pipeline_file: "./analysis.yaml"
config:
timeout_seconds: 600 # 10 minutes
timeout_warning_seconds: 480 # Warn at 8 minutes
on_timeout: "graceful" # graceful | immediate | retry
Timeout Handling Strategies
Graceful Timeout
config:
  timeout_seconds: 300
  on_timeout: "graceful"
  graceful_timeout:
    save_checkpoint: true
    cleanup_resources: true
    return_partial_results: true
Immediate Timeout
config:
  timeout_seconds: 300
  on_timeout: "immediate"
  immediate_timeout:
    force_stop: true
    skip_cleanup: false  # Still attempt cleanup
Retry on Timeout
config:
  timeout_seconds: 300
  on_timeout: "retry"
  retry_config:
    max_attempts: 3
    backoff_factor: 2
    increase_timeout: true  # Double timeout on retry
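The retry strategy can be expressed as a recursive wrapper that doubles the timeout on each attempt, as the configuration above suggests. An illustrative sketch; `run_pipeline/2` is a hypothetical entry point:

# Sketch: retry on timeout, doubling the timeout each attempt.
# run_pipeline/2 is a hypothetical entry point, not the library API.
def run_with_retry(pipeline, timeout_s, attempts \\ 3)

def run_with_retry(_pipeline, _timeout_s, 0), do: {:error, :timeout_exceeded}

def run_with_retry(pipeline, timeout_s, attempts) do
  case run_pipeline(pipeline, timeout_s) do
    {:error, :timeout_exceeded} ->
      run_with_retry(pipeline, timeout_s * 2, attempts - 1)

    result ->
      result
  end
end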
Timeout Error Messages
Error: Pipeline execution timeout
Details:
Pipeline: complex_analysis (depth: 2)
Elapsed time: 301.5 seconds
Timeout limit: 300 seconds
Execution progress:
- Completed steps: 45/60 (75%)
- Current step: deep_learning_analysis
- Step duration: 125.3 seconds
Partial results saved: true
Checkpoint available: true
Recovery options:
1. Resume from checkpoint with --resume flag
2. Increase timeout in configuration
3. Optimize slow steps identified above
Step Count Limits
Preventing Exponential Growth
Limit total steps across all nested pipelines:
config :pipeline,
  max_total_steps: 1000,        # Total across all pipelines
  step_warning_threshold: 750,  # Warn at 75%
  count_nested_steps: true      # Include nested pipeline steps
Step Counting Logic
# Steps are counted across all nesting levels
def count_total_steps(pipeline, context) do
  direct_steps = length(pipeline.steps)

  nested_steps =
    Enum.sum(
      for step <- pipeline.steps, step.type == "pipeline" do
        count_nested_pipeline_steps(step, context)
      end
    )

  direct_steps + nested_steps
end
Step Count Monitoring
# Monitor step explosion in complex pipelines
- name: "analyze_all_files"
  type: "for_loop"
  over: "{{file_list}}"  # 100 files
  steps:
    - name: "process_file"
      type: "pipeline"
      pipeline_file: "./file_processor.yaml"  # 20 steps each
  config:
    max_total_steps: 2500  # Override for this use case
Warning output:
Warning: Approaching step count limit

Current steps: 1,750 / 2,500 (70%)

Step breakdown by pipeline:
- main_workflow: 5 steps
- file_processor (x87): 1,740 steps
- cleanup_pipeline: 5 steps

Largest contributors:
1. file_processor.deep_analysis: 522 steps (30%)
2. file_processor.validation: 348 steps (20%)
3. file_processor.transformation: 261 steps (15%)
Workspace Isolation
Isolated Execution Environments
Each nested pipeline gets its own workspace:
config:
  workspace_isolation:
    enabled: true
    base_dir: "./workspaces"
    naming_pattern: "${pipeline_id}_${timestamp}"
    permissions: "0750"
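Workspace creation following this naming pattern might look like the sketch below; the helper name and directory layout are assumptions based on the structure shown in the next section:

# Sketch: create an isolated workspace directory per nested pipeline.
# create_workspace/2 is a hypothetical helper following the pattern above.
def create_workspace(pipeline_id, base_dir \\ "./workspaces") do
  timestamp = System.os_time(:second)
  path = Path.join(base_dir, "#{pipeline_id}_#{timestamp}")

  with :ok <- File.mkdir_p(Path.join(path, "inputs")),
       :ok <- File.mkdir_p(Path.join(path, "outputs")),
       :ok <- File.mkdir_p(Path.join(path, "temp")) do
    {:ok, path}
  end
end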
Workspace Structure
workspaces/
├── main_workflow_1704288000/
│   ├── .metadata.json
│   ├── inputs/
│   ├── outputs/
│   └── temp/
└── nested_processor_1704288030/
    ├── .metadata.json
    ├── inputs/
    ├── outputs/
    └── temp/
Workspace Lifecycle
# Workspace configuration
- name: "isolated_execution"
  type: "pipeline"
  pipeline_file: "./processor.yaml"
  config:
    workspace:
      create_on_start: true
      cleanup_on_success: true
      cleanup_on_error: false  # Keep for debugging
      preserve_outputs: true   # Copy outputs before cleanup
      max_size_mb: 500         # Limit workspace size
Workspace Security
# Security measures for workspace isolation
config :pipeline,
  workspace_security: [
    enforce_isolation: true,
    prevent_traversal: true,
    sandbox_commands: true,
    allowed_paths: [
      "./workspaces/",
      "./shared/resources/"
    ],
    blocked_paths: [
      "/etc/",
      "/usr/",
      "~/"
    ]
  ]
Configuration Management
Environment-Specific Configuration
# config/dev.exs
config :pipeline, :safety,
  recursion: [
    max_depth: 15,
    warning_depth: 10
  ],
  memory: [
    limit_mb: 2048,
    warning_percent: 70
  ],
  timeout: [
    default_seconds: 600,
    warning_seconds: 480
  ],
  development_mode: true,
  relaxed_limits: true

# config/prod.exs
config :pipeline, :safety,
  recursion: [
    max_depth: 8,
    warning_depth: 5
  ],
  memory: [
    limit_mb: 1024,
    warning_percent: 75
  ],
  timeout: [
    default_seconds: 300,
    warning_seconds: 240
  ],
  production_mode: true,
  strict_enforcement: true
Runtime Configuration
Override safety settings at runtime:
# Via environment variables
PIPELINE_MAX_DEPTH=10 mix pipeline.run workflow.yaml

# Via CLI flags
mix pipeline.run workflow.yaml \
  --max-depth 10 \
  --memory-limit 2048 \
  --timeout 600
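Resolution order for these overrides (environment variable first, compiled config as fallback) could be implemented along these lines; a sketch, not the actual resolution logic:

# Sketch: resolve max depth from the env var, falling back to config.
def max_depth do
  case System.get_env("PIPELINE_MAX_DEPTH") do
    nil -> Application.get_env(:pipeline, :max_nesting_depth, 8)
    value -> String.to_integer(value)
  end
end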
Configuration Validation
The system validates safety configurations:
# Invalid configuration detected
Error: Invalid safety configuration
Issues found:
- max_depth (3) is less than minimum (5)
- memory_limit (100MB) is below minimum (256MB)
- timeout (30s) is below minimum (60s)
Please adjust configuration to meet minimum safety requirements
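Such validation reduces to checking each limit against a floor. A minimal sketch; the minimum values mirror the error output above, and the real `Config.validate_config/1` may differ:

# Sketch: enforce minimum safety limits; minimums mirror the error above.
@minimums [max_depth: 5, memory_limit_mb: 256, timeout_seconds: 60]

def validate_config(config) do
  issues =
    for {key, min} <- @minimums,
        value = Keyword.get(config, key),
        value < min do
      "#{key} (#{value}) is below minimum (#{min})"
    end

  if issues == [], do: :ok, else: {:error, issues}
end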
Monitoring and Alerts
Telemetry Integration
# Attach to safety-related telemetry events
:telemetry.attach_many(
  "pipeline-safety-monitoring",
  [
    [:pipeline, :safety, :limit_warning],
    [:pipeline, :safety, :limit_exceeded],
    [:pipeline, :safety, :resource_pressure],
    [:pipeline, :safety, :check_performed]
  ],
  &handle_safety_event/4,
  nil
)
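The attached handler receives the event name, measurements, metadata, and handler config. A minimal sketch of `handle_safety_event/4`; the body is illustrative:

require Logger

# Sketch of a handler for the events attached above; body is illustrative.
def handle_safety_event([:pipeline, :safety, event], measurements, metadata, _config) do
  case event do
    :limit_exceeded ->
      Logger.error("Safety limit exceeded: #{inspect(metadata)}")

    :limit_warning ->
      Logger.warning("Safety limit warning: #{inspect(metadata)}")

    _other ->
      Logger.debug("Safety event #{inspect(event)}: #{inspect(measurements)}")
  end
end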
Safety Metrics
Export safety metrics for monitoring:
# Prometheus metrics
pipeline_safety_checks_total{type="recursion", result="pass"} 1523
pipeline_safety_checks_total{type="recursion", result="fail"} 12
pipeline_safety_limit_warnings{type="memory", level="75_percent"} 45
pipeline_safety_limit_exceeded{type="timeout"} 3
pipeline_max_depth_reached{pipeline="analyzer"} 7
pipeline_memory_usage_bytes{pipeline="processor", percentile="95"} 892739584
Alert Configuration
# Alert when safety limits are approached
alerts:
  - name: "high_memory_usage"
    condition: "memory_usage_percent > 80"
    severity: "warning"
    actions:
      - log_warning
      - send_notification

  - name: "depth_limit_approached"
    condition: "current_depth > max_depth * 0.8"
    severity: "warning"
    actions:
      - log_warning
      - add_metric

  - name: "timeout_imminent"
    condition: "elapsed_time > timeout * 0.9"
    severity: "critical"
    actions:
      - log_error
      - trigger_graceful_shutdown
      - notify_operators
Best Practices
1. Configure Appropriate Limits
Set limits based on your use case:
# Data processing pipeline - higher memory, longer timeout
config:
  memory_limit_mb: 4096
  timeout_seconds: 1800  # 30 minutes
  max_depth: 5           # Keep shallow

# Quick validation pipeline - lower limits
config:
  memory_limit_mb: 512
  timeout_seconds: 60  # 1 minute
  max_depth: 3         # Very shallow
2. Design for Safety
Structure pipelines with safety in mind:
# Good: Flat structure with sequential processing
workflow:
  steps:
    - name: "validate"
      type: "pipeline"
      pipeline_file: "./validate.yaml"
    - name: "process"
      type: "pipeline"
      pipeline_file: "./process.yaml"
    - name: "report"
      type: "pipeline"
      pipeline_file: "./report.yaml"

# Avoid: Deep nesting without clear need
workflow:
  steps:
    - name: "level1"
      type: "pipeline"
      pipeline:
        steps:
          - name: "level2"
            type: "pipeline"
            pipeline:
              steps:
                - name: "level3"
                  type: "pipeline"
                  # ... continues deeper
3. Monitor Resource Usage
Add monitoring steps:
- name: "check_resources"
type: "claude"
prompt: |
Check current resource usage:
- Memory: {{system.memory_usage_mb}}MB / {{config.memory_limit_mb}}MB
- Steps: {{system.total_steps}} / {{config.max_total_steps}}
- Depth: {{context.depth}} / {{config.max_depth}}
- Time: {{system.elapsed_seconds}}s / {{config.timeout_seconds}}s
4. Implement Graceful Degradation
# Primary processing with fallback
- name: "primary_analysis"
  type: "pipeline"
  pipeline_file: "./full_analysis.yaml"
  config:
    memory_limit_mb: 1024
    continue_on_error: true

- name: "fallback_analysis"
  type: "pipeline"
  pipeline_file: "./lightweight_analysis.yaml"
  condition: "{{steps.primary_analysis.error == 'memory_limit_exceeded'}}"
  config:
    memory_limit_mb: 256  # Much lower requirement
5. Test Safety Limits
Include safety limit tests:
# test_safety_limits.yaml
tests:
  - name: "test_max_depth"
    pipeline: "./deep_pipeline.yaml"
    expect_error: "max_depth_exceeded"

  - name: "test_memory_limit"
    pipeline: "./memory_intensive.yaml"
    config:
      memory_limit_mb: 100  # Artificially low
    expect_error: "memory_limit_exceeded"

  - name: "test_timeout"
    pipeline: "./slow_pipeline.yaml"
    config:
      timeout_seconds: 1
    expect_error: "timeout_exceeded"
Troubleshooting
Common Safety Issues
1. Unexpected Depth Limit Errors
Problem: A pipeline fails with a depth limit error when none is expected
Diagnosis:
- name: "debug_depth"
type: "claude"
prompt: |
Current execution chain:
{{json(context.execution_chain)}}
Current depth: {{context.depth}}
Max depth: {{config.max_depth}}
Solutions:
- Review execution chain for unexpected nesting
- Check for accidental recursive calls
- Increase depth limit if legitimate
2. Memory Pressure Issues
Problem: Frequent memory warnings or failures
Diagnosis:
# Enable detailed memory profiling
config :pipeline,
  memory_profiling: [
    enabled: true,
    sample_interval_ms: 100,
    track_allocations: true
  ]
Solutions:
- Identify large data structures
- Implement streaming for large datasets
- Clear intermediate results
- Increase memory limits
3. Timeout Issues
Problem: Pipelines timing out unexpectedly
Diagnosis:
# Add timing instrumentation
- name: "measure_step_time"
  type: "claude"
  prompt: "Starting expensive operation"
  metadata:
    timer_start: "expensive_op"

- name: "expensive_operation"
  type: "pipeline"
  pipeline_file: "./processor.yaml"

- name: "report_time"
  type: "claude"
  prompt: |
    Operation took: {{timer.expensive_op.elapsed_ms}}ms
Solutions:
- Profile slow steps
- Implement caching
- Parallelize where possible
- Increase timeout appropriately
Debug Mode
Enable comprehensive safety debugging:
# Run with safety debug mode
PIPELINE_SAFETY_DEBUG=true mix pipeline.run workflow.yaml
# Debug output includes:
# - All safety checks performed
# - Resource usage at each step
# - Limit approaching warnings
# - Configuration validation
# - Execution chain visualization
API Reference
Safety Manager API
# Main safety check interface
Pipeline.Safety.SafetyManager.check_safety(pipeline_id, context, config)
# Returns: :ok | {:error, reason}
# Individual safety checks
Pipeline.Safety.RecursionGuard.check_depth(context, max_depth)
Pipeline.Safety.RecursionGuard.check_circular_dependency(pipeline_id, context)
Pipeline.Safety.ResourceMonitor.check_memory(current_usage, limit)
Pipeline.Safety.ResourceMonitor.check_timeout(start_time, timeout)
Pipeline.Safety.ResourceMonitor.check_step_count(count, limit)
# Resource tracking
Pipeline.Safety.ResourceMonitor.start_monitoring(context)
Pipeline.Safety.ResourceMonitor.stop_monitoring(context)
Pipeline.Safety.ResourceMonitor.get_current_usage(context)
# Configuration
Pipeline.Safety.Config.get_limits(context)
Pipeline.Safety.Config.validate_config(config)
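Putting the pieces together, a caller might gate execution on the combined check and ensure monitoring is always stopped. An illustrative sketch; `execute_pipeline/2` is a hypothetical entry point:

# Sketch: gate pipeline execution on the combined safety check.
# execute_pipeline/2 is a hypothetical entry point, not the library API.
def execute_safely(pipeline_id, context, config) do
  case Pipeline.Safety.SafetyManager.check_safety(pipeline_id, context, config) do
    :ok ->
      Pipeline.Safety.ResourceMonitor.start_monitoring(context)

      try do
        execute_pipeline(pipeline_id, context)
      after
        Pipeline.Safety.ResourceMonitor.stop_monitoring(context)
      end

    {:error, reason} ->
      {:error, reason}
  end
end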
Configuration Schema
# Safety configuration structure
%{
  recursion: %{
    max_depth: integer(),
    warning_depth: integer(),
    track_chain: boolean()
  },
  memory: %{
    limit_mb: integer(),
    warning_percent: integer(),
    check_interval_ms: integer(),
    auto_cleanup: boolean()
  },
  timeout: %{
    default_seconds: integer(),
    warning_seconds: integer(),
    grace_period_seconds: integer(),
    on_timeout: :graceful | :immediate | :retry
  },
  steps: %{
    max_total: integer(),
    warning_threshold: integer(),
    count_nested: boolean()
  },
  workspace: %{
    isolation_enabled: boolean(),
    cleanup_on_success: boolean(),
    cleanup_on_error: boolean(),
    max_size_mb: integer()
  }
}
Telemetry Events
# Safety-related telemetry events
[:pipeline, :safety, :check_started]
# Metadata: %{check_type: atom(), pipeline_id: string(), context: map()}
[:pipeline, :safety, :check_completed]
# Metadata: %{check_type: atom(), result: :ok | :error, duration_μs: integer()}
[:pipeline, :safety, :limit_warning]
# Metadata: %{limit_type: atom(), current: number(), limit: number(), percent: float()}
[:pipeline, :safety, :limit_exceeded]
# Metadata: %{limit_type: atom(), current: number(), limit: number(), pipeline_id: string()}
[:pipeline, :safety, :resource_cleaned]
# Metadata: %{resource_type: atom(), size_bytes: integer(), pipeline_id: string()}
This guide provides comprehensive documentation for the safety features in recursive pipelines. These mechanisms ensure stable, predictable execution while providing clear feedback when limits are approached or exceeded.