Reactor Cheatsheet

View Source

Reactor is a dynamic, concurrent, dependency-resolving saga orchestrator for Elixir.

Basic Reactor Definition

Simple Reactor

defmodule MyReactor do
  @moduledoc """
  Example reactor that hashes a password and then creates a user.

  Takes `:email` and `:password` inputs and returns the result of the
  `:create_user` step.
  """
  use Reactor

  input :email
  input :password

  step :hash_password do
    argument :password, input(:password)

    # An anonymous `run` function receives the step arguments as a map (not the
    # bare value) and must return a tagged tuple, so unwrap and wrap explicitly.
    run fn %{password: password}, _context ->
      {:ok, Bcrypt.hash_pwd_salt(password)}
    end
  end

  step :create_user, MyApp.CreateUser do
    argument :email, input(:email)
    argument :password_hash, result(:hash_password)
  end

  return :create_user
end

Running Reactors

# Basic execution
{:ok, result} = Reactor.run(MyReactor, 
  email: "user@example.com", 
  password: "secret"
)

# With context and options
{:ok, result} = Reactor.run(MyReactor, 
  inputs, 
  %{current_user: user},
  async?: false,
  max_concurrency: 10
)

# Halting and resuming
{:halted, state} = Reactor.run(MyReactor, inputs)
{:ok, result} = Reactor.run(state, %{}, %{})

Step Types

Basic Steps

# Anonymous function
step :transform do
  argument :data, input(:raw_data)
  run fn %{data: data}, _context ->
    {:ok, String.upcase(data)}
  end
end

# Module implementation
step :create_user, MyApp.Steps.CreateUser do
  argument :email, result(:validate_email)
  argument :data, input(:user_data)
end

# Sync/async control
step :critical_operation do
  async? false
  run &important_work/1
end

Debug Steps

debug :log_user do
  argument :user, result(:create_user)
  argument :message, value("User created")
end

Map Steps

map :process_users do
  source input(:user_list)
  batch_size 10
  allow_async? true

  step :validate_user do
    argument :user, element(:process_users)
    run &validate_user/1
  end
end

Compose Steps

compose :sub_workflow, MyApp.SubReactor do
  argument :input_data, result(:prepare_data)
end

Advanced Step Types

Switch Steps

switch :handle_user_type do
  on result(:user)

  matches? &(&1.type == :premium) do
    step :setup_premium do
      argument :user, result(:user)
      run &setup_premium_features/1
    end
  end

  default do
    step :setup_basic do
      argument :user, result(:user)
      run &setup_basic_features/1
    end
  end
end

Group Steps

group :user_setup do
  before_all &MyApp.setup_database/3
  after_all &MyApp.cleanup_database/1

  step :create_profile do
    # Profile creation logic
  end

  step :send_welcome_email do
    # Email logic
  end
end

Around Steps

around :transaction, &MyApp.with_transaction/4 do
  step :create_user do
    # User creation in transaction
  end

  step :create_profile do
    # Profile creation in transaction
  end
end

Collect Steps

collect :user_summary do
  argument :user, result(:create_user)
  argument :profile, result(:create_profile)
  argument :settings, result(:create_settings)

  transform fn inputs ->
    %{
      user: inputs.user,
      profile: inputs.profile,
      settings: inputs.settings
    }
  end
end

Arguments and Dependencies

Argument Sources

# From input
argument :email, input(:email)

# From step result
argument :user, result(:create_user)

# Static value
argument :timeout, value(5000)

# Nested value extraction
argument :user_id, result(:create_user, [:id])

# Input with path
argument :year, input(:date, [:year])

Argument Transformations

# Inline transformation
argument :user_id, result(:create_user) do
  transform &(&1.id)
end

# Block form with source
argument :age do
  source input(:birth_year)
  transform fn year -> 
    Date.utc_today().year - year 
  end
end

Dependencies

# Wait for step without using data
wait_for :verify_email

# Wait for multiple steps
wait_for [:setup_user, :setup_profile]

Conditional Execution

Guards

step :read_file_via_cache do
  argument :path, input(:path)
  run &File.read(&1.path)
  
  guard fn %{path: path}, %{cache: cache} ->
    case Cache.get(cache, path) do
      {:ok, content} -> {:halt, {:ok, content}}
      _ -> :cont
    end
  end
end

Where Clauses

step :conditional_step do
  argument :user, result(:create_user)
  
  where fn %{user: user} ->
    user.active? and user.plan == :premium
  end
  
  run &process_premium_user/1
end

# Simple predicate
step :read_file do
  argument :path, input(:path)
  run &File.read(&1.path)
  where &File.exists?(&1.path)
end

Step Implementation

Step Module

defmodule MyApp.Steps.CreateUser do
  @moduledoc """
  Reactor step that creates a user, with compensation and undo support.

  `create_user/1` and `delete_user/1` are application helpers that are not
  shown in this example.
  """
  use Reactor.Step

  # Runs the step: `{:ok, user}` on success, `{:error, reason}` on failure.
  @impl true
  def run(arguments, _context, _options) do
    case create_user(arguments) do
      {:ok, user} -> {:ok, user}
      {:error, reason} -> {:error, reason}
    end
  end

  # Handles a failure of this step: retry when the database connection
  # failed, otherwise accept the failure.
  @impl true
  def compensate(reason, _arguments, _context, _options) do
    case reason do
      %DBConnection.ConnectionError{} -> :retry
      _other -> :ok
    end
  end

  # Undoes the step by deleting the previously created user.
  @impl true
  def undo(user, _arguments, _context, _options) do
    case delete_user(user) do
      :ok -> :ok
      {:error, reason} -> {:error, reason}
    end
  end
end

Return Values

# Success
{:ok, value}

# Success with additional steps
{:ok, value, [additional_step]}

# Failure (triggers compensation)
{:error, reason}

# Retry step
:retry
{:retry, reason}

# Pause reactor
{:halt, reason}

Error Handling

Compensation

# Decides how to react to a failed step: timeouts and refused connections
# are retried, every other failure is accepted as-is.
def compensate(reason, _arguments, _context, _options) do
  case reason do
    # Retry on network errors
    %HTTPoison.Error{reason: network_reason}
    when network_reason in [:timeout, :econnrefused] ->
      :retry

    # Continue with error for other failures
    _other ->
      :ok
  end
end

Undo Operations

# Undoes the step by deleting the resource it produced. A resource that is
# already gone is treated as successfully undone.
def undo(resource, _arguments, _context, _options) do
  case delete_resource(resource) do
    :ok -> :ok
    # Already deleted
    {:error, :not_found} -> :ok
    {:error, reason} -> {:error, reason}
  end
end

Inputs and Transformations

Input Definition

# Basic input
input :name

# Input with transformation
input :age do
  transform &String.to_integer/1
end

# Input with description
input :email, description: "User's email address"

Template Steps

template :welcome_message do
  argument :user, result(:create_user)
  template """
  Welcome <%= @user.name %>! 🎉
  Your account is now active.
  """
end

Middleware

Adding Middleware

defmodule MyReactor do
  use Reactor

  # Register the middleware modules used by this reactor.
  middlewares do
    middleware MyApp.LoggingMiddleware
    middleware Reactor.Middleware.Telemetry
  end

  # Steps...
end

Custom Middleware

defmodule MyApp.LoggingMiddleware do
  @moduledoc """
  Reactor middleware that logs when a reactor starts, completes or fails,
  and when a step starts running.
  """
  use Reactor.Middleware

  # Logger's logging functions are macros, so the module must be required
  # before they can be called.
  require Logger

  @impl true
  def init(context) do
    Logger.info("Reactor starting")
    {:ok, context}
  end

  @impl true
  def complete(result, _context) do
    Logger.info("Reactor completed")
    {:ok, result}
  end

  @impl true
  def error(errors, _context) do
    Logger.error("Reactor failed: #{inspect(errors)}")
    :ok
  end

  @impl true
  def event({:run_start, _args}, step, _context) do
    # Step names are not guaranteed to be strings or atoms, so inspect them
    # rather than interpolating directly.
    Logger.debug("Step #{inspect(step.name)} starting")
    :ok
  end

  # Ignore every other event instead of raising a FunctionClauseError.
  def event(_event, _step, _context), do: :ok
end

Common Patterns

Data Pipeline

input :raw_data

step :validate do
  argument :data, input(:raw_data)
  run &validate_data/1
end

step :transform do
  argument :data, result(:validate)
  run &transform_data/1
end

step :store do
  argument :data, result(:transform)
  run &store_data/1
end

return :store

Parallel Processing

step :fetch_user do
  argument :id, input(:user_id)
  run &fetch_user/1
end

step :fetch_settings do
  argument :id, input(:user_id)
  run &fetch_settings/1
end

collect :user_with_settings do
  argument :user, result(:fetch_user)
  argument :settings, result(:fetch_settings)
end

Error Recovery

step :risky_operation do
  run &might_fail/1
  max_retries 3

  # An anonymous `compensate` function in the DSL takes at most three
  # arguments: the failure reason, the step arguments and the context.
  compensate fn reason, _arguments, _context ->
    case reason do
      %NetworkError{} -> :retry
      %TimeoutError{} -> :retry
      _other -> :ok
    end
  end
end

Quick Reference

Functions

  • input/1 - Define reactor input
  • step/2 - Define step
  • argument/2 - Define step dependency
  • result/1 - Reference step result
  • value/1 - Static value
  • wait_for/1 - Dependency without data
  • return/1 - Set reactor return

Built-in Steps

  • step - Basic step
  • debug - Log information
  • map - Process collections
  • compose - Embed reactors
  • switch - Conditional logic
  • group - Shared setup/teardown
  • around - Wrap execution
  • collect - Gather values
  • template - EEx templates
  • flunk - Force failure

Control Flow

  • guard - Conditional execution
  • where - Simple conditional
  • async? - Control sync/async
  • max_retries - Retry limits
  • batch_size - Map processing