# Simple Data Pipeline Example
## Introduction
This example demonstrates using Handoff to build a simple data processing pipeline. The pipeline processes random data through a series of transformation steps, showing how to construct and execute a directed acyclic graph (DAG) of operations.
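The finished graph is mostly a linear chain, with one extra edge: the formatting step reads both the aggregated statistics and the filtered data, which is what makes this a DAG rather than a plain pipeline:

```
generate_data ──> filter_data ──> transform_data ──> aggregate_data ──> format_output
                      │                                                      ▲
                      └──────────────────────────────────────────────────────┘
```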
## Setup
First, let's make sure the Handoff library is available:
```elixir
Mix.install([{:handoff, "~> 0.1"}])
```
## Statistics Helper Module
```elixir
defmodule Stats do
  @moduledoc """
  Simple statistics helpers for the pipeline example.
  """

  def mean(values) when is_list(values) and length(values) > 0 do
    Enum.sum(values) / length(values)
  end

  def median(values) when is_list(values) and length(values) > 0 do
    sorted = Enum.sort(values)
    mid = div(length(sorted), 2)

    if rem(length(sorted), 2) == 0 do
      (Enum.at(sorted, mid - 1) + Enum.at(sorted, mid)) / 2
    else
      Enum.at(sorted, mid)
    end
  end

  def standard_deviation(values) when is_list(values) and length(values) > 1 do
    avg = mean(values)

    variance =
      values
      |> Enum.map(fn x -> :math.pow(x - avg, 2) end)
      |> mean()

    :math.sqrt(variance)
  end
end
```
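A quick sanity check of the helpers, with expected values shown in comments:

```elixir
Stats.mean([1, 2, 3, 4])
#=> 2.5

Stats.median([3, 1, 2])
#=> 2

# Note this is the population standard deviation (divides by n, not n - 1):
Stats.standard_deviation([2, 4, 4, 4, 5, 5, 7, 9])
#=> 2.0
```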
## Pipeline Tasks Module
```elixir
defmodule PipelineTasks do
  @moduledoc """
  Contains the core logic for each step in the simple data pipeline.
  Functions are designed to be called by Handoff.
  """

  def generate_data_task do
    IO.puts("Generating 100 random values...")
    for _ <- 1..100, do: :rand.uniform() * 100
  end

  def filter_data_task(data, threshold) do
    IO.puts("Filtering values > #{threshold}...")
    Enum.filter(data, fn x -> x > threshold end)
  end

  def transform_data_task(data, operation) do
    IO.puts("Applying #{operation} transformation...")

    case operation do
      :square -> Enum.map(data, fn x -> x * x end)
      :sqrt -> Enum.map(data, fn x -> :math.sqrt(x) end)
      :log -> Enum.map(data, fn x -> :math.log(x) end)
      # Default: no transformation
      _ -> data
    end
  end

  def aggregate_data_task(data) do
    IO.puts("Calculating statistics...")

    %{
      count: length(data),
      min: Enum.min(data),
      max: Enum.max(data),
      mean: Stats.mean(data),
      median: Stats.median(data),
      stddev: Stats.standard_deviation(data)
    }
  end

  def format_output_task(stats, original_data) do
    IO.puts("Formatting results...")

    """
    DATA PIPELINE RESULTS
    ---------------------
    Original filtered data count: #{length(original_data)}
    STATISTICS:
    Count: #{stats.count}
    Min: #{Float.round(stats.min, 2)}
    Max: #{Float.round(stats.max, 2)}
    Mean: #{Float.round(stats.mean, 2)}
    Median: #{Float.round(stats.median, 2)}
    StdDev: #{Float.round(stats.stddev, 2)}
    """
  end
end
```
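Before wiring these into a DAG, note that the tasks compose directly as ordinary function calls. Running them by hand, with the same threshold and operation used below, is a good way to see the data flow Handoff will orchestrate:

```elixir
data = PipelineTasks.generate_data_task()
filtered = PipelineTasks.filter_data_task(data, 30)
transformed = PipelineTasks.transform_data_task(filtered, :square)
stats = PipelineTasks.aggregate_data_task(transformed)
IO.puts(PipelineTasks.format_output_task(stats, filtered))
```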
## Starting Handoff
```elixir
# Ensure the Handoff application is started.
# In a real application, this usually happens in your application.ex;
# for scripts and Livebooks, start it manually if it's not already running.
if GenServer.whereis(Handoff.DistributedExecutor) == nil do
  {:ok, _pid} = Handoff.start_link() # Or Handoff.Application.start(:normal, [])
  IO.puts("Handoff Application started.")
else
  IO.puts("Handoff Application already running.")
end

# Register the local node with some default capabilities for local execution.
# This allows the DistributedExecutor to find and use the current node.
Handoff.DistributedExecutor.register_local_node(%{cpu: 2, memory: 1024, gpu: 0})
IO.puts("Local node registered with Handoff.")

# Create a new DAG
dag = Handoff.DAG.new("simple_pipeline_dag")
```
## Building the Pipeline
Let's define each step in our data pipeline:
```elixir
alias Handoff.Function

# Step 1: Generate random data
generate_fn = %Function{
  id: :generate_data,
  # No dependencies
  args: [],
  code: &PipelineTasks.generate_data_task/0
}

# Step 2: Filter data
filter_fn = %Function{
  id: :filter_data,
  # Depends on :generate_data; its result is passed as the first argument
  args: [:generate_data],
  code: &PipelineTasks.filter_data_task/2,
  # Filter threshold, passed as the second argument
  extra_args: [30]
}

# Step 3: Transform data
transform_fn = %Function{
  id: :transform_data,
  # Depends on :filter_data; its result is passed as the first argument
  args: [:filter_data],
  code: &PipelineTasks.transform_data_task/2,
  # Operation to apply, passed as the second argument
  extra_args: [:square]
}

# Step 4: Aggregate results
aggregate_fn = %Function{
  id: :aggregate_data,
  # Depends on :transform_data; its result is passed as the only argument
  args: [:transform_data],
  code: &PipelineTasks.aggregate_data_task/1
  # No extra_args for this function
}

# Step 5: Format output
format_fn = %Function{
  id: :format_output,
  # Results are passed in order to format_output_task/2
  args: [:aggregate_data, :filter_data],
  code: &PipelineTasks.format_output_task/2,
  # Force the output to be collected at the calling node
  node: Node.self()
}
```
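The pattern across all five structs: the results of the functions named in `args` are resolved and passed first, in order, followed by any `extra_args`. As a rough mental model (a sketch of the implied calling convention, not Handoff's actual internals), each step is invoked like this:

```elixir
# Sketch only: how a step's arguments are assembled, assuming `results`
# is a map of dependency id => result. This is NOT Handoff's real code.
invoke = fn %Handoff.Function{} = f, results ->
  dep_results = Enum.map(f.args, &Map.fetch!(results, &1))
  apply(f.code, dep_results ++ (f.extra_args || []))
end
```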
## Assembling and Executing the DAG
```elixir
# Build the DAG
dag =
  dag
  |> Handoff.DAG.add_function(generate_fn)
  |> Handoff.DAG.add_function(filter_fn)
  |> Handoff.DAG.add_function(transform_fn)
  |> Handoff.DAG.add_function(aggregate_fn)
  |> Handoff.DAG.add_function(format_fn)

# Validate the DAG before executing it
case Handoff.DAG.validate(dag) do
  :ok ->
    IO.puts("\nExecuting data pipeline...\n")

    # Execute the DAG using the DistributedExecutor
    case Handoff.DistributedExecutor.execute(dag) do
      {:ok, execution_result} ->
        # execution_result is %{dag_id: ..., results: ..., allocations: ...}
        final_output = execution_result.results[:format_output]
        IO.puts(final_output)
        IO.inspect(execution_result.allocations, label: "Function Allocations")

      {:error, reason} ->
        IO.puts("Error executing pipeline: #{inspect(reason)}")
    end

  {:error, reason} ->
    IO.puts("Invalid DAG: #{inspect(reason)}")
end
```
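Every function's result is available in `execution_result.results`, keyed by function id, so inside the `{:ok, ...}` branch you can also inspect intermediate steps:

```elixir
# For example, pull the raw statistics map produced by :aggregate_data:
stats = execution_result.results[:aggregate_data]
IO.inspect(stats, label: "Aggregated statistics")
```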
## Experiment: Modifying Parameters
Try changing the parameters to see how they affect the results:
```elixir
# Create a new DAG for the modified version, under a different ID
modified_dag_id = "simple_pipeline_dag_modified"
modified_dag = Handoff.DAG.new(modified_dag_id)

# Reuse the same task functions, but change extra_args on some steps
modified_filter_fn = %{filter_fn | extra_args: [50]}          # Higher threshold
modified_transform_fn = %{transform_fn | extra_args: [:sqrt]} # Different operation

# Rebuild the DAG
modified_dag =
  modified_dag
  |> Handoff.DAG.add_function(generate_fn)           # Reusing generate_fn definition
  |> Handoff.DAG.add_function(modified_filter_fn)
  |> Handoff.DAG.add_function(modified_transform_fn)
  |> Handoff.DAG.add_function(aggregate_fn)          # Reusing aggregate_fn definition
  |> Handoff.DAG.add_function(format_fn)             # Reusing format_fn definition

# Validate and execute the modified DAG
case Handoff.DAG.validate(modified_dag) do
  :ok ->
    IO.puts("\nExecuting modified data pipeline...\n")

    case Handoff.DistributedExecutor.execute(modified_dag) do
      {:ok, execution_result} ->
        final_output = execution_result.results[:format_output]
        IO.puts(final_output)
        IO.inspect(execution_result.allocations, label: "Modified DAG Allocations")

      {:error, reason} ->
        IO.puts("Error executing modified pipeline: #{inspect(reason)}")
    end

  {:error, reason} ->
    IO.puts("Invalid modified DAG: #{inspect(reason)}")
end
```
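The same pattern extends to the third operation `transform_data_task/2` supports; a logarithmic variant only needs one more struct update:

```elixir
# A third variation: apply the :log transform instead
log_transform_fn = %{transform_fn | extra_args: [:log]}
```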
## Key Concepts Demonstrated
- Basic DAG construction with `Handoff.DAG.new/1` and `Handoff.DAG.add_function/2`
- Declaring dependencies between functions via `args`
- Passing results from one function to the next
- Error handling for both DAG validation and execution
- Parameterization via `extra_args`