Reactor Architecture
This guide explains how Reactor works internally, providing insight into its components, execution flow, and design patterns. Understanding the architecture helps you make informed decisions about workflow design, performance optimisation, and debugging.
Architecture Overview
Reactor's architecture follows a clear separation of concerns between planning, execution, and state management:
graph TB
subgraph "Reactor Framework"
DSL[DSL Definition] --> Parser[DSL Parser]
Parser --> Builder[Reactor Builder]
Builder --> Planner[Execution Planner]
Planner --> Executor[Concurrent Executor]
subgraph "Core Components"
Steps[Step Definitions]
Args[Argument Dependencies]
Guards[Guard Conditions]
Middleware[Middleware Stack]
end
Steps --> Builder
Args --> Builder
Guards --> Builder
Middleware --> Executor
Executor --> Results[Execution Results]
end
subgraph "External World"
Inputs[Input Data] --> DSL
Results --> Outputs[Output Data]
APIs[External APIs]
DBs[Databases]
Files[File Systems]
Executor <--> APIs
Executor <--> DBs
Executor <--> Files
end
style DSL fill:#e1f5fe
style Executor fill:#fff3e0
style Results fill:#c8e6c9
Core Components
The Reactor Struct
At the heart of Reactor is the Reactor struct, which contains all the information needed for workflow execution:
defstruct context: %{},
id: nil,
input_descriptions: %{},
inputs: [],
intermediate_results: %{},
middleware: [],
plan: nil,
return: nil,
state: :pending,
steps: [],
undo: []
Key Components:
- Context: Runtime context shared across all steps
- State: Reactor lifecycle (:pending, :executing, :halted, :failed, :successful)
- Intermediate Results: Step outputs stored for dependency resolution
- Undo Stack: Successful undoable steps tracked for rollback
- Plan: The dependency graph once constructed
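The same struct can also be assembled programmatically. A minimal sketch, assuming the Reactor.Builder functions below exist with these arities and return {:ok, reactor} tuples (worth verifying against the Reactor.Builder docs); GreetStep is a placeholder step module:
# Build a reactor struct without the DSL
reactor = Reactor.Builder.new()

# Declare an input, a step that depends on it, and the return value
{:ok, reactor} = Reactor.Builder.add_input(reactor, :name)
{:ok, reactor} = Reactor.Builder.add_step(reactor, :greet, GreetStep, name: {:input, :name})
{:ok, reactor} = Reactor.Builder.return(reactor, :greet)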
DSL Implementation
Reactor's DSL is built with Spark, providing a declarative way to define workflows:
DSL Processing Flow
- Entity Collection: DSL entities (steps, inputs, compose) are parsed during compilation
- Transformation: The DSL transformer validates and processes entities:
  - Ensures step names are unique
  - Sets the default return value to the last step
  - Plans the reactor at compile time
- Code Generation: Creates a reactor/0 function returning the planned struct
- Runtime Conversion: Reactor.Info.to_struct/1 builds the executable reactor
DSL Architecture
DSL Definition → Spark Entities → Transformer → Validation → Runtime Struct
The transformer ensures reactors are validated and optimised at compile-time, catching errors early and enabling efficient execution.
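For a concrete illustration, here is a small DSL-defined reactor and a call to Reactor.run/2; the module, input, and step names are invented for this sketch:
defmodule GreetReactor do
  use Reactor

  input :name

  step :greet do
    argument :name, input(:name)
    run fn %{name: name}, _context -> {:ok, "Hello, #{name}!"} end
  end

  # The return defaults to the last step; stated explicitly here
  return :greet
end

{:ok, "Hello, Marty!"} = Reactor.run(GreetReactor, %{name: "Marty"})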
Executor Components
The executor system coordinates all aspects of reactor execution through several specialised components:
Main Executor (Reactor.Executor)
Implements the core execution algorithm with sophisticated async/sync coordination:
Execution Loop Priority:
- Process completed async tasks first (maximum concurrency)
- Start new async steps (within concurrency limits)
- Run one sync step if available
- Handle deadlock prevention for shared pools
- Check completion conditions
Key Features:
- Concurrency Management: Respects max_concurrency and shared pools
- Deadlock Prevention: Runs async steps synchronously when pools are exhausted
- Timeout Handling: Respects execution timeouts
- Error Orchestration: Coordinates undo operations on failure
Step Runner (Reactor.Executor.StepRunner)
Handles individual step execution with comprehensive error handling:
Execution Pipeline:
Argument Collection → Context Building → Guard Evaluation → Step Run → Result Processing
Advanced Features:
- Subpath Resolution: Extracts nested values from step results using result(:step, [:key, :subkey])
- Process Metadata: Maintains execution context in process dictionary for middleware
- Retry Management: Tracks retry counts and respects step limits
- Compensation Handling: Attempts error recovery when steps support it
State Management (Reactor.Executor.State)
Runtime-only execution state separate from reactor definition:
%State{
async?: true,
concurrency_key: nil,
current_tasks: %{}, # task -> step mapping
errors: [], # accumulated errors
max_concurrency: nil, # concurrency limit
retries: %{}, # per-step retry counts
skipped: MapSet.new() # steps skipped by guards
}
This separation allows the reactor definition to remain immutable while execution state is managed separately.
Concurrency Tracker (Reactor.Executor.ConcurrencyTracker)
ETS-based global concurrency pool manager enabling resource sharing:
Architecture:
- Pool Allocation: Creates concurrency pools with configurable limits
- Atomic Operations: Uses ETS select_replace for thread-safe resource tracking
- Process Monitoring: Automatically cleans up pools when allocator processes die
- Deadlock Prevention: Enables nested reactor execution without resource exhaustion
Pool Sharing Benefits:
- Multiple reactors can share concurrency limits
- Prevents resource exhaustion in nested scenarios
- Enables efficient resource utilisation across reactor hierarchy
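To illustrate pool sharing, here is a sketch of a step that runs a child reactor against its parent's pool. It assumes Reactor.run/4 accepts a concurrency_key option and that the pool key is available in the step context, as described later in this guide; ChildReactor is a placeholder:
defmodule RunChild do
  use Reactor.Step

  @impl true
  def run(arguments, context, _options) do
    # Reusing the parent's pool prevents deadlock in nested execution
    Reactor.run(ChildReactor, arguments, %{}, concurrency_key: context.concurrency_key)
  end
end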
Planner (Reactor.Planner)
Converts step definitions into an executable dependency graph:
Graph Construction Process:
- Vertex Creation: Each step becomes a graph vertex
- Dependency Analysis: Examines step arguments to identify dependencies
- Edge Creation: Adds directed edges from dependency steps to dependent steps
- Cycle Detection: Validates the graph is acyclic
- Ready State: Identifies steps with no dependencies (ready to run)
Dependency Types:
- input(:name) - No dependencies (ready immediately)
- result(:step) - Depends on the named step
- value(x) - No dependencies (static values)
The planner uses the libgraph library for efficient DAG operations and ensures all dependencies exist before execution begins.
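A sketch of planning a reactor struct built earlier (for example via Reactor.Builder) and inspecting the resulting graph with libgraph; assume reactor is an unplanned struct and that Reactor.Planner.plan/1 returns {:ok, reactor}:
# plan/1 populates the DAG in reactor.plan
{:ok, planned} = Reactor.Planner.plan(reactor)

Graph.vertices(planned.plan)    # one vertex per step
Graph.is_acyclic?(planned.plan) # true; cycles are rejected during planning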
Execution Flow
Initialisation Phase
Setup Sequence:
Inputs + Context + Options → State Creation → Pool Allocation → Hook Initialisation
- State Creation: Reactor.Executor.Init builds execution state from options
- Pool Management: Allocates or reuses concurrency pools based on concurrency_key
- Context Merging: Combines user context with reactor context
- Middleware Setup: Initialises hooks for telemetry and other cross-cutting concerns
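To illustrate the sequence above, a sketch of how inputs, context, and options arrive at this phase; the option names follow this guide and should be checked against the Reactor.run/4 docs:
Reactor.run(
  MyReactor,
  %{user_id: 123},        # inputs
  %{request_id: "req-1"}, # user context, merged with reactor context
  max_concurrency: 16,
  timeout: :timer.seconds(30)
)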
Main Execution Loop
The executor follows a sophisticated priority-based algorithm designed for maximum concurrency:
# Simplified execution loop structure
def execute_loop(reactor, state) do
  {reactor, state}
  |> handle_completed_async_tasks()
  |> start_new_async_tasks()
  |> run_one_sync_step_if_available()
  |> handle_deadlock_prevention()
  |> check_completion_conditions()
  |> continue_or_finish()
end
Execution Priorities:
- Async Task Completion: Highest priority for maximum throughput
- New Async Tasks: Started when concurrency slots available
- Sync Tasks: Fill gaps when async slots unavailable
- Deadlock Prevention: Forces async tasks to run sync in nested scenarios
Step Execution Mechanics
Each step follows a detailed execution pipeline:
Argument Resolution
# Argument types and their resolution
input(:name) # → reactor.context.private.inputs
result(:step) # → reactor.intermediate_results
result(:step, :key) # → get_in(intermediate_results, [:step, :key])
result(:step, [:key, :subkey]) # → get_in(intermediate_results, [:step, :key, :subkey])
value(literal) # → literal value
Resolution Process:
- Validates all required arguments are available
- Handles nested data extraction for subpaths
- Applies transformations (extracted as separate steps during planning)
- Builds argument map for step invocation
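A sketch of these argument forms in the DSL, including a transform (which the planner extracts into its own step); the step and input names are illustrative, and the transform: option syntax should be checked against the DSL docs:
step :send_welcome do
  argument :email, result(:fetch_user, [:profile, :email])
  argument :user_id, input(:user), transform: & &1.id

  run fn args, _context ->
    {:ok, "welcome sent to #{args.email} (user #{args.user_id})"}
  end
end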
Step Invocation
# Step execution with comprehensive error handling (simplified;
# the real dispatch goes through the step's impl module)
case step.impl.run(arguments, context, options) do
  {:ok, result} -> handle_success(result)
  {:ok, result, new_steps} -> handle_success_with_new_steps(result, new_steps)
  {:error, reason} -> attempt_compensation(reason)
  :retry -> schedule_retry()
  {:halt, reason} -> initiate_graceful_shutdown(reason)
end
Context Building: Each step receives enhanced context:
- Reactor Context: Shared data and configuration
- Step Context: Current step, retry count, concurrency key
- Process Context: Middleware-managed async execution context
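A sketch of a step reading that merged context from run/3; the context keys are the ones listed above, and the module is illustrative:
defmodule AuditedStep do
  use Reactor.Step
  require Logger

  @impl true
  def run(arguments, context, _options) do
    # current_step and current_try come from the step-level context merge
    Logger.info("#{context.current_step.name}: attempt #{context.current_try}")
    {:ok, Map.keys(arguments)}
  end
end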
Result Processing
Success Path
Result Storage Strategy:
- Results stored in intermediate_results only when needed by:
  - Dependent steps (determined by dependency graph)
  - Reactor return value
- Memory optimisation: unused results are discarded immediately
Undo Stack Management:
- Steps with undo/4 callbacks are added to the undo stack with their results (see the sketch below)
- Stack maintained in reverse chronological order (LIFO)
- Non-undoable steps save memory by not storing results
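A sketch of an undoable step: undo/4 receives the stored result so it can reverse the side effect. MyApp.Accounts is a placeholder API invented for this example:
defmodule CreateUser do
  use Reactor.Step

  @impl true
  def run(arguments, _context, _options) do
    MyApp.Accounts.create_user(arguments.params)
  end

  @impl true
  def undo(user, _arguments, _context, _options) do
    # Returning :ok confirms the side effect was reversed
    with {:ok, _} <- MyApp.Accounts.delete_user(user), do: :ok
  end
end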
Graph Updates:
- Completed steps are removed from the dependency graph
- Dependent steps become ready when all dependencies satisfied
- New steps (from {:ok, result, new_steps}) are planned and integrated
Error Path
Error Handling Cascade:
- Compensation Attempt: Call the step's compensate/4 if available
- Undo Initiation: If compensation fails, begin the undo cascade
- Stack Processing: Undo steps in reverse order
- Error Collection: Accumulate all errors for final reporting
- Resource Cleanup: Release concurrency slots and clean up tasks
Compensation Options:
def compensate(reason, _arguments, _context, _options) do
  case reason do
    # These error structs are illustrative, not part of Reactor
    %RetryableError{} -> :retry                # Try the step again
    %FallbackError{value: v} -> {:continue, v} # Continue with a fallback value
    _other -> :ok                              # Accept the failure
  end
end
State Management
Context Evolution
Multi-layered Context System:
# Base reactor context
%{
private: %{
inputs: %{user_id: 123}, # User inputs
composed_reactors: MapSet.new(), # Composition tracking
replace_arguments: nil # Argument transformation
}
}
# Enhanced step context (merged at runtime)
%{
current_step: step,
current_try: 0,
retries_remaining: 5,
concurrency_key: #Reference<...>
}
Context Sharing:
- Reactor Level: Shared across all steps in the reactor
- Step Level: Step-specific context merged with reactor context
- Process Level: Middleware-managed context for async execution
Memory Management
Growth Patterns:
- Constant: Step definitions, dependency graph structure
- Linear: Intermediate results, undo stack
- Temporary: Task mappings, retry counters
Optimisation Strategies:
- Results stored only when required by dependencies or return value
- Non-undoable steps excluded from undo stack
- Completed steps removed from graph for garbage collection
- Task mappings cleaned immediately upon completion
Concurrency State
Resource Tracking:
# Active task management
current_tasks: %{
#PID<0.123.0> => %Step{name: :fetch_user},
#PID<0.124.0> => %Step{name: :process_data}
}
# Concurrency pool state (in ETS)
{pool_key, current_count, max_count}
Thread Safety:
- All state mutations happen in main executor process
- Concurrency pool uses atomic ETS operations
- Task results collected via message passing
- No shared mutable state between async tasks
Performance Characteristics
Concurrency Model
Bottleneck Analysis:
- Pool Exhaustion: Limited by the max_concurrency setting
- Dependency Chains: Sequential dependencies limit parallelisation
- Resource Contention: Shared pools across reactor hierarchy
Optimisation Features:
- Automatic Sizing: Default pool size based on System.schedulers_online/0
- Pool Sharing: Multiple reactors share resources efficiently
- Eager Execution: Tasks start immediately when dependencies satisfied
- Priority Scheduling: Async completion processed first
Memory Efficiency
Access Patterns:
- Intermediate Results: O(1) lookup by step name
- Undo Stack: O(n) traversal during error handling only
- Task Management: O(1) task-to-step mapping
- Dependency Resolution: O(1) argument lookup
Scalability Characteristics:
- Linear Growth: Memory scales with active results and undo stack size
- Bounded Resources: Concurrency limits prevent resource exhaustion
- Efficient Cleanup: Completed steps removed immediately
- Network Effects: Shared pools improve overall system utilisation
Graph Operations
Complexity Analysis:
- Construction: O(V + E) where V = steps, E = dependencies
- Planning: O(V + E) cycle detection using efficient graph algorithms
- Execution: O(1) ready step identification per iteration
- Updates: O(1) step completion, O(k) for k new dependencies
Performance Optimisations:
- Compile-time planning for DSL-defined reactors
- Incremental planning for dynamically added steps
- Efficient graph library with optimised operations
- Vertex removal enables immediate garbage collection
Integration Patterns
Middleware Architecture
Hook Points:
- Reactor Level: before_execute, after_execute, on_error
- Step Level: before_step, after_step, on_step_error
- Context Management: Automatic context propagation to async tasks
Implementation:
reactor "instrumented_workflow" do
middlewares do
middleware Reactor.Middleware.Telemetry
end
end
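A custom middleware is a module implementing the Reactor.Middleware behaviour. A minimal sketch, assuming init/1 and complete/2 callbacks with these return shapes (verify against the Reactor.Middleware docs); the module name is invented:
defmodule TimingMiddleware do
  use Reactor.Middleware

  @impl true
  def init(context) do
    # Runs before execution; the returned context is threaded through
    {:ok, Map.put(context, :started_at, System.monotonic_time(:millisecond))}
  end

  @impl true
  def complete(result, context) do
    elapsed = System.monotonic_time(:millisecond) - context.started_at
    IO.puts("reactor completed in #{elapsed}ms")
    {:ok, result}
  end
end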
Composition Model
Sub-Reactor Integration:
- Sub-reactors run with shared concurrency pools
- Context propagated from parent to child
- Error handling respects parent reactor's compensation strategy
- Results flow back to parent reactor seamlessly
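In the DSL this is expressed with the compose entity; a sketch with placeholder reactor, step, and argument names:
compose :bill_customer, BillingReactor do
  argument :customer, result(:create_customer)
  argument :plan, input(:plan)
end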
Dynamic Workflow Capabilities
Runtime Step Creation:
def run(arguments, _context, _options) do
  # Build additional steps from runtime data; the executor plans and
  # integrates them into the running graph
  new_steps = build_steps_based_on_data(arguments.data)
  {:ok, arguments.data, new_steps}
end
Integration Process:
- New steps planned and validated
- Dependencies integrated into existing graph
- Ready steps scheduled immediately
- Execution continues seamlessly
This architecture enables Reactor to efficiently orchestrate complex, concurrent workflows while maintaining transactional semantics and providing robust error handling capabilities. The design prioritises both performance and reliability, making it suitable for mission-critical applications requiring sophisticated workflow orchestration.