Pipeline.Step.DataTransform (pipeline v0.0.1)

Data transformation step executor for manipulating structured data between pipeline steps.

This step type supports the following transformations:

  • Filtering data based on conditions
  • Mapping and transforming fields
  • Aggregating values with functions like sum, average, count
  • Joining data from multiple sources
  • Grouping data by field values
  • Sorting data by field values

Fields are accessed with a JSONPath-like syntax, and multiple operations can be chained in a single step.

Configuration

  • input_source (required): Source of input data (e.g., "previous_response:step_name")
  • operations (required): List of transformation operations to apply
  • output_field (optional): Field name to store result in context (defaults to step name)
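
The "previous_response:step_name" form of input_source can be read as a source kind followed by a step name. The sketch below shows one way such a string might be split and looked up. It is illustrative only: the module name InputSourceSketch, both functions, and the assumption that earlier step results live under a results key in the context are hypothetical and not taken from this module.

```elixir
defmodule InputSourceSketch do
  # Hypothetical helper: splits "kind:name" at the first colon only,
  # so a step name that itself contains ":" stays intact.
  def parse(source) when is_binary(source) do
    case String.split(source, ":", parts: 2) do
      ["previous_response", step_name] -> {:previous_response, step_name}
      _other -> {:unknown, source}
    end
  end

  # Hypothetical lookup: ASSUMES prior step results are stored in a map
  # under context.results, keyed by step name.
  def resolve(source, context) do
    case parse(source) do
      {:previous_response, step_name} -> Map.fetch(context.results, step_name)
      {:unknown, _source} -> :error
    end
  end
end

context = %{results: %{"analysis" => [%{"priority" => "high"}]}}
IO.inspect(InputSourceSketch.resolve("previous_response:analysis", context))
```

Returning {:ok, data} or :error lets the caller decide how to report a missing or unrecognised source.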

Example YAML Configuration

- name: "process_results"
  type: "data_transform"
  input_source: "previous_response:analysis"
  operations:
    - operation: "filter"
      field: "recommendations"
      condition: "priority == 'high'"
    - operation: "aggregate"
      field: "scores"
      function: "average"
    - operation: "join"
      left_field: "files"
      right_source: "previous_response:file_metadata"
      join_key: "filename"
  output_field: "processed_data"

Supported Operations

Filter

Keep only the items that satisfy a condition.

- operation: "filter"
  field: "status"
  condition: "status == 'active'"
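
The effect of this operation can be sketched in plain Elixir. This is a minimal illustration, not the module's implementation: FilterSketch is a hypothetical name, items are assumed to be maps with string keys, and only conditions of the exact form field == 'literal' are handled.

```elixir
defmodule FilterSketch do
  # Keeps the items whose field equals the quoted literal in the condition.
  # ASSUMPTION: only "field == 'literal'" is supported in this sketch.
  def filter(items, condition) do
    pattern = ~r/^\s*(\w+)\s*==\s*'([^']*)'\s*$/

    case Regex.run(pattern, condition, capture: :all_but_first) do
      [field, literal] ->
        Enum.filter(items, fn item -> Map.get(item, field) == literal end)

      nil ->
        raise ArgumentError, "unsupported condition: #{condition}"
    end
  end
end

items = [
  %{"id" => 1, "status" => "active"},
  %{"id" => 2, "status" => "inactive"}
]

# Only the item with id 1 has status "active".
IO.inspect(FilterSketch.filter(items, "status == 'active'"))
```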

Map

Transform field values using a lookup mapping or an expression. The example below replaces numeric priority codes with labels.

- operation: "map"
  field: "priority"
  mapping:
    "1": "high"
    "2": "medium"
    "3": "low"

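A lookup mapping like the one above can be sketched as follows. The module name MapSketch is hypothetical, and two behaviours are assumptions rather than documented facts: values are converted to strings before lookup (the mapping keys above are strings), and values with no mapping entry are left unchanged.

```elixir
defmodule MapSketch do
  # Replaces the value of `field` in every item via `mapping`.
  # ASSUMPTIONS: scalar values are stringified before lookup, and
  # unmapped values or missing fields are passed through untouched.
  def map_field(items, field, mapping) do
    Enum.map(items, fn item ->
      case Map.fetch(item, field) do
        {:ok, value} ->
          Map.put(item, field, Map.get(mapping, to_string(value), value))

        :error ->
          item
      end
    end)
  end
end

mapping = %{"1" => "high", "2" => "medium", "3" => "low"}
items = [%{"priority" => 1}, %{"priority" => "3"}, %{"priority" => 9}]

# The first two values become "high" and "low"; 9 has no entry and is kept.
IO.inspect(MapSketch.map_field(items, "priority", mapping))
```
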
Aggregate

Aggregate the values of a field using the function named in function.

- operation: "aggregate"
  field: "scores"
  function: "average"  # sum, average, count, max, min, first, last, unique
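
The listed function names map naturally onto Elixir's Enum and List functions. The sketch below is one possible reading, assuming the field has already been resolved to a flat list of values. AggregateSketch is a hypothetical name, and returning nil for an empty list is an assumption.

```elixir
defmodule AggregateSketch do
  # One clause per function name from the list above.
  def aggregate(values, "sum"), do: Enum.sum(values)
  # ASSUMPTION: the average of an empty list is nil rather than an error.
  def aggregate([], "average"), do: nil
  def aggregate(values, "average"), do: Enum.sum(values) / length(values)
  def aggregate(values, "count"), do: length(values)
  # The zero-arity fallback avoids Enum.EmptyError on an empty list.
  def aggregate(values, "max"), do: Enum.max(values, fn -> nil end)
  def aggregate(values, "min"), do: Enum.min(values, fn -> nil end)
  def aggregate(values, "first"), do: List.first(values)
  def aggregate(values, "last"), do: List.last(values)
  def aggregate(values, "unique"), do: Enum.uniq(values)
end

scores = [80, 90, 100, 90]
IO.inspect(AggregateSketch.aggregate(scores, "average")) # prints 90.0
IO.inspect(AggregateSketch.aggregate(scores, "unique"))  # prints [80, 90, 100]
```

Note that unique returns a list, while the other functions return a single value.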

Join

Join data from another source.

- operation: "join"
  left_field: "user_id"
  right_source: "previous_response:users"
  join_key: "id"
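
One plausible reading of this configuration is a left join: each input item's left_field value is matched against the join_key of the items from right_source. The sketch below illustrates that reading only. JoinSketch is a hypothetical name, and the handling of unmatched items and key conflicts is an assumption, not documented behaviour.

```elixir
defmodule JoinSketch do
  # ASSUMPTIONS: unmatched left items are kept as-is (left join), and on a
  # key conflict the left item's value wins.
  def join(left_items, right_items, left_field, join_key) do
    # Index the right side once so each lookup is a map access.
    index = Map.new(right_items, fn item -> {Map.get(item, join_key), item} end)

    Enum.map(left_items, fn item ->
      case Map.fetch(index, Map.get(item, left_field)) do
        {:ok, match} -> Map.merge(match, item)
        :error -> item
      end
    end)
  end
end

orders = [%{"user_id" => 1, "total" => 30}, %{"user_id" => 7, "total" => 5}]
users = [%{"id" => 1, "name" => "Ada"}]

# The first order gains "id" and "name"; the second has no match and is unchanged.
IO.inspect(JoinSketch.join(orders, users, "user_id", "id"))
```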

Group By

Group items by field values.

- operation: "group_by"
  field: "category"
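
Grouping corresponds closely to Enum.group_by/2 in Elixir. The sketch below assumes the result is a map from each distinct field value to the list of items that carry it. The actual output shape of this step is not documented here, and GroupBySketch is a hypothetical name.

```elixir
defmodule GroupBySketch do
  # Groups items under the value they hold for `field`.
  # Items that lack the field end up under the nil key.
  def group_by(items, field) do
    Enum.group_by(items, fn item -> Map.get(item, field) end)
  end
end

items = [
  %{"name" => "a", "category" => "docs"},
  %{"name" => "b", "category" => "code"},
  %{"name" => "c", "category" => "docs"}
]

# Items keep their original relative order within each group.
IO.inspect(GroupBySketch.group_by(items, "category"))
```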

Sort

Sort items by field values.

- operation: "sort"
  field: "created_at"
  order: "desc"  # asc or desc
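
Sorting can be sketched with Enum.sort_by/3, which accepts :asc or :desc as its third argument. SortSketch is a hypothetical name, and defaulting to ascending order when order is omitted is an assumption. The example uses ISO 8601 date strings, which sort chronologically when compared as strings.

```elixir
defmodule SortSketch do
  # Sorts items by the value of `field`.
  # ASSUMPTION: any order other than "desc" is treated as ascending.
  def sort(items, field, order \\ "asc") do
    direction = if order == "desc", do: :desc, else: :asc
    Enum.sort_by(items, fn item -> Map.get(item, field) end, direction)
  end
end

items = [
  %{"id" => 1, "created_at" => "2024-01-15"},
  %{"id" => 2, "created_at" => "2024-03-02"},
  %{"id" => 3, "created_at" => "2023-11-30"}
]

# Newest first: ids 2, 1, 3.
IO.inspect(SortSketch.sort(items, "created_at", "desc"))
```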

Functions

execute(step, context)

Execute a data transformation step.