Simple Data Pipeline Example

Introduction

This example demonstrates using Handoff to build a simple data processing pipeline. The pipeline processes random data through a series of transformation steps, showing how to construct and execute a directed acyclic graph (DAG) of operations.
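
The five steps and their dependencies form the following DAG; note that the filtered data is consumed both by the transformation step and directly by the final formatting step:

generate_data -> filter_data -> transform_data -> aggregate_data -> format_output
                     |                                                   ^
                     +---------------------------------------------------+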

Setup

First, let's make sure the Handoff library is available:

Mix.install([{:handoff, "~> 0.1"}])

Statistics Helper Module

defmodule Stats do
  @moduledoc """
  Simple statistics helpers for the pipeline example
  """

  def mean(values) when is_list(values) and length(values) > 0 do
    Enum.sum(values) / length(values)
  end

  def median(values) when is_list(values) and length(values) > 0 do
    sorted = Enum.sort(values)
    mid = div(length(sorted), 2)

    if rem(length(sorted), 2) == 0 do
      (Enum.at(sorted, mid - 1) + Enum.at(sorted, mid)) / 2
    else
      Enum.at(sorted, mid)
    end
  end

  def standard_deviation(values) when is_list(values) and length(values) > 1 do
    avg = mean(values)
    variance = values |> Enum.map(fn x -> :math.pow(x - avg, 2) end) |> mean()
    :math.sqrt(variance)
  end
end
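
As a quick sanity check, the helpers can be exercised on their own. Note that standard_deviation/1 as written computes the population standard deviation (it divides by the number of values):

Stats.mean([1.0, 2.0, 3.0])
# => 2.0

Stats.median([3.0, 1.0, 2.0])
# => 2.0

Stats.standard_deviation([2.0, 4.0, 4.0, 4.0, 5.0, 5.0, 7.0, 9.0])
# => 2.0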

Pipeline Tasks Module

defmodule PipelineTasks do
  @moduledoc """
  Contains the core logic for each step in the simple data pipeline.
  Functions are designed to be called by Handoff.
  """

  def generate_data_task() do
    IO.puts("Generating 100 random values...")
    for _ <- 1..100, do: :rand.uniform() * 100
  end

  def filter_data_task(data, threshold) do
    IO.puts("Filtering values > #{threshold}...")
    Enum.filter(data, fn x -> x > threshold end)
  end

  def transform_data_task(data, operation) do
    IO.puts("Applying #{operation} transformation...")
    case operation do
      :square -> Enum.map(data, fn x -> x * x end)
      :sqrt -> Enum.map(data, fn x -> :math.sqrt(x) end)
      :log -> Enum.map(data, fn x -> :math.log(x) end)
      _ -> data # Default: no transformation
    end
  end

  def aggregate_data_task(data) do
    IO.puts("Calculating statistics...")
    %{
      count: length(data),
      min: Enum.min(data),
      max: Enum.max(data),
      mean: Stats.mean(data),
      median: Stats.median(data),
      stddev: Stats.standard_deviation(data)
    }
  end

  def format_output_task(stats, original_data) do
    IO.puts("Formatting results...")
    """

    DATA PIPELINE RESULTS
    ---------------------
    Original filtered data count: #{length(original_data)}

    STATISTICS:
      Count:  #{stats.count}
      Min:    #{Float.round(stats.min, 2)}
      Max:    #{Float.round(stats.max, 2)}
      Mean:   #{Float.round(stats.mean, 2)}
      Median: #{Float.round(stats.median, 2)}
      StdDev: #{Float.round(stats.stddev, 2)}
    """
  end
end
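
Because each step is an ordinary Elixir function, you can call the tasks directly when experimenting or debugging, without involving Handoff at all:

PipelineTasks.filter_data_task([10.0, 45.0, 80.0], 30)
# => [45.0, 80.0]

PipelineTasks.transform_data_task([2.0, 3.0], :square)
# => [4.0, 9.0]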

Starting Handoff

# Ensure Handoff application is started
# In a real application, this is usually in your application.ex.
# For scripts/Livebooks, start it manually if not already running.
if GenServer.whereis(Handoff.DistributedExecutor) == nil do
  {:ok, _pid} = Handoff.start_link() # Or Handoff.Application.start(:normal, [])
  IO.puts "Handoff Application started."
else
  IO.puts "Handoff Application already running."
end

# Register the local node with some default capabilities for local execution
# This allows DistributedExecutor to find and use the current node.
Handoff.DistributedExecutor.register_local_node(%{cpu: 2, memory: 1024, gpu: 0})
IO.puts "Local node registered with Handoff."

# Create a new DAG
dag = Handoff.DAG.new("simple_pipeline_dag")

Building the Pipeline

Let's define each step in our data pipeline:

alias Handoff.Function

# Step 1: Generate random data
generate_fn = %Function{
  id: :generate_data,
  args: [], # No dependencies
  code: &PipelineTasks.generate_data_task/0
}

# Step 2: Filter data
filter_fn = %Function{
  id: :filter_data,
  args: [:generate_data], # Depends on generate_data, result passed as first arg to filter_data_task
  code: &PipelineTasks.filter_data_task/2,
  extra_args: [30] # Filter threshold, passed as second arg to filter_data_task
}

# Step 3: Transform data
transform_fn = %Function{
  id: :transform_data,
  args: [:filter_data], # Depends on filter_data, result passed as first arg to transform_data_task
  code: &PipelineTasks.transform_data_task/2,
  extra_args: [:square] # Operation to apply, passed as second arg to transform_data_task
}

# Step 4: Aggregate results
aggregate_fn = %Function{
  id: :aggregate_data,
  args: [:transform_data], # Depends on transform_data, result passed as the arg to aggregate_data_task
  code: &PipelineTasks.aggregate_data_task/1
  # No extra_args for this function
}

# Step 5: Format output
format_fn = %Function{
  id: :format_output,
  args: [:aggregate_data, :filter_data], # Results passed in order to format_output_task/2
  code: &PipelineTasks.format_output_task/2,
  node: Node.self() # force the output to be collected at the calling node
}
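
As the comments above indicate, Handoff resolves the dependency results listed in args (in order) and then appends any extra_args before invoking code. For the filter step, the call is therefore conceptually equivalent to the following sketch (illustrative only, not Handoff's actual internals):

# Illustrative only: how the arguments for filter_fn line up
generated = PipelineTasks.generate_data_task()
apply(filter_fn.code, [generated | filter_fn.extra_args])
# same as: PipelineTasks.filter_data_task(generated, 30)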

Assembling and Executing the DAG

# Build the DAG
dag =
  dag
  |> Handoff.DAG.add_function(generate_fn)
  |> Handoff.DAG.add_function(filter_fn)
  |> Handoff.DAG.add_function(transform_fn)
  |> Handoff.DAG.add_function(aggregate_fn)
  |> Handoff.DAG.add_function(format_fn)

# Validate the DAG
case Handoff.DAG.validate(dag) do
  :ok ->
    IO.puts("\nExecuting data pipeline...\n")

    # Execute the DAG using DistributedExecutor
    case Handoff.DistributedExecutor.execute(dag) do
      {:ok, execution_result} ->
        # execution_result is %{dag_id: ..., results: actual_results, allocations: ...}
        final_output = execution_result.results[:format_output]
        IO.puts(final_output)
        IO.inspect(execution_result.allocations, label: "Function Allocations")

      {:error, reason} ->
        IO.puts("Error executing pipeline: #{inspect(reason)}")
    end

  {:error, reason} ->
    IO.puts("Invalid DAG: #{inspect(reason)}")
end

Experiment: Modifying Parameters

Try changing the parameters to see how they affect the results:

# Create a new DAG with different parameters
# Using a new DAG ID for the modified version
modified_dag_id = "simple_pipeline_dag_modified"
modified_dag = Handoff.DAG.new(modified_dag_id)

# Use the same task functions but modify extra_args for some Handoff.Function structs
modified_filter_fn = %{filter_fn | extra_args: [50]} # Higher threshold
modified_transform_fn = %{transform_fn | extra_args: [:sqrt]} # Different operation

# Rebuild the DAG
modified_dag =
  modified_dag
  |> Handoff.DAG.add_function(generate_fn) # Reusing generate_fn definition
  |> Handoff.DAG.add_function(modified_filter_fn)
  |> Handoff.DAG.add_function(modified_transform_fn)
  |> Handoff.DAG.add_function(aggregate_fn)    # Reusing aggregate_fn definition
  |> Handoff.DAG.add_function(format_fn)       # Reusing format_fn definition

# Validate and Execute the modified DAG
case Handoff.DAG.validate(modified_dag) do
  :ok ->
    IO.puts("\nExecuting modified data pipeline...\n")
    case Handoff.DistributedExecutor.execute(modified_dag) do
      {:ok, execution_result} ->
        final_output = execution_result.results[:format_output]
        IO.puts(final_output)
        IO.inspect(execution_result.allocations, label: "Modified DAG Allocations")
      {:error, reason} ->
        IO.puts("Error executing modified pipeline: #{inspect(reason)}")
    end
  {:error, reason} ->
    IO.puts("Invalid modified DAG: #{inspect(reason)}")
end

Key Concepts Demonstrated

  • Basic DAG construction
  • Function dependencies
  • Result passing between functions
  • Error handling
  • Parameterization via extra_args