Building Async Workflows
View SourceIn this tutorial, you'll learn how to build efficient concurrent workflows that take advantage of Reactor's dependency resolution and async execution capabilities.
What you'll build
A data processing pipeline that:
- Fetches user data from multiple sources concurrently
- Processes different data types in parallel
- Aggregates results efficiently
- Handles mixed sync/async requirements
- Optimizes performance with proper concurrency control
You'll learn
- How Reactor's concurrency model works
- When to use async vs sync execution
- How to optimize workflow performance
- Managing dependencies for maximum parallelisation
- Controlling concurrency limits and resource usage
Prerequisites
- Complete the Getting Started tutorial
- Complete the Error Handling tutorial
- Basic understanding of Elixir processes
Step 1: Set up the project
If you don't have a project from the previous tutorials:
mix igniter.new reactor_tutorial --install reactor
cd reactor_tutorial
Step 2: Understanding Reactor's concurrency model
Reactor runs steps asynchronously by default:
- Independent steps run in parallel - Steps with no dependencies execute simultaneously
- Dependencies control execution order - Steps wait for their dependencies to complete
- Automatic task management - Reactor manages Elixir tasks and supervision
- Configurable concurrency - Control how many steps run at once
Async vs Sync execution
# Async (default) - runs in a separate task
step :fetch_data do
async? true # This is the default
run &fetch_from_api/1
end
# Sync - runs in the main process
step :critical_operation do
async? false # Forces synchronous execution
run &update_database/1
end
Step 3: Create simple data operations
Let's create some data operations that will run concurrently. Create lib/data_sources.ex
:
defmodule DataSources do
def fetch_user_profile(user_id) do
Process.sleep(200)
{:ok, %{
id: user_id,
name: "User #{user_id}",
email: "user#{user_id}@example.com"
}}
end
def fetch_user_preferences(user_id) do
Process.sleep(150)
{:ok, %{
user_id: user_id,
theme: "light",
language: "en"
}}
end
def fetch_user_activity(user_id) do
Process.sleep(300)
{:ok, %{
user_id: user_id,
last_login: DateTime.utc_now(),
login_count: 42
}}
end
end
Step 4: Build a concurrent data reactor
Now let's build a reactor that fetches data concurrently. Create lib/async_user_data_reactor.ex
:
defmodule AsyncUserDataReactor do
use Reactor
input :user_id
# These steps have no dependencies on each other, so they run in parallel
step :fetch_profile do
argument :user_id, input(:user_id)
run fn %{user_id: user_id}, _context ->
DataSources.fetch_user_profile(user_id)
end
end
step :fetch_preferences do
argument :user_id, input(:user_id)
run fn %{user_id: user_id}, _context ->
DataSources.fetch_user_preferences(user_id)
end
end
step :fetch_activity do
argument :user_id, input(:user_id)
run fn %{user_id: user_id}, _context ->
DataSources.fetch_user_activity(user_id)
end
end
# This step waits for all the fetch steps to complete
step :aggregate_data do
argument :profile, result(:fetch_profile)
argument :preferences, result(:fetch_preferences)
argument :activity, result(:fetch_activity)
run fn args, _context ->
user_data = %{
profile: args.profile,
preferences: args.preferences,
activity: args.activity,
summary: "User #{args.profile.id} has #{args.activity.login_count} logins"
}
{:ok, user_data}
end
end
return :aggregate_data
end
Step 5: Test the concurrent execution
Let's test our reactor to see the difference between concurrent and sequential execution:
iex -S mix
# Test the concurrent execution
start_time = :erlang.monotonic_time(:millisecond)
{:ok, result} = Reactor.run(AsyncUserDataReactor, %{user_id: 123})
end_time = :erlang.monotonic_time(:millisecond)
duration = end_time - start_time
IO.puts("Completed in #{duration}ms")
IO.inspect(result.summary)
Step 6: Understanding the execution flow
The reactor completes in about 300ms instead of 650ms (200+150+300) because:
- All three fetch steps run concurrently (they only depend on input)
:aggregate_data
waits for all three to complete- Total time is limited by the slowest operation (300ms) not the sum
Compare with synchronous execution:
# Force all steps to run synchronously
{:ok, result} = Reactor.run(AsyncUserDataReactor, %{user_id: 123}, %{}, async?: false)
This takes the full 650ms because each step runs sequentially.
Step 7: Controlling async behavior
You can control which steps run synchronously when needed:
defmodule SyncVsAsyncReactor do
use Reactor
input :user_id
# I/O operations - keep async (default)
step :fetch_profile do
argument :user_id, input(:user_id)
# async? true # This is the default
run fn %{user_id: user_id}, _context ->
DataSources.fetch_user_profile(user_id)
end
end
# CPU-intensive work - tune reactor concurrency as needed
step :process_data do
argument :profile, result(:fetch_profile)
# async? true is the default - adjust reactor max_concurrency instead
run fn %{profile: profile}, _context ->
Process.sleep(100)
{:ok, Map.put(profile, :processed, true)}
end
end
return :process_data
end
What you learned
You now understand Reactor's concurrency model:
- Steps run async by default - Enables automatic parallelisation
- Dependencies determine execution order - Independent steps run concurrently
- Tune concurrency for your workload - Adjust limits based on system capacity
- Performance optimisation - Balance concurrency with system resources
- Concurrency control - Manage resource usage with limits
Performance guidelines:
- I/O operations → Generally benefit from high concurrency
- CPU-intensive work → Tune concurrency to match CPU cores and workload
- Resource limits → Set concurrency limits based on system capacity
What's next
Now that you understand concurrency, you're ready for advanced workflow patterns:
- Composition - Build complex workflows with sub-reactors
- Recursive Execution - Advanced iterative patterns
- Testing Strategies - Test concurrent workflows effectively
Common issues
Steps aren't running in parallel: Check for hidden dependencies in arguments - each argument creates a dependency
For comprehensive performance and concurrency troubleshooting, see Performance Optimization.
Happy building concurrent workflows! ⚡