Building Async Workflows


In this tutorial, you'll learn how to build efficient concurrent workflows that take advantage of Reactor's dependency resolution and async execution capabilities.

What you'll build

A data processing pipeline that:

  1. Fetches user data from multiple sources concurrently
  2. Processes different data types in parallel
  3. Aggregates results efficiently
  4. Handles mixed sync/async requirements
  5. Optimizes performance with proper concurrency control

You'll learn

  • How Reactor's concurrency model works
  • When to use async vs sync execution
  • How to optimize workflow performance
  • Managing dependencies for maximum parallelisation
  • Controlling concurrency limits and resource usage

Prerequisites

This tutorial assumes you've completed the previous Reactor tutorials, or at least have an Elixir project with Reactor installed.

Step 1: Set up the project

If you don't have a project from the previous tutorials:

mix igniter.new reactor_tutorial --install reactor
cd reactor_tutorial

Step 2: Understanding Reactor's concurrency model

Reactor runs steps asynchronously by default:

  • Independent steps run in parallel - Steps with no dependencies execute simultaneously
  • Dependencies control execution order - Steps wait for their dependencies to complete
  • Automatic task management - Reactor manages Elixir tasks and supervision
  • Configurable concurrency - Control how many steps run at once

Async vs Sync execution

# Async (default) - runs in a separate task
step :fetch_data do
  async? true  # This is the default
  run &fetch_from_api/1
end

# Sync - runs in the main process
step :critical_operation do
  async? false  # Forces synchronous execution
  run &update_database/1
end
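
Besides the per-step async? option, you can also cap how many asynchronous steps run at once when you start a reactor. The snippet below is a minimal sketch (MyReactor is a placeholder module, not one built in this tutorial); it uses the max_concurrency option to Reactor.run/4, which limits the number of steps executing simultaneously:

# MyReactor is a placeholder - substitute any reactor module.
# max_concurrency caps how many async steps run at the same time;
# the default limit is typically derived from the number of online schedulers.
{:ok, result} =
  Reactor.run(MyReactor, %{user_id: 123}, %{}, max_concurrency: 4)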

Step 3: Create simple data operations

Let's create some data operations that will run concurrently. Create lib/data_sources.ex:

defmodule DataSources do
  def fetch_user_profile(user_id) do
    # Process.sleep simulates the latency of a remote call for each data source
    Process.sleep(200)
    
    {:ok, %{
      id: user_id,
      name: "User #{user_id}",
      email: "user#{user_id}@example.com"
    }}
  end

  def fetch_user_preferences(user_id) do
    Process.sleep(150)
    
    {:ok, %{
      user_id: user_id,
      theme: "light",
      language: "en"
    }}
  end

  def fetch_user_activity(user_id) do
    Process.sleep(300)
    
    {:ok, %{
      user_id: user_id,
      last_login: DateTime.utc_now(),
      login_count: 42
    }}
  end
end
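
Before wiring these into a reactor, you can sanity-check them in iex; each call sleeps briefly to simulate latency and returns an {:ok, map} tuple:

iex -S mix
{:ok, profile} = DataSources.fetch_user_profile(123)
profile.name
# => "User 123"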

Step 4: Build a concurrent data reactor

Now let's build a reactor that fetches data concurrently. Create lib/async_user_data_reactor.ex:

defmodule AsyncUserDataReactor do
  use Reactor

  input :user_id

  # These steps have no dependencies on each other, so they run in parallel
  step :fetch_profile do
    argument :user_id, input(:user_id)
    run fn %{user_id: user_id}, _context ->
      DataSources.fetch_user_profile(user_id)
    end
  end

  step :fetch_preferences do
    argument :user_id, input(:user_id)  
    run fn %{user_id: user_id}, _context ->
      DataSources.fetch_user_preferences(user_id)
    end
  end

  step :fetch_activity do
    argument :user_id, input(:user_id)
    run fn %{user_id: user_id}, _context ->
      DataSources.fetch_user_activity(user_id)
    end
  end

  # This step waits for all the fetch steps to complete
  step :aggregate_data do
    argument :profile, result(:fetch_profile)
    argument :preferences, result(:fetch_preferences)
    argument :activity, result(:fetch_activity)
    
    run fn args, _context ->
      user_data = %{
        profile: args.profile,
        preferences: args.preferences,
        activity: args.activity,
        summary: "User #{args.profile.id} has #{args.activity.login_count} logins"
      }
      {:ok, user_data}
    end
  end

  return :aggregate_data
end
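
The dependency graph for this reactor looks like this - the three fetch steps depend only on the input, so they can start immediately and in parallel, while :aggregate_data depends on all of their results:

input :user_id
  ├── :fetch_profile ──────┐
  ├── :fetch_preferences ──┼──► :aggregate_data
  └── :fetch_activity ─────┘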

Step 5: Test the concurrent execution

Let's test our reactor to see the difference between concurrent and sequential execution:

iex -S mix
# Test the concurrent execution
start_time = :erlang.monotonic_time(:millisecond)

{:ok, result} = Reactor.run(AsyncUserDataReactor, %{user_id: 123})

end_time = :erlang.monotonic_time(:millisecond)
duration = end_time - start_time

IO.puts("Completed in #{duration}ms")
IO.inspect(result.summary)

Step 6: Understanding the execution flow

The reactor completes in about 300ms instead of 650ms (200+150+300) because:

  1. All three fetch steps run concurrently (they only depend on input)
  2. :aggregate_data waits for all three to complete
  3. Total time is limited by the slowest operation (300ms), not the sum

Compare with synchronous execution:

# Force all steps to run synchronously
{:ok, result} = Reactor.run(AsyncUserDataReactor, %{user_id: 123}, %{}, async?: false)

This takes the full 650ms because each step runs sequentially.
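
If you want to measure the difference yourself, :timer.tc/1 can wrap both runs. This is a rough sketch and the exact numbers will vary from run to run:

{async_us, {:ok, _}} =
  :timer.tc(fn -> Reactor.run(AsyncUserDataReactor, %{user_id: 123}) end)

{sync_us, {:ok, _}} =
  :timer.tc(fn -> Reactor.run(AsyncUserDataReactor, %{user_id: 123}, %{}, async?: false) end)

IO.puts("async: #{div(async_us, 1000)}ms, sync: #{div(sync_us, 1000)}ms")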

Step 7: Controlling async behavior

You can control which steps run synchronously when needed:

defmodule SyncVsAsyncReactor do
  use Reactor

  input :user_id

  # I/O operations - keep async (default)
  step :fetch_profile do
    argument :user_id, input(:user_id)
    # async? true  # This is the default
    run fn %{user_id: user_id}, _context ->
      DataSources.fetch_user_profile(user_id)
    end
  end

  # CPU-intensive work - tune reactor concurrency as needed
  step :process_data do
    argument :profile, result(:fetch_profile)
    # async? true is the default - adjust reactor max_concurrency instead
    
    run fn %{profile: profile}, _context ->
      Process.sleep(100)
      {:ok, Map.put(profile, :processed, true)}
    end
  end

  return :process_data
end
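
When the bulk of your work is CPU-bound, as in :process_data above, a reasonable starting point is to cap concurrency near the number of schedulers rather than forcing steps to run synchronously. The snippet below is a heuristic sketch, not a fixed rule:

# Start with roughly one async step per scheduler for CPU-bound work,
# then adjust based on measurement.
limit = System.schedulers_online()

{:ok, result} =
  Reactor.run(SyncVsAsyncReactor, %{user_id: 123}, %{}, max_concurrency: limit)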

What you learned

You now understand Reactor's concurrency model:

  • Steps run async by default - Enables automatic parallelisation
  • Dependencies determine execution order - Independent steps run concurrently
  • Concurrency is configurable - Set limits (such as max_concurrency) based on system capacity
  • Performance is a balance - Enough concurrency to keep work flowing without exhausting resources

Performance guidelines:

  • I/O operations → Generally benefit from high concurrency
  • CPU-intensive work → Tune concurrency to match CPU cores and workload
  • Resource limits → Set concurrency limits based on system capacity

What's next

Now that you understand concurrency, you're ready to move on to the more advanced workflow patterns covered in the next tutorials.

Common issues

Steps aren't running in parallel: Check for hidden dependencies in arguments - each argument creates a dependency on the step that produces it, even if the value is never used.
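
For example, in the contrived sketch below the two steps look independent, but the unused argument forces :step_b to wait for :step_a; removing arguments you don't need restores parallelism:

step :step_a do
  run fn _args, _context -> {:ok, :a} end
end

# This argument is never used, but it still makes :step_b
# wait for :step_a to finish before it can start.
step :step_b do
  argument :unused, result(:step_a)
  run fn _args, _context -> {:ok, :b} end
end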

For comprehensive performance and concurrency troubleshooting, see Performance Optimization.

Happy building concurrent workflows! ⚡