Pipeline.Step.DataTransform (pipeline v0.0.1)
Data transformation step executor for manipulating structured data between pipeline steps.
This step type supports the following transformations:
- Filtering data based on conditions
- Mapping and transforming fields
- Aggregating values with functions like sum, average, count
- Joining data from multiple sources
- Grouping data by field values
- Sorting data by field values
Uses JSONPath-like syntax for field access and supports chaining multiple operations.
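Chaining can be pictured as a fold over the operations list, where each operation receives the output of the one before it. The following is only a minimal sketch: `ChainSketch` and its private `apply_operation/2` are hypothetical names, only two operations are modeled, and the real executor may be structured differently.

```elixir
defmodule ChainSketch do
  # Hypothetical helper, not part of Pipeline.Step.DataTransform.
  # Applies each operation to the result of the previous one.
  def run(data, operations) do
    Enum.reduce(operations, data, fn operation, acc ->
      apply_operation(operation, acc)
    end)
  end

  # Sort a list of maps by one field, ascending.
  defp apply_operation(%{"operation" => "sort", "field" => field}, items) do
    Enum.sort_by(items, &Map.get(&1, field))
  end

  # Group a list of maps into a map keyed by the field's value.
  defp apply_operation(%{"operation" => "group_by", "field" => field}, items) do
    Enum.group_by(items, &Map.get(&1, field))
  end
end
```

In this sketch, `group_by` turns the list into a map, so it has to come last in the chain.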
Configuration
- input_source (required): Source of input data (e.g., "previous_response:step_name")
- operations (required): List of transformation operations to apply
- output_field (optional): Field name to store result in context (defaults to step name)
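To make the `input_source` format concrete, here is one way a "previous_response:step_name" string could be resolved against a context. This is a sketch under assumptions: `InputSourceSketch` is a hypothetical module, and the context is assumed to hold earlier step results in a `:results` map keyed by step name, which this page does not confirm.

```elixir
defmodule InputSourceSketch do
  # Hypothetical helper. Assumes context looks like
  # %{results: %{"step_name" => data}}; the real context shape may differ.
  def resolve("previous_response:" <> step_name, context) do
    context
    |> Map.get(:results, %{})
    |> Map.fetch(step_name)
  end

  # Any other source format is treated as unresolvable in this sketch.
  def resolve(_other, _context), do: :error
end
```

`Map.fetch/2` is used so that a missing step yields `:error` instead of being confused with a step that legitimately returned `nil`.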
Example YAML Configuration
- name: "process_results"
  type: "data_transform"
  input_source: "previous_response:analysis"
  operations:
    - operation: "filter"
      field: "recommendations"
      condition: "priority == 'high'"
    - operation: "aggregate"
      field: "scores"
      function: "average"
    - operation: "join"
      left_field: "files"
      right_source: "previous_response:file_metadata"
      join_key: "filename"
  output_field: "processed_data"
Supported Operations
Filter
Filter items based on conditions.
- operation: "filter"
  field: "status"
  condition: "status == 'active'"
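As an illustration of what an equality condition like this could mean for a list of maps, consider the sketch below. `FilterSketch` is hypothetical and handles only conditions of the form `field == 'value'`; the condition grammar the step actually accepts is not documented on this page.

```elixir
defmodule FilterSketch do
  # Hypothetical helper, not the library's implementation.
  # Supports only conditions of the form: field == 'value'
  def filter(items, condition) do
    case Regex.run(~r/^\s*(\w+)\s*==\s*'([^']*)'\s*$/, condition) do
      [_full, field, value] ->
        # Keep only the items whose field equals the quoted value.
        Enum.filter(items, fn item -> Map.get(item, field) == value end)

      nil ->
        raise ArgumentError, "unsupported condition: #{inspect(condition)}"
    end
  end
end
```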
Map
Transform fields with mappings or expressions.
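The lookup behind a value mapping might be sketched in Elixir as follows. `MapSketch` is a hypothetical helper; it assumes that values are compared by their string form (so the integer 1 matches the key "1") and that values without a mapping entry are left unchanged. Neither assumption is confirmed by this page, and expression-based mapping is not covered.

```elixir
defmodule MapSketch do
  # Hypothetical helper, not the library's implementation.
  # Replaces item[field] with mapping[to_string(value)] when an entry exists.
  def map_field(items, field, mapping) do
    Enum.map(items, fn item ->
      case Map.fetch(item, field) do
        {:ok, value} ->
          Map.put(item, field, Map.get(mapping, to_string(value), value))

        :error ->
          # Items without the field pass through untouched.
          item
      end
    end)
  end
end
```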
- operation: "map"
  field: "priority"
  mapping:
    "1": "high"
    "2": "medium"
    "3": "low"
Aggregate
Aggregate values using functions.
- operation: "aggregate"
  field: "scores"
  function: "average" # sum, average, count, max, min, first, last, unique
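Each listed function has a natural counterpart in Elixir's standard library. The sketch below shows one plausible reading of them over a plain list of values; `AggregateSketch` is a hypothetical module, and details such as returning `nil` for the average of an empty list are assumptions, not documented behavior.

```elixir
defmodule AggregateSketch do
  # Hypothetical helper; the clause names mirror the `function` values above.
  def aggregate(values, "sum"), do: Enum.sum(values)
  # Assumption: the average of an empty list is nil (avoids division by zero).
  def aggregate([], "average"), do: nil
  def aggregate(values, "average"), do: Enum.sum(values) / length(values)
  def aggregate(values, "count"), do: length(values)
  # Note: Enum.max/1 and Enum.min/1 raise Enum.EmptyError on an empty list.
  def aggregate(values, "max"), do: Enum.max(values)
  def aggregate(values, "min"), do: Enum.min(values)
  def aggregate(values, "first"), do: List.first(values)
  def aggregate(values, "last"), do: List.last(values)
  def aggregate(values, "unique"), do: Enum.uniq(values)
end
```

Because `/` always returns a float in Elixir, "average" yields a float here even when all inputs are integers.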
Join
Join data from another source.
- operation: "join"
  left_field: "user_id"
  right_source: "previous_response:users"
  join_key: "id"
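One way to read this configuration is as a left join: each input item is matched to the row in the right-hand source whose `join_key` equals the item's `left_field`. The sketch below implements that reading. `JoinSketch` is hypothetical, and the choices to keep unmatched items and to let the left item win on key conflicts are assumptions; the actual join semantics are not specified on this page.

```elixir
defmodule JoinSketch do
  # Hypothetical helper implementing a left join over lists of maps.
  def join(left_items, right_items, left_field, join_key) do
    # Index the right-hand rows by their join key for direct lookup.
    right_by_key = Map.new(right_items, fn row -> {Map.get(row, join_key), row} end)

    Enum.map(left_items, fn item ->
      case Map.fetch(right_by_key, Map.get(item, left_field)) do
        # Matched: merge the rows, with the left item's values taking priority.
        {:ok, match} -> Map.merge(match, item)
        # Unmatched: keep the left item as-is.
        :error -> item
      end
    end)
  end
end
```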
Group By
Group items by field values.
- operation: "group_by"
  field: "category"
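In Elixir terms, grouping a list of maps by a field corresponds closely to `Enum.group_by/2`. The sketch below assumes the result is a map from each field value to the list of matching items; `GroupSketch` is a hypothetical name, and the step's real output shape may differ.

```elixir
defmodule GroupSketch do
  # Hypothetical helper: returns %{field_value => [items with that value]}.
  # Items missing the field are grouped under the nil key.
  def group_by(items, field) do
    Enum.group_by(items, fn item -> Map.get(item, field) end)
  end
end
```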
Sort
Sort items by field values.
- operation: "sort"
  field: "created_at"
  order: "desc" # asc or desc
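Sorting by a field maps onto `Enum.sort_by/3`, which accepts `:asc` or `:desc` as its third argument. The sketch below assumes "asc" is the default order, which this page does not state; `SortSketch` is a hypothetical module.

```elixir
defmodule SortSketch do
  # Hypothetical helper. Assumption: order defaults to "asc" when omitted.
  def sort(items, field, order \\ "asc")

  def sort(items, field, "asc") do
    Enum.sort_by(items, &Map.get(&1, field), :asc)
  end

  def sort(items, field, "desc") do
    Enum.sort_by(items, &Map.get(&1, field), :desc)
  end
end
```

Values are compared with Elixir's standard term ordering, so ISO 8601 timestamp strings sort chronologically, while other date formats may not.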
Summary
Functions
Execute a data transformation step.