Advanced Features Reference
Table of Contents
- Overview
- Data Transformation
- Codebase Intelligence
- File Operations
- Schema Validation
- Function Calling
- Session Management
- Content Processing
- Performance Features
- Integration Patterns
Overview
Pipeline's advanced features enable sophisticated AI engineering workflows through:
- Data transformation with JSONPath and complex operations
- Codebase intelligence for language-aware analysis
- File operations with format conversion
- Schema validation for structured outputs
- Function calling for tool integration
- Session management for stateful workflows
- Content processing for extraction and summarization
- Performance optimization for large-scale processing
Data Transformation
Transform Step Type
The data_transform step provides powerful data manipulation:
- name: "process_analysis_results"
type: "data_transform"
input_source: "steps.analysis.results"
operations:
# Filter items by condition
- operation: "filter"
field: "vulnerabilities"
condition: "severity == 'critical' or severity == 'high'"
# Transform each item
- operation: "map"
field: "vulnerabilities"
expression: |
{
"id": item.cve_id,
"risk_score": item.cvss_score * 10,
"priority": item.cvss_score > 7 ? "urgent" : "normal",
"description": substring(item.description, 0, 200)
}
# Group by category
- operation: "group_by"
field: "vulnerabilities"
key: "category"
# Aggregate statistics
- operation: "aggregate"
field: "vulnerabilities"
function: "count"
group_by: "severity"
as: "severity_counts"
# Sort results
- operation: "sort"
field: "vulnerabilities"
by: "risk_score"
order: "desc"
output_field: "processed_vulnerabilities"
output_to_file: "vulnerability_report.json"
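Later steps can reference the transformed output by step and field name. A minimal sketch, assuming the step and output_field defined above and reusing the previous_response prompt type shown later in this reference (the extract path is an assumption):
- name: "summarize_vulnerabilities"
  type: "gemini"
  prompt:
    - type: "static"
      content: "Summarize the highest-risk vulnerabilities and suggest remediation priorities:"
    - type: "previous_response"
      step: "process_analysis_results"
      extract: "processed_vulnerabilities"   # output_field from the transform step above (assumed path)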
Available Operations
Filter
Select items matching conditions:
- operation: "filter"
field: "items"
condition: "price > 100 and category == 'electronics'"
Map
Transform each item:
- operation: "map"
field: "users"
expression: |
{
"full_name": item.first_name + " " + item.last_name,
"age_group": item.age < 18 ? "minor" : "adult",
"account_type": item.premium ? "premium" : "basic"
}
Aggregate
Calculate statistics:
- operation: "aggregate"
field: "transactions"
function: "sum" # sum, average, min, max, count
value_field: "amount" # Field to aggregate
group_by: "category" # Optional grouping
Join
Combine datasets:
- operation: "join"
left_field: "orders"
right_source: "steps.fetch.customers"
join_key: "customer_id"
join_type: "left" # left, right, inner, outer
as: "enriched_orders"
Group By
Group items by key:
- operation: "group_by"
field: "events"
key: "event_type"
aggregate:
count: "count"
total_duration: "sum(duration)"
avg_duration: "average(duration)"
Sort
Order items:
- operation: "sort"
field: "products"
by: "price" # or complex: "category,price"
order: "asc" # asc or desc
JSONPath Expressions
Access nested data with JSONPath:
operations:
# Access nested fields
- operation: "extract"
expression: "$.data.users[*].profile.email"
as: "email_list"
# Filter with JSONPath
- operation: "extract"
expression: "$.items[?(@.price > 100)].name"
as: "expensive_items"
# Complex queries
- operation: "extract"
expression: "$.orders[?(@.status == 'pending')].items[*].total"
as: "pending_totals"
Complex Transformations
Chain multiple operations:
- name: "analyze_test_results"
type: "data_transform"
input_source: "steps.test_run.results"
operations:
# Extract test cases
- operation: "extract"
expression: "$.suites[*].tests[*]"
as: "all_tests"
# Filter failed tests
- operation: "filter"
field: "all_tests"
condition: "status == 'failed'"
as: "failed_tests"
# Group by suite
- operation: "group_by"
field: "failed_tests"
key: "suite_name"
as: "failures_by_suite"
# Calculate statistics
- operation: "map"
field: "failures_by_suite"
expression: |
{
"suite": key,
"failure_count": length(value),
"failure_rate": length(value) / suite_total_tests * 100,
"critical_failures": filter(value, "priority == 'critical'")
}
# Sort by failure rate
- operation: "sort"
field: "failures_by_suite"
by: "failure_rate"
order: "desc"
output_field: "test_analysis"
Codebase Intelligence
Codebase Query Step
Intelligent code analysis and discovery:
- name: "analyze_codebase"
type: "codebase_query"
codebase_context: true # Include project metadata
queries:
# Project information
project_info:
get_project_type: true
get_dependencies: true
get_git_status: true
get_recent_commits: 10
# Find specific files
source_files:
find_files:
- type: "source"
- pattern: "**/*.py"
- exclude_patterns: ["**/test_*", "**/__pycache__/**"]
- modified_since: "2024-01-01"
- min_size: 100
# Find related test files
test_coverage:
find_files:
- related_to: "src/auth.py"
- type: "test"
- pattern: "**/test_*.py"
# Extract code structure
api_endpoints:
find_functions:
- in_files: "src/api/**/*.py"
- with_decorator: "@app.route"
- extract_metadata: true
# Analyze imports
dependencies:
find_imports:
- in_files: "src/**/*.py"
- external_only: true
- group_by_package: true
# Find usage
auth_usage:
find_references:
- to_function: "authenticate_user"
- in_files: "src/**/*.py"
- include_line_numbers: true
output_to_file: "codebase_analysis.json"
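Query results can feed later steps in the same way. A brief sketch, assuming the analyze_codebase step above and that each named query is addressable on the step's output (an assumption based on the reference conventions used elsewhere in this document):
- name: "review_endpoints"
  type: "gemini"
  prompt:
    - type: "static"
      content: "Review these API endpoints for missing authentication checks:"
    - type: "previous_response"
      step: "analyze_codebase"
      extract: "api_endpoints"   # named query from the step above (assumed path)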
Language-Specific Analysis
Support for multiple languages:
queries:
# Python-specific
python_classes:
find_classes:
- in_files: "**/*.py"
- with_base_class: "BaseModel"
- include_methods: true
# JavaScript/TypeScript
react_components:
find_exports:
- in_files: "**/*.{jsx,tsx}"
- type: "react_component"
- with_props: true
# Go
go_interfaces:
find_interfaces:
- in_files: "**/*.go"
- exported_only: true
# Elixir
elixir_modules:
find_modules:
- in_files: "lib/**/*.ex"
- with_behaviour: "GenServer"
Dependency Analysis
Track code dependencies:
queries:
# Direct dependencies
module_deps:
find_dependencies:
- for_file: "lib/user.ex"
- include_stdlib: false
- max_depth: 1
# Reverse dependencies
module_usage:
find_dependents:
- of_file: "lib/auth.ex"
- include_tests: true
- group_by_directory: true
# Circular dependencies
circular_deps:
find_circular_dependencies:
- in_directory: "lib/"
- max_cycle_length: 5
Git Integration
Analyze version control data:
queries:
git_info:
# Recent changes
get_changed_files:
- since_commit: "HEAD~10"
- include_stats: true
# Author statistics
get_contributors:
- for_files: "src/core/**"
- include_commit_count: true
# Hot spots
get_change_frequency:
- time_period: "3 months"
- minimum_changes: 5
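Git query results are useful for focusing later analysis on high-churn files. A sketch with a hypothetical step name git_analysis wrapping the queries above (the extract path is an assumption):
- name: "review_hot_spots"
  type: "gemini"
  prompt:
    - type: "static"
      content: "These files change most frequently; identify refactoring candidates:"
    - type: "previous_response"
      step: "git_analysis"   # hypothetical codebase_query step containing the git_info queries
      extract: "git_info"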
File Operations
File Operations Step
Comprehensive file manipulation:
- name: "organize_outputs"
type: "file_ops"
operation: "copy"
source:
- pattern: "workspace/**/*.py"
- exclude: ["**/test_*.py", "**/__pycache__/**"]
destination: "output/python_files/"
options:
preserve_structure: true
overwrite: true
Supported Operations
Copy Files
- name: "backup_sources"
type: "file_ops"
operation: "copy"
source: ["src/", "tests/"]
destination: "backup/{{timestamp}}/"
options:
recursive: true
preserve_timestamps: true
Move Files
- name: "reorganize"
type: "file_ops"
operation: "move"
source:
pattern: "temp/*.processed"
destination: "completed/"
options:
create_destination: true
Delete Files
- name: "cleanup"
type: "file_ops"
operation: "delete"
files:
- pattern: "**/*.tmp"
- pattern: "**/*.cache"
- older_than: "7 days"
options:
dry_run: false
Validate Files
- name: "verify_outputs"
type: "file_ops"
operation: "validate"
files:
- path: "output/report.pdf"
must_exist: true
min_size: 1024
max_size: 10485760 # 10MB
- path: "output/data/"
must_be_dir: true
must_contain: ["summary.json", "details.csv"]
- pattern: "output/**/*.json"
validate_json: true
List Files
- name: "scan_directory"
type: "file_ops"
operation: "list"
path: "./workspace"
options:
recursive: true
include_hidden: false
pattern: "**/*.{py,js,ts}"
sort_by: "modified"
limit: 100
output_field: "file_list"
Format Conversion
- name: "convert_data"
type: "file_ops"
operation: "convert"
source: "data.csv"
destination: "data.json"
format: "csv_to_json"
options:
csv_delimiter: ","
csv_headers: true
json_indent: 2
Batch Operations
Process multiple files:
- name: "batch_convert"
type: "for_loop"
iterator: "file"
data_source: "steps.scan.csv_files"
steps:
- name: "convert_file"
type: "file_ops"
operation: "convert"
source: "{{loop.file.path}}"
destination: "{{replace(loop.file.path, '.csv', '.json')}}"
format: "csv_to_json"
Schema Validation
Output Schema Validation
Enforce structured outputs:
- name: "structured_analysis"
type: "gemini"
output_schema:
type: "object"
required: ["summary", "findings", "recommendations"]
properties:
summary:
type: "string"
minLength: 50
maxLength: 500
findings:
type: "array"
minItems: 1
items:
type: "object"
required: ["type", "severity", "description"]
properties:
type:
type: "string"
enum: ["bug", "vulnerability", "improvement"]
severity:
type: "string"
enum: ["low", "medium", "high", "critical"]
description:
type: "string"
line_number:
type: "integer"
minimum: 1
recommendations:
type: "array"
items:
type: "object"
properties:
priority:
type: "integer"
minimum: 1
maximum: 5
action:
type: "string"
effort:
type: "string"
enum: ["trivial", "small", "medium", "large"]
metadata:
type: "object"
properties:
timestamp:
type: "string"
format: "date-time"
version:
type: "string"
pattern: "^\\d+\\.\\d+\\.\\d+$"
prompt:
- type: "static"
content: "Analyze code and return structured findings"
Schema Features
Data Types: string, number, integer, boolean, object, array, and null (for optional fields)
String Constraints:
properties:
email:
type: "string"
format: "email"
pattern: "^[\\w.-]+@[\\w.-]+\\.\\w+$"
code:
type: "string"
minLength: 6
maxLength: 6
pattern: "^[0-9]{6}$"
Numeric Constraints:
properties:
score:
type: "number"
minimum: 0
maximum: 100
multipleOf: 0.5
count:
type: "integer"
minimum: 0
exclusiveMaximum: 1000
Array Constraints:
properties:
tags:
type: "array"
minItems: 1
maxItems: 10
uniqueItems: true
items:
type: "string"
enum: ["bug", "feature", "docs", "test"]
Complex Schemas:
output_schema:
type: "object"
properties:
result:
oneOf:
- type: "object"
properties:
success:
const: true
data:
type: "object"
- type: "object"
properties:
success:
const: false
error:
type: "string"
# Conditional schemas
if:
properties:
type:
const: "user"
then:
required: ["email", "username"]
else:
required: ["id"]
Function Calling
Gemini Functions
Define and use functions with Gemini:
workflow:
gemini_functions:
analyze_code_quality:
description: "Analyze code quality metrics"
parameters:
type: "object"
required: ["file_path", "metrics"]
properties:
file_path:
type: "string"
description: "Path to the file to analyze"
metrics:
type: "array"
items:
type: "string"
enum: ["complexity", "maintainability", "coverage"]
include_suggestions:
type: "boolean"
default: true
generate_test_cases:
description: "Generate test cases for a function"
parameters:
type: "object"
required: ["function_name", "function_code"]
properties:
function_name:
type: "string"
function_code:
type: "string"
test_types:
type: "array"
items:
type: "string"
enum: ["unit", "integration", "edge_case"]
coverage_target:
type: "number"
minimum: 0
maximum: 100
steps:
- name: "code_analysis"
type: "gemini"
functions:
- "analyze_code_quality"
- "generate_test_cases"
prompt:
- type: "static"
content: |
Analyze this code file and:
1. Call analyze_code_quality for quality metrics
2. Call generate_test_cases for untested functions
- type: "file"
path: "src/main.py"
Function Results
Process function call results:
- name: "process_analysis"
type: "gemini"
prompt:
- type: "static"
content: "Summarize the analysis results:"
- type: "previous_response"
step: "code_analysis"
extract: "function_calls"
Session Management
Claude Sessions
Maintain conversation state:
- name: "development_session"
type: "claude_session"
session_config:
session_name: "feature_development"
persist: true
continue_on_restart: true
checkpoint_frequency: 10
max_turns: 100
description: "Developing authentication feature"
claude_options:
max_turns: 20
allowed_tools: ["Write", "Edit", "Read", "Bash"]
prompt:
- type: "static"
content: "Let's continue developing the auth feature"
- type: "session_context"
session_id: "feature_development"
include_last_n: 5
Session Continuation
Continue sessions across steps:
steps:
- name: "start_session"
type: "claude_session"
session_config:
session_name: "refactoring_session"
persist: true
prompt:
- type: "static"
content: "Begin refactoring the user module"
- name: "continue_work"
type: "claude_session"
session_config:
session_name: "refactoring_session"
continue_session: true
prompt:
- type: "claude_continue"
new_prompt: "Now add error handling to all functions"
- name: "final_review"
type: "claude_session"
session_config:
session_name: "refactoring_session"
continue_session: true
prompt:
- type: "claude_continue"
new_prompt: "Review all changes and create a summary"
Content Processing
Content Extraction
Advanced extraction with ContentExtractor:
- name: "extract_insights"
type: "claude_extract"
preset: "analysis"
extraction_config:
use_content_extractor: true
format: "structured"
post_processing:
- "extract_code_blocks"
- "extract_recommendations"
- "extract_errors"
- "extract_metrics"
- "extract_dependencies"
code_block_processing:
identify_language: true
syntax_highlight: true
extract_imports: true
recommendation_format:
include_priority: true
include_effort: true
group_by_category: true
max_summary_length: 1000
include_metadata: true
prompt:
- type: "file"
path: "analysis_report.md"
Extraction Formats
Structured Format:
extraction_config:
format: "structured"
sections:
- name: "overview"
max_length: 500
- name: "findings"
item_limit: 20
- name: "recommendations"
priority_only: true
Summary Format:
extraction_config:
format: "summary"
summary_style: "bullet_points"
max_points: 10
include_key_metrics: true
Markdown Format:
extraction_config:
format: "markdown"
heading_level: 2
include_toc: true
code_fence_style: "```"
Performance Features
Streaming Operations
Process large datasets efficiently:
- name: "stream_process"
type: "data_transform"
input_source: "large_dataset.jsonl"
stream_mode: true
chunk_size: 1000
operations:
- operation: "filter"
condition: "record.active == true"
- operation: "map"
expression: |
{
"id": record.id,
"processed_at": now(),
"summary": substring(record.description, 0, 100)
}
output_file: "processed_data.jsonl"
output_format: "jsonl"
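The streamed output file can then be consumed like any other file, for example via a file prompt part (hypothetical step name):
- name: "summarize_processed"
  type: "gemini"
  prompt:
    - type: "static"
      content: "Summarize the processed records:"
    - type: "file"
      path: "processed_data.jsonl"   # output_file from the streaming step above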
Parallel Processing
Execute operations concurrently:
- name: "parallel_analysis"
type: "parallel_claude"
parallel_config:
max_workers: 5
queue_size: 20
timeout_per_task: 300
retry_failed: true
task_generator:
type: "file_list"
pattern: "**/*.py"
batch_size: 10
task_template:
claude_options:
max_turns: 5
allowed_tools: ["Read"]
prompt:
- type: "static"
content: "Analyze this batch of files:"
- type: "dynamic"
content: "{{task.files}}"
Caching
Optimize repeated operations:
workflow:
cache_config:
enabled: true
ttl: 3600
max_size_mb: 100
cache_keys:
- "file_content"
- "analysis_results"
- "transformations"
steps:
- name: "cached_analysis"
type: "gemini"
cache:
key: "analysis_{{hash(file_path)}}"
ttl: 7200
prompt:
- type: "file"
path: "{{file_path}}"
Integration Patterns
Multi-Stage Processing
Complex workflows with multiple stages:
workflow:
name: "complete_analysis_pipeline"
steps:
# Stage 1: Discovery
- name: "discover"
type: "codebase_query"
codebase_context: true
queries:
project_structure:
get_project_type: true
find_entry_points: true
# Stage 2: Analysis
- name: "analyze_each_component"
type: "for_loop"
iterator: "component"
data_source: "steps.discover.project_structure.components"
parallel: true
steps:
- name: "component_analysis"
type: "pipeline"
pipeline_file: "./analyze_component.yaml"
inputs:
component: "{{loop.component}}"
# Stage 3: Transform Results
- name: "consolidate"
type: "data_transform"
input_source: "steps.analyze_each_component.results"
operations:
- operation: "flatten"
field: "analyses"
- operation: "group_by"
key: "severity"
- operation: "sort"
by: "priority"
order: "desc"
# Stage 4: Generate Report
- name: "report"
type: "claude_extract"
extraction_config:
format: "markdown"
include_metadata: true
prompt:
- type: "static"
content: "Generate comprehensive report:"
- type: "previous_response"
step: "consolidate"
Event-Driven Patterns
React to conditions and events:
workflow:
name: "monitoring_pipeline"
steps:
- name: "monitor_loop"
type: "while_loop"
condition: "state.monitoring_active"
max_iterations: 1000
steps:
- name: "check_conditions"
type: "codebase_query"
queries:
changes:
get_changed_files:
since: "{{state.last_check}}"
- name: "process_if_changed"
type: "pipeline"
condition: "length(steps.check_conditions.changes) > 0"
pipeline_file: "./process_changes.yaml"
inputs:
changes: "{{steps.check_conditions.changes}}"
- name: "update_state"
type: "set_variable"
variables:
last_check: "{{now()}}"
- name: "wait"
type: "file_ops"
operation: "wait"
duration_seconds: 60
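The loop's controlling state must be initialized before monitor_loop runs. A minimal sketch using the set_variable step shown above, placed ahead of the loop (initial values are illustrative assumptions):
- name: "init_state"
  type: "set_variable"
  variables:
    monitoring_active: true        # condition checked by the while_loop
    last_check: "{{now()}}"        # starting point for change detection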
Adaptive Workflows
Workflows that adapt based on runtime conditions:
workflow:
name: "adaptive_processor"
steps:
- name: "analyze_workload"
type: "gemini"
prompt:
- type: "static"
content: "Analyze workload characteristics"
- type: "file"
path: "workload_metrics.json"
- name: "select_strategy"
type: "switch"
expression: "steps.analyze_workload.recommended_strategy"
cases:
"batch":
- name: "batch_process"
type: "pipeline"
pipeline_file: "./strategies/batch_processor.yaml"
"stream":
- name: "stream_process"
type: "pipeline"
pipeline_file: "./strategies/stream_processor.yaml"
"parallel":
- name: "parallel_process"
type: "pipeline"
pipeline_file: "./strategies/parallel_processor.yaml"
default:
- name: "standard_process"
type: "pipeline"
pipeline_file: "./strategies/standard_processor.yaml"
This reference documents the advanced features of the Pipeline YAML v2 format that enable sophisticated AI engineering workflows.