Reactor Architecture
View SourceThis guide explains how Reactor works internally, providing insight into its components, execution flow, and design patterns. Understanding the architecture helps you make informed decisions about workflow design, performance optimization, and debugging.
Core Components
Architecture Overview
Reactor's architecture follows a clear separation of concerns between planning, execution, and state management:
graph TB
    subgraph "Reactor Framework"
        DSL[DSL Definition] --> Parser[DSL Parser]
        Parser --> Builder[Reactor Builder]
        Builder --> Planner[Execution Planner]
        Planner --> Executor[Concurrent Executor]
        
        subgraph "Core Components"
            Steps[Step Definitions]
            Args[Argument Dependencies]
            Guards[Guard Conditions]
            Middleware[Middleware Stack]
        end
        
        Steps --> Builder
        Args --> Builder
        Guards --> Builder
        Middleware --> Executor
        
        Executor --> Results[Execution Results]
    end
    
    subgraph "External World"
        Inputs[Input Data] --> DSL
        Results --> Outputs[Output Data]
        
        APIs[External APIs]
        DBs[Databases]
        Files[File Systems]
        
        Executor <--> APIs
        Executor <--> DBs
        Executor <--> Files
    end
    
    style DSL fill:#e1f5fe
    style Executor fill:#fff3e0
    style Results fill:#c8e6c9The Reactor Struct
At the heart of Reactor is the Reactor struct, which contains all the information needed for workflow execution:
defstruct context: %{},
          id: nil,
          input_descriptions: %{},
          inputs: [],
          intermediate_results: %{},
          middleware: [],
          plan: nil,
          return: nil,
          state: :pending,
          steps: [],
          undo: []Key Components:
- Context: Runtime context shared across all steps
- State: Reactor lifecycle (:pending,:executing,:halted,:failed,:successful)
- Intermediate Results: Step outputs stored for dependency resolution
- Undo Stack: Successful undoable steps tracked for rollback
- Plan: The dependency graph once constructed
DSL Implementation
Reactor's DSL is built with Spark, providing a declarative way to define workflows:
DSL Processing Flow
- Entity Collection: DSL entities (steps, inputs, compose) are parsed during compilation
- Transformation: The DSL transformer validates and processes entities:- Ensures step names are unique
- Sets default return value to the last step
- Plans the reactor at compile-time
 
- Code Generation: Creates a reactor/0function returning the planned struct
- Runtime Conversion: Reactor.Info.to_struct/1builds the executable reactor
DSL Architecture
DSL Definition → Spark Entities → Transformer → Validation → Runtime StructThe transformer ensures reactors are validated and optimised at compile-time, catching errors early and enabling efficient execution.
Executor Components
The executor system coordinates all aspects of reactor execution through several specialised components:
Main Executor (Reactor.Executor)
Implements the core execution algorithm with sophisticated async/sync coordination:
Execution Loop Priority:
- Process completed async tasks first (maximum concurrency)
- Start new async steps (within concurrency limits)
- Run one sync step if available
- Handle deadlock prevention for shared pools
- Check completion conditions
Key Features:
- Concurrency Management: Respects max_concurrencyand shared pools
- Deadlock Prevention: Runs async steps synchronously when pools exhausted
- Timeout Handling: Respects execution timeouts
- Error Orchestration: Coordinates undo operations on failure
Step Runner (Reactor.Executor.StepRunner)
Handles individual step execution with comprehensive error handling:
Execution Pipeline:
Argument Collection → Context Building → Guard Evaluation → Step Run → Result ProcessingAdvanced Features:
- Subpath Resolution: Extracts nested values from step results using result(:step, [:key, :subkey])
- Process Metadata: Maintains execution context in process dictionary for middleware
- Retry Management: Tracks retry counts and respects step limits
- Compensation Handling: Attempts error recovery when steps support it
State Management (Reactor.Executor.State)
Runtime-only execution state separate from reactor definition:
%State{
  async?: true,
  concurrency_key: nil,
  current_tasks: %{},     # task -> step mapping
  errors: [],             # accumulated errors
  max_concurrency: nil,   # concurrency limit
  retries: %{},          # per-step retry counts
  skipped: MapSet.new()   # steps skipped by guards
}This separation allows the reactor definition to remain immutable while execution state is managed separately.
Concurrency Tracker (Reactor.Executor.ConcurrencyTracker)
ETS-based global concurrency pool manager enabling resource sharing:
Architecture:
- Pool Allocation: Creates concurrency pools with configurable limits
- Atomic Operations: Uses ETS select_replacefor thread-safe resource tracking
- Process Monitoring: Automatically cleans up pools when allocator processes die
- Deadlock Prevention: Enables nested reactor execution without resource exhaustion
Pool Sharing Benefits:
- Multiple reactors can share concurrency limits
- Prevents resource exhaustion in nested scenarios
- Enables efficient resource utilisation across reactor hierarchy
Planner (Reactor.Planner)
Converts step definitions into an executable dependency graph:
Graph Construction Process:
- Vertex Creation: Each step becomes a graph vertex
- Dependency Analysis: Examines step arguments to identify dependencies
- Edge Creation: Adds directed edges from dependency steps to dependent steps
- Cycle Detection: Validates the graph is acyclic
- Ready State: Identifies steps with no dependencies (ready to run)
Dependency Types:
- input(:name)- No dependencies (ready immediately)
- result(:step)- Depends on named step
- value(x)- No dependencies (static values)
The planner uses the libgraph library for efficient DAG operations and ensures all dependencies exist before execution begins.
Execution Flow
Initialization Phase
Setup Sequence:
Inputs + Context + Options → State Creation → Pool Allocation → Hook Initialization- State Creation: Reactor.Executor.Initbuilds execution state from options
- Pool Management: Allocates or reuses concurrency pools based on concurrency_key
- Context Merging: Combines user context with reactor context
- Middleware Setup: Initialises hooks for telemetry and other cross-cutting concerns
Main Execution Loop
The executor follows a sophisticated priority-based algorithm designed for maximum concurrency:
# Simplified execution loop structure
def execute_loop(reactor, state) do
  state
  |> handle_completed_async_tasks()
  |> start_new_async_tasks()
  |> run_one_sync_step_if_available()
  |> handle_deadlock_prevention()
  |> check_completion_conditions()
  |> continue_or_finish()
endExecution Priorities:
- Async Task Completion: Highest priority for maximum throughput
- New Async Tasks: Started when concurrency slots available
- Sync Tasks: Fill gaps when async slots unavailable
- Deadlock Prevention: Forces async tasks to run sync in nested scenarios
Step Execution Mechanics
Each step follows a detailed execution pipeline:
Argument Resolution
# Argument types and their resolution
input(:name)                    # → reactor.context.private.inputs
result(:step)                   # → reactor.intermediate_results
result(:step, :key)             # → get_in(intermediate_results, [:step, :key])
result(:step, [:key, :subkey])  # → get_in(intermediate_results, [:step, :key, :subkey])
value(literal)                  # → literal valueResolution Process:
- Validates all required arguments are available
- Handles nested data extraction for subpaths
- Applies transformations (extracted as separate steps during planning)
- Builds argument map for step invocation
Step Invocation
# Step execution with comprehensive error handling
case step.run(arguments, context, step) do
  {:ok, result} -> handle_success(result)
  {:ok, result, new_steps} -> handle_success_with_new_steps(result, new_steps)
  {:error, reason} -> attempt_compensation(reason)
  :retry -> schedule_retry()
  {:halt, reason} -> initiate_graceful_shutdown(reason)
endContext Building: Each step receives enhanced context:
- Reactor Context: Shared data and configuration
- Step Context: Current step, retry count, concurrency key
- Process Context: Middleware-managed async execution context
Result Processing
Success Path
Result Storage Strategy:
- Results stored in intermediate_resultsonly when needed by:- Dependent steps (determined by dependency graph)
- Reactor return value
 
- Memory optimisation: unused results are discarded immediately
Undo Stack Management:
- Steps with undo/4callbacks are added to the undo stack with their results
- Stack maintained in reverse chronological order (LIFO)
- Non-undoable steps save memory by not storing results
Graph Updates:
- Completed steps are removed from the dependency graph
- Dependent steps become ready when all dependencies satisfied
- New steps (from {:ok, result, new_steps}) are planned and integrated
Error Path
Error Handling Cascade:
- Compensation Attempt: Call step's compensate/4if available
- Undo Initiation: If compensation fails, begin undo cascade
- Stack Processing: Undo steps in reverse order
- Error Collection: Accumulate all errors for final reporting
- Resource Cleanup: Release concurrency slots and clean up tasks
Compensation Options:
def compensate(reason, arguments, context, step) do
  case reason do
    %RetryableError{} -> :retry              # Try again
    %FallbackError{value: v} -> {:continue, v} # Use fallback
    _other -> :ok                            # Accept failure
  end
endState Management
Context Evolution
Multi-layered Context System:
# Base reactor context
%{
  private: %{
    inputs: %{user_id: 123},           # User inputs
    composed_reactors: #{},            # Composition tracking
    replace_arguments: nil             # Argument transformation
  }
}
# Enhanced step context (merged at runtime)
%{
  current_step: step,
  current_try: 0,
  retries_remaining: 5,
  concurrency_key: #Reference<...>
}Context Sharing:
- Reactor Level: Shared across all steps in the reactor
- Step Level: Step-specific context merged with reactor context
- Process Level: Middleware-managed context for async execution
Memory Management
Growth Patterns:
- Constant: Step definitions, dependency graph structure
- Linear: Intermediate results, undo stack
- Temporary: Task mappings, retry counters
Optimisation Strategies:
- Results stored only when required by dependencies or return value
- Non-undoable steps excluded from undo stack
- Completed steps removed from graph for garbage collection
- Task mappings cleaned immediately upon completion
Concurrency State
Resource Tracking:
# Active task management
current_tasks: %{
  #PID<0.123.0> => %Step{name: :fetch_user},
  #PID<0.124.0> => %Step{name: :process_data}
}
# Concurrency pool state (in ETS)
{pool_key, current_count, max_count}Thread Safety:
- All state mutations happen in main executor process
- Concurrency pool uses atomic ETS operations
- Task results collected via message passing
- No shared mutable state between async tasks
Performance Characteristics
Concurrency Model
Bottleneck Analysis:
- Pool Exhaustion: Limited by max_concurrencysetting
- Dependency Chains: Sequential dependencies limit parallelisation
- Resource Contention: Shared pools across reactor hierarchy
Optimisation Features:
- Automatic Sizing: Default pool size based on System.schedulers_online/0
- Pool Sharing: Multiple reactors share resources efficiently
- Eager Execution: Tasks start immediately when dependencies satisfied
- Priority Scheduling: Async completion processed first
Memory Efficiency
Access Patterns:
- Intermediate Results: O(1) lookup by step name
- Undo Stack: O(n) traversal during error handling only
- Task Management: O(1) task-to-step mapping
- Dependency Resolution: O(1) argument lookup
Scalability Characteristics:
- Linear Growth: Memory scales with active results and undo stack size
- Bounded Resources: Concurrency limits prevent resource exhaustion
- Efficient Cleanup: Completed steps removed immediately
- Network Effects: Shared pools improve overall system utilisation
Graph Operations
Complexity Analysis:
- Construction: O(V + E) where V = steps, E = dependencies
- Planning: O(V + E) cycle detection using efficient graph algorithms
- Execution: O(1) ready step identification per iteration
- Updates: O(1) step completion, O(k) for k new dependencies
Performance Optimisations:
- Compile-time planning for DSL-defined reactors
- Incremental planning for dynamically added steps
- Efficient graph library with optimised operations
- Vertex removal enables immediate garbage collection
Integration Patterns
Middleware Architecture
Hook Points:
- Reactor Level: before_execute,after_execute,on_error
- Step Level: before_step,after_step,on_step_error
- Context Management: Automatic context propagation to async tasks
Implementation:
reactor "instrumented_workflow" do
  middlewares do
    middleware Reactor.Middleware.Telemetry
  end
endComposition Model
Sub-Reactor Integration:
- Sub-reactors run with shared concurrency pools
- Context propagated from parent to child
- Error handling respects parent reactor's compensation strategy
- Results flow back to parent reactor seamlessly
Dynamic Workflow Capabilities
Runtime Step Creation:
def run(arguments, context, step) do
  new_steps = build_steps_based_on_data(arguments.data)
  {:ok, result, new_steps}
endIntegration Process:
- New steps planned and validated
- Dependencies integrated into existing graph
- Ready steps scheduled immediately
- Execution continues seamlessly
This architecture enables Reactor to efficiently orchestrate complex, concurrent workflows while maintaining transactional semantics and providing robust error handling capabilities. The design prioritises both performance and reliability, making it suitable for mission-critical applications requiring sophisticated workflow orchestration.