How to Build Data Processing Pipelines

Problem

You need to process large datasets through multiple transformation steps with error handling, progress tracking, and efficient batch processing. Your data pipeline should handle millions of records while being resilient to failures.

Solution Overview

This guide shows you how to build robust ETL (Extract, Transform, Load) and batch processing workflows using Reactor's map steps, async processing, and collect patterns. You'll learn to process data efficiently at scale.

Prerequisites

  • Understanding of Reactor basics (inputs, steps, arguments)
  • Familiarity with Elixir's Enum and Stream modules
  • Basic knowledge of data processing concepts

Processing Large Datasets

Reactor uses Iterex internally for efficient, resumable iteration over large datasets. This provides several advantages over standard Elixir streams:

Key Benefits:

  • Resumable: Can pause and resume processing from any point
  • Memory efficient: Processes data in configurable chunks without loading everything into memory
  • Lazy evaluation: Only processes data as needed, not all at once
  • Resource management: Proper cleanup of external resources (files, database connections)

When you need these patterns:

  • Processing files larger than available memory (> 1GB)
  • Long-running ETL jobs that may need to pause/resume
  • Data pipelines requiring checkpoint/recovery capabilities
  • Streaming data from external APIs with rate limits
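
As a minimal illustration of the idea (a sketch only; Iter.from/1 and Iter.drop/2 are the same Iterex calls used in the examples later in this guide), you can wrap a lazy stream in an iterator and skip work that was already done:

# Sketch: wrap a CSV stream in an Iterex iterator; rows are only read when consumed.
iter =
  "/data/users_export.csv"
  |> File.stream!()
  |> CSV.decode!(headers: true)
  |> Iter.from()

# After an interruption, skip rows that were already handled and carry on.
# (already_processed is a hypothetical count, e.g. read from a checkpoint.)
already_processed = 10_000
resumable = Iter.drop(iter, already_processed)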

Complete ETL Pipeline Example

Here's a complete data processing pipeline that extracts user data, transforms it, and loads it into multiple destinations:

defmodule DataPipeline.UserETL do
  use Reactor

  input :source_file
  input :output_destinations

  # Step 1: Extract - Read and parse data
  step :extract_data, DataPipeline.Steps.ExtractCSV do
    argument :file_path, input(:source_file)
  end

  # Step 2: Validate data quality
  step :validate_data_quality, DataPipeline.Steps.DataQualityCheck do
    argument :raw_data, result(:extract_data)
  end

  # Step 3: Transform users in batches
  map :transform_users do
    source result(:extract_data, [:users])
    allow_async? true
    return :validate_user

    step :clean_user do
      argument :user, element(:transform_users)
      run fn %{user: user} ->
        clean_and_normalize_user(%{user: user, rules: %{}})
      end
    end

    step :enrich_user do
      argument :clean_user, result(:clean_user)
      run &enrich_with_external_data/1
    end

    step :validate_user do
      argument :user, result(:enrich_user)
      run &validate_business_rules/1
    end
  end

  # Step 4: Collect transformation results
  collect :process_results do
    argument :transformed_users, result(:transform_users)
    argument :source_stats, result(:extract_data, [:stats])
    
    transform fn %{transformed_users: users, source_stats: stats} ->
      successful_users = Enum.filter(users, &match?({:ok, _}, &1))
      failed_users = Enum.filter(users, &match?({:error, _}, &1))
      
      %{
        successful: Enum.map(successful_users, &elem(&1, 1)),
        failed: failed_users,
        source_count: stats.total_count,
        success_rate: length(successful_users) / length(users)
      }
    end
  end

  # Step 5: Load to multiple destinations in parallel
  step :load_to_database, DataPipeline.Steps.LoadToDatabase do
    argument :users, result(:process_results, [:successful])
    async? true
  end

  step :load_to_search_index, DataPipeline.Steps.LoadToElasticsearch do
    argument :users, result(:process_results, [:successful])
    async? true
  end

  step :generate_report, DataPipeline.Steps.GenerateProcessingReport do
    argument :results, result(:process_results)
    wait_for [:load_to_database, :load_to_search_index]
  end

  return :generate_report
end

Step Implementations

1. Data Extraction

defmodule DataPipeline.Steps.ExtractCSV do
  use Reactor.Step

  @impl true
  def run(%{file_path: path}, _context, _options) do
    case File.exists?(path) do
      true ->
        # Loads every row into memory; fine for small to medium files.
        # For files larger than memory, see "Streaming Large Files" below.
        users =
          path
          |> File.stream!()
          |> CSV.decode!(headers: true)
          |> Enum.to_list()
        
        stats = %{
          total_count: length(users),
          file_size: File.stat!(path).size,
          extracted_at: DateTime.utc_now()
        }
        
        {:ok, %{users: users, stats: stats}}
        
      false ->
        {:error, "Source file not found: #{path}"}
    end
  end
end

2. Data Quality Validation

defmodule DataPipeline.Steps.DataQualityCheck do
  use Reactor.Step

  @impl true
  def run(%{raw_data: %{users: users}}, _context, _options) do
    # Analyze data quality
    quality_issues = analyze_quality(users)
    
    rules = %{
      email_required: true,
      phone_format: ~r/^\+?[\d\s\-\(\)]+$/,
      name_min_length: 2,
      max_age: 120
    }
    
    case quality_issues do
      [] -> 
        {:ok, %{rules: rules, issues: [], status: :passed}}
      issues when length(issues) < 100 ->
        {:ok, %{rules: rules, issues: issues, status: :warnings}}
      issues ->
        {:error, "Too many quality issues: #{length(issues)} problems found"}
    end
  end

  defp analyze_quality(users) do
    users
    |> Enum.with_index()
    |> Enum.flat_map(fn {user, index} ->
      check_user_quality(user, index)
    end)
  end

  defp check_user_quality(user, index) do
    issues = []
    
    issues = if is_nil(user["email"]) or user["email"] == "" do
      ["Row #{index + 1}: Missing email" | issues]
    else
      issues
    end
    
    issues = if is_nil(user["name"]) or String.length(user["name"]) < 2 do
      ["Row #{index + 1}: Invalid name" | issues]
    else
      issues
    end
    
    issues
  end
end

3. User Transformation Functions

These functions are captured by the run functions inside the map block, so define them in DataPipeline.UserETL itself (or import them into it):

def clean_and_normalize_user(%{user: user, rules: _rules}) do
  cleaned = %{
    id: user["id"],
    email: String.downcase(String.trim(user["email"] || "")),
    name: String.trim(user["name"] || ""),
    phone: normalize_phone(user["phone"]),
    age: parse_age(user["age"]),
    created_at: DateTime.utc_now()
  }
  
  {:ok, cleaned}
rescue
  e -> {:error, "Failed to clean user #{user["id"]}: #{inspect(e)}"}
end
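
clean_and_normalize_user/1 calls two helpers that are not shown above. A minimal sketch of what they could look like (normalize_phone/1 and parse_age/1 here are illustrative implementations, not part of Reactor):

# Illustrative helpers for the cleaning function above; adapt to your own data.
defp normalize_phone(nil), do: nil

defp normalize_phone(phone) when is_binary(phone) do
  # Keep digits and a leading "+"; drop spaces, dashes and parentheses.
  String.replace(phone, ~r/[^\d+]/, "")
end

defp parse_age(nil), do: nil

defp parse_age(age) when is_integer(age), do: age

defp parse_age(age) when is_binary(age) do
  case Integer.parse(age) do
    {value, _rest} -> value
    :error -> nil
  end
end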

def enrich_with_external_data(%{clean_user: user}) do
  case ExternalAPI.get_user_profile(user.email) do
    {:ok, profile} ->
      enriched = Map.merge(user, %{
        company: profile.company,
        location: profile.location,
        verified: profile.verified
      })
      {:ok, enriched}
      
    {:error, :not_found} ->
      # Continue without enrichment
      {:ok, Map.put(user, :verified, false)}
      
    {:error, reason} ->
      {:error, "Enrichment failed for #{user.email}: #{reason}"}
  end
end

def validate_business_rules(%{user: user}) do
  errors = []
  
  errors = if String.length(user.name) < 2 do
    ["Name too short" | errors]
  else
    errors
  end
  
  errors = if not String.contains?(user.email, "@") do
    ["Invalid email format" | errors]
  else
    errors
  end
  
  errors = if user.age && user.age > 120 do
    ["Unrealistic age" | errors]
  else
    errors
  end
  
  case errors do
    [] -> {:ok, user}
    errors -> {:error, "Validation failed: #{Enum.join(errors, ", ")}"}
  end
end

4. Data Loading Steps

defmodule DataPipeline.Steps.LoadToDatabase do
  use Reactor.Step

  # Needed for the Ecto query macro used in compensate/4 below
  import Ecto.Query, only: [from: 2]

  @impl true
  def run(%{users: users}, _context, _options) do
    batches = Enum.chunk_every(users, 1000)
    
    results = Enum.map(batches, fn batch ->
      case MyApp.Repo.insert_all("users", batch, 
           on_conflict: :replace_all,
           conflict_target: [:email]) do
        {count, _} -> {:ok, count}
        error -> {:error, error}
      end
    end)
    
    total_inserted =
      results
      |> Enum.filter(&match?({:ok, _}, &1))
      |> Enum.map(&elem(&1, 1))
      |> Enum.sum()
    
    {:ok, %{inserted: total_inserted, total_batches: length(batches)}}
  end

  @impl true
  def compensate(_reason, %{users: users}, _context, _options) do
    # Cleanup on failure - remove any partially inserted data
    user_emails = Enum.map(users, & &1.email)
    MyApp.Repo.delete_all(from u in "users", where: u.email in ^user_emails)
    :ok
  end
end

defmodule DataPipeline.Steps.LoadToElasticsearch do
  use Reactor.Step

  @impl true
  def run(%{users: users}, _context, _options) do
    bulk_requests = Enum.map(users, fn user ->
      %{
        index: %{
          _index: "users",
          _id: user.id,
          _source: user
        }
      }
    end)
    
    case Elasticsearch.bulk_request(bulk_requests) do
      {:ok, response} ->
        # "items" includes failed operations too; check each item's status
        # if you need an exact success count.
        indexed = length(response["items"])
        {:ok, %{indexed: indexed}}
        
      {:error, reason} ->
        {:error, "Elasticsearch indexing failed: #{reason}"}
    end
  end
end

Running the Pipeline

# Process a large CSV file
{:ok, report} = Reactor.run(DataPipeline.UserETL, %{
  source_file: "/data/users_export.csv",
  output_destinations: [:database, :elasticsearch]
})

IO.puts("Processing completed!")
IO.puts("Success rate: #{report.success_rate * 100}%")
IO.puts("Total processed: #{report.source_count}")

Advanced Patterns

Streaming Large Files

For very large files, process data in chunks to avoid memory issues and enable efficient error handling:

step :extract_streaming_data do
  argument :file_path, input(:source_file)

  run fn %{file_path: path} ->
    iter =
      path
      |> File.stream!()
      |> CSV.decode!(headers: true)
      # Group rows so each element handled by the map below is a chunk, not a single row
      |> Stream.chunk_every(500)
      |> Iter.from()

    {:ok, iter}
  end
end

map :process_streaming_data do
  source result(:extract_streaming_data)
  allow_async? true
  return :process_chunk

  compose :process_chunk, ChunkProcessor do
    argument :chunk_data, element(:process_streaming_data)
  end
end

defmodule ChunkProcessor do
  use Reactor

  input :chunk_data

  step :validate_chunk do
    argument :data, input(:chunk_data)
    run &validate_chunk_data/1
  end

  step :transform_records do
    argument :data, result(:validate_chunk)
    run fn %{data: records} ->
      processed = Enum.map(records, &transform_record/1)
      {:ok, processed}
    end
  end

  step :save_checkpoint do
    argument :original, input(:chunk_data)
    argument :processed, result(:transform_records)
    
    run fn %{original: orig, processed: proc} ->
      save_chunk_checkpoint(orig, proc)
      {:ok, proc}
    end
    
    compensate fn _reason, %{original: chunk_data}, _context, _options ->
      log_failed_chunk(chunk_data)
      :retry
    end
  end

  return :save_checkpoint
end
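
validate_chunk_data/1 and transform_record/1 stand in for your own chunk-level logic; a minimal sketch under that assumption:

# Illustrative placeholders for the ChunkProcessor steps above.
defp validate_chunk_data(%{data: records}) do
  case Enum.reject(records, &is_map/1) do
    [] -> {:ok, records}
    bad -> {:error, "Chunk contains #{length(bad)} malformed rows"}
  end
end

defp transform_record(record) do
  # Example transformation: normalise the email column from the CSV row.
  Map.update(record, "email", nil, &String.downcase/1)
end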

Resumable Data Processing

Pause and resume processing across reactor executions:

step :prepare_processing_iter do
  argument :source_iter, result(:extract_streaming_data)
  
  run fn %{source_iter: iter} ->
    if File.exists?("/tmp/processing_checkpoint.json") do
      # Resume from checkpoint
      checkpoint = load_checkpoint()
      remaining_iter = Iter.drop(iter, checkpoint.processed_count)
      {:ok, remaining_iter}
    else
      # Start fresh processing
      {:ok, iter}
    end
  end
end
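
The checkpoint helpers used above (save_chunk_checkpoint/2, load_checkpoint/0, and log_failed_chunk/1) are your own code, not Reactor APIs. A minimal file-based sketch, assuming Jason for JSON encoding:

# Illustrative only: file-based checkpointing; the path and JSON library are assumptions.
require Logger

@checkpoint_path "/tmp/processing_checkpoint.json"

defp save_chunk_checkpoint(_original_chunk, _processed) do
  # One map element (a chunk) completed; Iter.drop/2 above skips that many on resume.
  checkpoint = %{processed_count: load_checkpoint().processed_count + 1}
  File.write!(@checkpoint_path, Jason.encode!(checkpoint))
end

defp load_checkpoint do
  case File.read(@checkpoint_path) do
    {:ok, json} -> Jason.decode!(json, keys: :atoms)
    {:error, _} -> %{processed_count: 0}
  end
end

defp log_failed_chunk(chunk) do
  Logger.warning("Chunk failed, will retry: #{inspect(chunk, limit: 5)}")
end

Because the map runs with allow_async? true, chunks can finish out of order; a production checkpoint should record which chunks completed rather than a simple count.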

Parallel Processing with Error Handling

map :process_files do
  source input(:file_list)
  allow_async? true

  step :process_file do
    argument :file_path, element(:process_files)
    max_retries 3
    
    run fn %{file_path: path} ->
      case process_single_file(path) do
        {:ok, result} -> {:ok, result}
        {:error, :temp_failure} -> :retry
        {:error, reason} -> {:error, reason}
      end
    end
    
    compensate fn reason, %{file_path: _path}, _context, _options ->
      case reason do
        %File.Error{} -> :retry
        _other -> :ok
      end
    end
  end
end
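
process_single_file/1 is your own code; the pattern that matters is classifying failures so transient problems come back as {:error, :temp_failure} (retried above) while everything else is treated as permanent. A sketch under that assumption (parse_contents/1 is a hypothetical parser):

# Illustrative: classify failures so the step above can decide whether to retry.
defp process_single_file(path) do
  with {:ok, contents} <- File.read(path),
       {:ok, parsed} <- parse_contents(contents) do
    {:ok, parsed}
  else
    # Transient filesystem conditions are worth retrying
    {:error, reason} when reason in [:eagain, :ebusy] -> {:error, :temp_failure}
    # Anything else is treated as a permanent failure for this file
    {:error, reason} -> {:error, reason}
  end
end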

Data Aggregation

map :calculate_totals do
  source input(:sales_data)
  
  step :sum_by_region do
    argument :region_data, element(:calculate_totals)

    # Assumes each element is one region's data: %{region: name, sales: [%{amount: ...}, ...]}
    run fn %{region_data: %{region: region, sales: sales}} ->
      total = Enum.sum(Enum.map(sales, & &1.amount))
      {:ok, %{region: region, total: total}}
    end
  end
end

collect :overall_totals do
  argument :region_totals, result(:calculate_totals)
  
  transform fn %{region_totals: totals} ->
    grand_total = Enum.sum(Enum.map(totals, & &1.total))
    %{grand_total: grand_total, by_region: totals}
  end
end

Monitoring and Observability

Progress Tracking

debug :log_progress do
  argument :batch_results, result(:transform_users)
  argument :message, value("Batch processing completed")
end

# Or use a regular step for custom progress tracking
step :track_progress do
  argument :batch_results, result(:transform_users)
  
  run fn %{batch_results: results} ->
    completed = length(results)
    successful = Enum.count(results, &match?({:ok, _}, &1))
    
    IO.puts("Processed #{completed} records, #{successful} successful")
    {:ok, %{completed: completed, successful: successful}}
  end
end

Telemetry Integration

Reactor provides observability through the standard :telemetry package. Add the telemetry middleware to emit events you can use to monitor your data pipelines:

defmodule DataPipeline.UserETL do
  use Reactor
  
  middlewares do
    middleware Reactor.Middleware.Telemetry
  end
  
  # Steps...
end

# In your application
:telemetry.attach("data-pipeline-handler", 
  [:reactor, :step, :stop], 
  &DataPipeline.Telemetry.handle_event/4, 
  %{}
)
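
DataPipeline.Telemetry is your own handler module; a minimal sketch that simply logs whatever Reactor emits (inspect the measurements and metadata once to see the exact keys available in your Reactor version):

defmodule DataPipeline.Telemetry do
  require Logger

  # Handles the [:reactor, :step, :stop] events attached above.
  def handle_event([:reactor, :step, :stop], measurements, metadata, _config) do
    Logger.debug(
      "reactor step stopped: measurements=#{inspect(measurements)} metadata=#{inspect(metadata)}"
    )
  end
end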

Troubleshooting

Common Issues

Data inconsistency:

  • Implement proper compensation for database operations
  • Use database transactions where appropriate (see the sketch below)
  • Add data validation at multiple stages
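
A minimal sketch of the transaction point, reusing the LoadToDatabase batching from earlier (insert_batch/1 is an illustrative helper, not an existing function):

# Sketch: make each batch all-or-nothing with an Ecto transaction.
defp insert_batch(batch) do
  MyApp.Repo.transaction(fn ->
    {count, _} =
      MyApp.Repo.insert_all("users", batch,
        on_conflict: :replace_all,
        conflict_target: [:email]
      )

    count
  end)
end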

For memory and performance issues, see Performance Optimization.

Debugging Tips

Add debug steps to monitor data flow. Because it uses element/1, place this one inside the map :transform_users block:

debug :inspect_batch do
  argument :batch, element(:transform_users)
  argument :label, value("Processing batch")
end