How to Build Data Processing Pipelines
Problem
You need to process large datasets through multiple transformation steps with error handling, progress tracking, and efficient batch processing. Your data pipeline should handle millions of records while being resilient to failures.
Solution Overview
This guide shows you how to build robust ETL (Extract, Transform, Load) and batch processing workflows using Reactor's map steps, async processing, and collect patterns. You'll learn to process data efficiently at scale.
Prerequisites
- Understanding of Reactor basics (inputs, steps, arguments)
- Familiarity with Elixir's Enum and Stream modules
- Basic knowledge of data processing concepts
Processing Large Datasets
Reactor uses Iterex internally for efficient, resumable iteration over large datasets. This provides several advantages over standard Elixir streams:
Key Benefits:
- Resumable: Can pause and resume processing from any point
- Memory efficient: Processes data in configurable chunks without loading everything into memory
- Lazy evaluation: Only processes data as needed, not all at once
- Resource management: Proper cleanup of external resources (files, database connections)
When you need these patterns:
- Processing files larger than available memory (> 1GB)
- Long-running ETL jobs that may need to pause/resume
- Data pipelines requiring checkpoint/recovery capabilities
- Streaming data from external APIs with rate limits
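For example, a lazy iterator over a large CSV export can be built directly from a file stream. This is a minimal sketch; the file path and chunk size are illustrative:

# Nothing is read from disk until downstream steps ask for elements.
iter =
  "/data/large_export.csv"
  |> File.stream!()
  |> Stream.chunk_every(500)
  |> Iter.from()

# `iter` yields the file lazily in chunks of 500 lines, so memory use stays
# bounded regardless of file size, and consumption can be paused and resumed.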
Complete ETL Pipeline Example
Here's a complete data processing pipeline that extracts user data, transforms it, and loads it into multiple destinations:
defmodule DataPipeline.UserETL do
use Reactor
input :source_file
input :output_destinations
# Step 1: Extract - Read and parse data
step :extract_data, DataPipeline.Steps.ExtractCSV do
argument :file_path, input(:source_file)
end
# Step 2: Validate data quality
step :validate_data_quality, DataPipeline.Steps.DataQualityCheck do
argument :raw_data, result(:extract_data)
end
# Step 3: Transform users in batches
map :transform_users do
source result(:extract_data, [:users])
allow_async? true
return :validate_user
step :clean_user do
argument :user, element(:transform_users)
run fn %{user: user} ->
clean_and_normalize_user(%{user: user, rules: %{}})
end
end
step :enrich_user do
argument :clean_user, result(:clean_user)
run &enrich_with_external_data/1
end
step :validate_user do
argument :user, result(:enrich_user)
run &validate_business_rules/1
end
end
# Step 4: Collect transformation results
collect :process_results do
argument :transformed_users, result(:transform_users)
argument :source_stats, result(:extract_data, [:stats])
transform fn %{transformed_users: users, source_stats: stats} ->
successful_users = Enum.filter(users, &match?({:ok, _}, &1))
failed_users = Enum.filter(users, &match?({:error, _}, &1))
%{
successful: Enum.map(successful_users, &elem(&1, 1)),
failed: failed_users,
source_count: stats.total_count,
success_rate: length(successful_users) / length(users)
}
end
end
# Step 5: Load to multiple destinations in parallel
step :load_to_database, DataPipeline.Steps.LoadToDatabase do
argument :users, result(:process_results, [:successful])
async? true
end
step :load_to_search_index, DataPipeline.Steps.LoadToElasticsearch do
argument :users, result(:process_results, [:successful])
async? true
end
step :generate_report, DataPipeline.Steps.GenerateProcessingReport do
argument :results, result(:process_results)
wait_for [:load_to_database, :load_to_search_index]
end
return :generate_report
end
Step Implementations
1. Data Extraction
defmodule DataPipeline.Steps.ExtractCSV do
use Reactor.Step
@impl true
def run(%{file_path: path}, _context, _options) do
case File.exists?(path) do
true ->
users =
path
|> File.stream!()
|> CSV.decode!(headers: true)
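# Note: Enum.to_list/1 below materialises every row in memory;
# for files larger than RAM, see "Streaming Large Files" further down.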
|> Enum.to_list()
stats = %{
total_count: length(users),
file_size: File.stat!(path).size,
extracted_at: DateTime.utc_now()
}
{:ok, %{users: users, stats: stats}}
false ->
{:error, "Source file not found: #{path}"}
end
end
end
2. Data Quality Validation
defmodule DataPipeline.Steps.DataQualityCheck do
use Reactor.Step
@impl true
def run(%{raw_data: %{users: users}}, _context, _options) do
# Analyze data quality
quality_issues = analyze_quality(users)
rules = %{
email_required: true,
phone_format: ~r/^\+?[\d\s\-\(\)]+$/,
name_min_length: 2,
max_age: 120
}
case quality_issues do
[] ->
{:ok, %{rules: rules, issues: [], status: :passed}}
issues when length(issues) < 100 ->
{:ok, %{rules: rules, issues: issues, status: :warnings}}
issues ->
{:error, "Too many quality issues: #{length(issues)} problems found"}
end
end
defp analyze_quality(users) do
users
|> Enum.with_index()
|> Enum.flat_map(fn {user, index} ->
check_user_quality(user, index)
end)
end
defp check_user_quality(user, index) do
issues = []
issues = if is_nil(user["email"]) or user["email"] == "" do
["Row #{index + 1}: Missing email" | issues]
else
issues
end
issues = if is_nil(user["name"]) or String.length(user["name"]) < 2 do
["Row #{index + 1}: Invalid name" | issues]
else
issues
end
issues
end
end
3. User Transformation Functions
These functions are defined in the DataPipeline.UserETL module itself, since the map steps reference them with local function captures:
def clean_and_normalize_user(%{user: user, rules: _rules}) do
cleaned = %{
id: user["id"],
email: String.downcase(String.trim(user["email"] || "")),
name: String.trim(user["name"] || ""),
phone: normalize_phone(user["phone"]),
age: parse_age(user["age"]),
created_at: DateTime.utc_now()
}
{:ok, cleaned}
rescue
e -> {:error, "Failed to clean user #{user["id"]}: #{inspect(e)}"}
end
def enrich_with_external_data(%{clean_user: user}) do
case ExternalAPI.get_user_profile(user.email) do
{:ok, profile} ->
enriched = Map.merge(user, %{
company: profile.company,
location: profile.location,
verified: profile.verified
})
{:ok, enriched}
{:error, :not_found} ->
# Continue without enrichment
{:ok, Map.put(user, :verified, false)}
{:error, reason} ->
{:error, "Enrichment failed for #{user.email}: #{reason}"}
end
end
def validate_business_rules(%{user: user}) do
errors = []
errors = if String.length(user.name) < 2 do
["Name too short" | errors]
else
errors
end
errors = if not String.contains?(user.email, "@") do
["Invalid email format" | errors]
else
errors
end
errors = if user.age && user.age > 120 do
["Unrealistic age" | errors]
else
errors
end
case errors do
[] -> {:ok, user}
errors -> {:error, "Validation failed: #{Enum.join(errors, ", ")}"}
end
end
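The normalize_phone/1 and parse_age/1 helpers called in clean_and_normalize_user/1 are not shown above. One possible sketch, defined in the same module; adjust the rules to your source formats:

# Hypothetical helpers; the exact normalisation rules depend on your data.
defp normalize_phone(nil), do: nil
defp normalize_phone(phone), do: String.replace(phone, ~r/[^\d+]/, "")

defp parse_age(nil), do: nil
defp parse_age(age) when is_integer(age), do: age

defp parse_age(age) when is_binary(age) do
  case Integer.parse(age) do
    {parsed, _rest} -> parsed
    :error -> nil
  end
end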
4. Data Loading Steps
defmodule DataPipeline.Steps.LoadToDatabase do
use Reactor.Step
import Ecto.Query, only: [from: 2]
@impl true
def run(%{users: users}, _context, _options) do
batches = Enum.chunk_every(users, 1000)
results = Enum.map(batches, fn batch ->
case MyApp.Repo.insert_all("users", batch,
on_conflict: :replace_all,
conflict_target: [:email]) do
{count, _} -> {:ok, count}
error -> {:error, error}
end
end)
total_inserted = results
|> Enum.filter(&match?({:ok, _}, &1))
|> Enum.map(&elem(&1, 1))
|> Enum.sum()
{:ok, %{inserted: total_inserted, total_batches: length(batches)}}
end
@impl true
def compensate(_reason, %{users: users}, _context, _options) do
# Cleanup on failure - remove any partially inserted data
user_emails = Enum.map(users, & &1.email)
MyApp.Repo.delete_all(from u in "users", where: u.email in ^user_emails)
:ok
end
end
defmodule DataPipeline.Steps.LoadToElasticsearch do
use Reactor.Step
@impl true
def run(%{users: users}, _context, _options) do
bulk_requests = Enum.map(users, fn user ->
%{
index: %{
_index: "users",
_id: user.id,
_source: user
}
}
end)
case Elasticsearch.bulk_request(bulk_requests) do
{:ok, response} ->
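# "items" contains one entry per document, including failed operations;
# inspect each entry's status if you need per-document error handling.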
indexed = response["items"] |> length()
{:ok, %{indexed: indexed}}
{:error, reason} ->
{:error, "Elasticsearch indexing failed: #{reason}"}
end
end
end
Running the Pipeline
# Process a large CSV file
{:ok, report} = Reactor.run(DataPipeline.UserETL, %{
source_file: "/data/users_export.csv",
output_destinations: [:database, :elasticsearch]
})
IO.puts("Processing completed!")
IO.puts("Success rate: #{report.success_rate * 100}%")
IO.puts("Total processed: #{report.source_count}")
Advanced Patterns
Streaming Large Files
For very large files, process data in chunks to avoid memory issues and enable efficient error handling:
step :extract_streaming_data do
run fn %{file_path: path} ->
iter =
path
|> File.stream!()
|> CSV.decode!(headers: true)
|> Stream.chunk_every(500)  # emit chunks so each map element is a list of rows
|> Iter.from()
{:ok, iter}
end
end
map :process_streaming_data do
source result(:extract_streaming_data)
allow_async? true
return :process_chunk
compose :process_chunk, ChunkProcessor do
argument :chunk_data, element(:process_streaming_data)
end
end
defmodule ChunkProcessor do
use Reactor
input :chunk_data
step :validate_chunk do
argument :data, input(:chunk_data)
run &validate_chunk_data/1
end
step :transform_records do
argument :data, result(:validate_chunk)
run fn %{data: records} ->
processed = Enum.map(records, &transform_record/1)
{:ok, processed}
end
end
step :save_checkpoint do
argument :original, input(:chunk_data)
argument :processed, result(:transform_records)
run fn %{original: orig, processed: proc} ->
save_chunk_checkpoint(orig, proc)
{:ok, proc}
end
compensate fn _reason, %{original: chunk_data}, _context, _options ->
log_failed_chunk(chunk_data)
:retry
end
end
return :save_checkpoint
end
Resumable Data Processing
Pause and resume processing across reactor executions:
step :prepare_processing_iter do
argument :source_iter, result(:extract_streaming_data)
run fn %{source_iter: iter} ->
if File.exists?("/tmp/processing_checkpoint.json") do
# Resume from checkpoint
checkpoint = load_checkpoint()
remaining_iter = Iter.drop(iter, checkpoint.processed_count)
{:ok, remaining_iter}
else
# Start fresh processing
{:ok, iter}
end
end
end
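The load_checkpoint/0 helper above (and whatever writes the checkpoint file) is not part of the example. A minimal sketch backed by a JSON file, assuming the Jason library is available:

# Hypothetical checkpoint helpers; swap in your own storage and encoder.
defp load_checkpoint do
  "/tmp/processing_checkpoint.json"
  |> File.read!()
  |> Jason.decode!(keys: :atoms)
end

defp save_checkpoint(processed_count) do
  File.write!(
    "/tmp/processing_checkpoint.json",
    Jason.encode!(%{processed_count: processed_count})
  )
end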
Parallel Processing with Error Handling
map :process_files do
source input(:file_list)
allow_async? true
step :process_file do
argument :file_path, element(:process_files)
max_retries 3
run fn %{file_path: path} ->
case process_single_file(path) do
{:ok, result} -> {:ok, result}
{:error, :temp_failure} -> :retry
{:error, reason} -> {:error, reason}
end
end
compensate fn reason, %{file_path: _path}, _, _ ->
case reason do
%File.Error{} -> :retry
_other -> :ok
end
end
end
end
Data Aggregation
map :calculate_totals do
source input(:sales_data)
return :sum_by_region
step :sum_by_region do
# each element is one region's map: %{region: ..., sales: [...]}
argument :region_data, element(:calculate_totals)
run fn %{region_data: data} ->
total = Enum.sum(Enum.map(data.sales, & &1.amount))
{:ok, %{region: data.region, total: total}}
end
end
end
collect :overall_totals do
argument :region_totals, result(:calculate_totals)
transform fn %{region_totals: totals} ->
grand_total = Enum.sum(Enum.map(totals, & &1.total))
%{grand_total: grand_total, by_region: totals}
end
end
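The aggregation above assumes each element of :sales_data describes one region with its individual sales, for example:

sales_data = [
  %{region: "EMEA", sales: [%{amount: 100}, %{amount: 250}]},
  %{region: "APAC", sales: [%{amount: 75}]}
]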
Monitoring and Observability
Progress Tracking
debug :log_progress do
argument :batch_results, result(:transform_users)
argument :message, value("Batch processing completed")
end
# Or use a regular step for custom progress tracking
step :track_progress do
argument :batch_results, result(:transform_users)
run fn %{batch_results: results} ->
completed = length(results)
successful = Enum.count(results, &match?({:ok, _}, &1))
IO.puts("Processed #{completed} records, #{successful} successful")
{:ok, %{completed: completed, successful: successful}}
end
end
Telemetry Integration
Reactor provides observability through the standard :telemetry package. Add the telemetry middleware to emit events for monitoring your data pipelines:
defmodule DataPipeline.UserETL do
use Reactor
middlewares do
middleware Reactor.Middleware.Telemetry
end
# Steps...
end
# In your application
:telemetry.attach("data-pipeline-handler",
[:reactor, :step, :stop],
&DataPipeline.Telemetry.handle_event/4,
%{}
)
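A corresponding handler might look like the sketch below; the exact measurement and metadata keys depend on the events emitted by Reactor's telemetry middleware, so treat the field access as an assumption:

defmodule DataPipeline.Telemetry do
  require Logger

  def handle_event([:reactor, :step, :stop], measurements, metadata, _config) do
    # Assumes a standard telemetry span where :duration is in native time units.
    duration_ms =
      System.convert_time_unit(measurements[:duration] || 0, :native, :millisecond)

    Logger.info(
      "Reactor step completed in #{duration_ms}ms " <>
        "(metadata keys: #{inspect(Map.keys(metadata))})"
    )
  end
end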
Troubleshooting
Common Issues
Data inconsistency:
- Implement proper compensation for database operations
- Use database transactions where appropriate (see the sketch after this list)
- Add data validation at multiple stages
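For the transaction point above, one option is to wrap the batch load in Repo.transaction/1 so a failed batch rolls back the whole load. This is a sketch reusing MyApp.Repo from the loading step; it trades throughput for atomicity:

MyApp.Repo.transaction(fn ->
  users
  |> Enum.chunk_every(1000)
  |> Enum.each(fn batch ->
    MyApp.Repo.insert_all("users", batch,
      on_conflict: :replace_all,
      conflict_target: [:email]
    )
  end)
end)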
For memory and performance issues, see Performance Optimization.
Debugging Tips
Add debug steps inside the map block to inspect each element as it flows through:
debug :inspect_batch do
argument :batch, element(:transform_users)
argument :label, value("Processing batch")
end
Related Guides
- Performance Optimization - Scaling techniques
- Testing Strategies - Testing data pipelines
- Async Workflows Tutorial - Concurrency basics