Reactor Cheatsheet
View SourceReactor is a dynamic, concurrent, dependency resolving saga orchestrator for Elixir.
Basic Reactor Definition
Simple Reactor
defmodule MyReactor do
use Reactor
input :email
input :password
step :hash_password do
argument :password, input(:password)
run &Bcrypt.hash_pwd_salt/1
end
step :create_user, MyApp.CreateUser do
argument :email, input(:email)
argument :password_hash, result(:hash_password)
end
return :create_user
end
Running Reactors
# Basic execution
{:ok, result} = Reactor.run(MyReactor,
email: "user@example.com",
password: "secret"
)
# With context and options
{:ok, result} = Reactor.run(MyReactor,
inputs,
%{current_user: user},
async?: false,
max_concurrency: 10
)
# Halting and resuming
{:halted, state} = Reactor.run(MyReactor, inputs)
{:ok, result} = Reactor.run(state, %{}, %{})
Step Types
Basic Steps
# Anonymous function
step :transform do
argument :data, input(:raw_data)
run fn %{data: data}, _context ->
{:ok, String.upcase(data)}
end
end
# Module implementation
step :create_user, MyApp.Steps.CreateUser do
argument :email, result(:validate_email)
argument :data, input(:user_data)
end
# Sync/async control
step :critical_operation do
async? false
run &important_work/1
end
Debug Steps
debug :log_user do
argument :user, result(:create_user)
argument :message, value("User created")
end
Map Steps
map :process_users do
source input(:user_list)
batch_size 10
allow_async? true
step :validate_user do
argument :user, element(:process_users)
run &validate_user/1
end
end
Compose Steps
compose :sub_workflow, MyApp.SubReactor do
argument :input_data, result(:prepare_data)
end
Advanced Step Types
Switch Steps
switch :handle_user_type do
on result(:user)
matches? &(&1.type == :premium) do
step :setup_premium do
argument :user, result(:user)
run &setup_premium_features/1
end
end
default do
step :setup_basic do
argument :user, result(:user)
run &setup_basic_features/1
end
end
end
Group Steps
group :user_setup do
before_all &MyApp.setup_database/3
after_all &MyApp.cleanup_database/1
step :create_profile do
# Profile creation logic
end
step :send_welcome_email do
# Email logic
end
end
Around Steps
around :transaction, &MyApp.with_transaction/4 do
step :create_user do
# User creation in transaction
end
step :create_profile do
# Profile creation in transaction
end
end
Collect Steps
collect :user_summary do
argument :user, result(:create_user)
argument :profile, result(:create_profile)
argument :settings, result(:create_settings)
transform fn inputs ->
%{
user: inputs.user,
profile: inputs.profile,
settings: inputs.settings
}
end
end
Arguments and Dependencies
Argument Sources
# From input
argument :email, input(:email)
# From step result
argument :user, result(:create_user)
# Static value
argument :timeout, value(5000)
# Nested value extraction
argument :user_id, result(:create_user, [:id])
# Input with path
argument :year, input(:date, [:year])
Argument Transformations
# Inline transformation
argument :user_id, result(:create_user) do
transform &(&1.id)
end
# Block form with source
argument :age do
source input(:birth_year)
transform fn year ->
Date.utc_today().year - year
end
end
Dependencies
# Wait for step without using data
wait_for :verify_email
# Wait for multiple steps
wait_for [:setup_user, :setup_profile]
Conditional Execution
Guards
step :read_file_via_cache do
argument :path, input(:path)
run &File.read(&1.path)
guard fn %{path: path}, %{cache: cache} ->
case Cache.get(cache, path) do
{:ok, content} -> {:halt, {:ok, content}}
_ -> :cont
end
end
end
Where Clauses
step :conditional_step do
argument :user, result(:create_user)
where fn %{user: user} ->
user.active? and user.plan == :premium
end
run &process_premium_user/1
end
# Simple predicate
step :read_file do
argument :path, input(:path)
run &File.read(&1.path)
where &File.exists?(&1.path)
end
Step Implementation
Step Module
defmodule MyApp.Steps.CreateUser do
use Reactor.Step
@impl true
def run(arguments, context, options) do
case create_user(arguments) do
{:ok, user} -> {:ok, user}
{:error, reason} -> {:error, reason}
end
end
@impl true
def compensate(reason, arguments, context, options) do
case reason do
%DBConnection.ConnectionError{} -> :retry
_other -> :ok
end
end
@impl true
def undo(user, arguments, context, options) do
case delete_user(user) do
:ok -> :ok
{:error, reason} -> {:error, reason}
end
end
end
Return Values
# Success
{:ok, value}
# Success with additional steps
{:ok, value, [additional_step]}
# Failure (triggers compensation)
{:error, reason}
# Retry step
:retry
{:retry, reason}
# Pause reactor
{:halt, reason}
Error Handling
Compensation
def compensate(reason, arguments, context, options) do
case reason do
# Retry on network errors
%HTTPoison.Error{reason: :timeout} -> :retry
%HTTPoison.Error{reason: :econnrefused} -> :retry
# Continue with error for other failures
_other -> :ok
end
end
Undo Operations
def undo(resource, arguments, context, options) do
case delete_resource(resource) do
:ok -> :ok
{:error, :not_found} -> :ok # Already deleted
{:error, reason} -> {:error, reason}
end
end
Inputs and Transformations
Input Definition
# Basic input
input :name
# Input with transformation
input :age do
transform &String.to_integer/1
end
# Input with description
input :email, description: "User's email address"
Template Steps
template :welcome_message do
argument :user, result(:create_user)
template """
Welcome <%= @user.name %>! 🎉
Your account is now active.
"""
end
Middleware
Adding Middleware
defmodule MyReactor do
use Reactor
middlewares do
middleware MyApp.LoggingMiddleware
middleware Reactor.Middleware.Telemetry
end
# Steps...
end
Custom Middleware
defmodule MyApp.LoggingMiddleware do
use Reactor.Middleware
def init(context) do
Logger.info("Reactor starting")
{:ok, context}
end
def complete(result, context) do
Logger.info("Reactor completed")
{:ok, result}
end
def error(errors, context) do
Logger.error("Reactor failed: #{inspect(errors)}")
:ok
end
def event({:run_start, args}, step, context) do
Logger.debug("Step #{step.name} starting")
end
end
Common Patterns
Data Pipeline
input :raw_data
step :validate do
argument :data, input(:raw_data)
run &validate_data/1
end
step :transform do
argument :data, result(:validate)
run &transform_data/1
end
step :store do
argument :data, result(:transform)
run &store_data/1
end
return :store
Parallel Processing
step :fetch_user do
argument :id, input(:user_id)
run &fetch_user/1
end
step :fetch_settings do
argument :id, input(:user_id)
run &fetch_settings/1
end
collect :user_with_settings do
argument :user, result(:fetch_user)
argument :settings, result(:fetch_settings)
end
Error Recovery
step :risky_operation do
run &might_fail/1
max_retries 3
compensate fn reason, _, _, _ ->
case reason do
%NetworkError{} -> :retry
%TimeoutError{} -> :retry
_other -> :ok
end
end
end
Quick Reference
Functions
input/1
- Define reactor inputstep/2
- Define stepargument/2
- Define step dependencyresult/1
- Reference step resultvalue/1
- Static valuewait_for/1
- Dependency without datareturn/1
- Set reactor return
Built-in Steps
step
- Basic stepdebug
- Log informationmap
- Process collectionscompose
- Embed reactorsswitch
- Conditional logicgroup
- Shared setup/teardownaround
- Wrap executioncollect
- Gather valuestemplate
- EEx templatesflunk
- Force failure
Control Flow
guard
- Conditional executionwhere
- Simple conditionalasync?
- Control sync/asyncmax_retries
- Retry limitsbatch_size
- Map processing