Runic.Runner.Scheduler.Adaptive (Runic v0.1.0-alpha.7)


Self-tuning scheduler that adjusts dispatch strategy based on runtime profiling.

Tracks per-node execution statistics via on_complete/3 callbacks and classifies nodes into capabilities for optimal dispatch grouping. During a warmup period (insufficient samples), falls back to structural analysis only — equivalent to ChainBatching / FlowBatch behavior.

Capabilities

The scheduler maintains a registry of known dispatch capabilities, evaluated in priority order (lowest priority number first):

  • :fast — average duration below fast_threshold_ms. Dispatched individually to avoid batching overhead. Nodes classified as fast are ideal candidates for :inline executor via SchedulerPolicy.
  • :unreliable — error rate above error_rate_threshold. Dispatched individually to contain failure blast radius and prevent chain contamination.
  • :batchable — default classification. Eligible for chain batching (sequential Promises) and parallel batching (parallel Promises via Flow).

Nodes with insufficient samples (below warmup_samples) are classified as :batchable and participate in structural chain/parallel detection without profiling-based overrides.
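The decision order above can be sketched as a single `cond`. This is an illustration, not the module's implementation: the profile field names (`sample_count`, `avg_duration_ms`, `error_count`) and the bundling of thresholds into a keyword list are assumptions that simply mirror the options documented below.

```elixir
# Illustrative sketch of the built-in classification order.
# Field and option names mirror this page's docs; the function
# itself is NOT Runic's actual implementation.
defmodule ClassifySketch do
  def classify(profile, opts \\ []) do
    warmup = Keyword.get(opts, :warmup_samples, 3)
    fast_ms = Keyword.get(opts, :fast_threshold_ms, 1.0)
    max_err = Keyword.get(opts, :error_rate_threshold, 0.1)

    cond do
      # Warmup: too few samples, structural batching only.
      profile.sample_count < warmup -> :batchable
      # Capabilities in priority order: :fast is checked first...
      profile.avg_duration_ms < fast_ms -> :fast
      # ...then :unreliable...
      profile.error_count / profile.sample_count > max_err -> :unreliable
      # ...and :batchable is the default.
      true -> :batchable
    end
  end
end
```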

Profiling

Duration tracking uses an exponential moving average (EMA) to weight recent observations more heavily than historical ones. The ema_alpha parameter controls the smoothing factor — higher values make the average more responsive to recent changes.

Error rates are computed as error_count / sample_count — a simple running ratio.

For Promise dispatch units, the total duration is distributed equally across all node hashes in the promise. This is a rough estimate that converges as nodes are also observed via individual dispatch.
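The profiling arithmetic above can be written out in a few lines. The function names and the profile map shape here are hypothetical; only the formulas themselves come from this page.

```elixir
# Illustrative sketch of the profiling math: EMA duration,
# running error ratio, and equal-split Promise durations.
# Names are hypothetical, not Runic internals.
defmodule ProfilingSketch do
  @ema_alpha 0.3

  # First observation seeds the average; afterwards, recent
  # samples are weighted by alpha (higher alpha = more reactive).
  def update_duration(nil, duration_ms), do: duration_ms

  def update_duration(avg_ms, duration_ms) do
    @ema_alpha * duration_ms + (1 - @ema_alpha) * avg_ms
  end

  # Error rate: simple running ratio of errors to samples.
  def error_rate(%{error_count: e, sample_count: n}) when n > 0, do: e / n

  # Promise dispatch: split total duration equally across node hashes.
  def per_node_duration(total_ms, node_hashes) do
    total_ms / length(node_hashes)
  end
end
```

For example, with `ema_alpha: 0.3`, a tracked average of 10 ms and a new observation of 20 ms yield `0.3 * 20 + 0.7 * 10 = 13` ms.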

Options

  • :fast_threshold_ms — duration threshold for :fast classification (default: 1.0)
  • :error_rate_threshold — error rate threshold for :unreliable (default: 0.1)
  • :warmup_samples — minimum samples before profiling influences decisions (default: 3)
  • :ema_alpha — EMA smoothing factor, 0..1; higher = more reactive (default: 0.3)
  • :min_chain_length — minimum chain length for sequential Promises (default: 2)
  • :min_batch_size — minimum batch size for parallel Promises (default: 4)
  • :flow_stages — max Flow stages for parallel Promises (default: System.schedulers_online())
  • :flow_max_demand — Flow max_demand per stage (default: 1)
  • :capabilities — list of %Capability{} structs to override default classifications
  • :classifier — custom (NodeProfile.t(), Runnable.t() -> atom()) function that bypasses capability matching entirely

Example

# Adaptive scheduler with aggressive inline threshold
Runic.Runner.start_workflow(runner, :my_workflow, workflow,
  scheduler: Runic.Runner.Scheduler.Adaptive,
  scheduler_opts: [
    fast_threshold_ms: 5.0,
    warmup_samples: 5,
    min_chain_length: 3
  ]
)
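The `:classifier` option can also bypass capability matching entirely. The two-arity function below follows the documented `(NodeProfile.t(), Runnable.t() -> atom())` type; the profile field names and the heuristic in the body are illustrative assumptions only.

```elixir
# Custom classifier that bypasses capability matching.
# The heuristic and profile field names are illustrative.
classifier = fn profile, _runnable ->
  cond do
    profile.sample_count < 3 -> :batchable         # still warming up
    profile.avg_duration_ms < 0.5 -> :fast         # tighter than default
    true -> :batchable
  end
end

Runic.Runner.start_workflow(runner, :my_workflow, workflow,
  scheduler: Runic.Runner.Scheduler.Adaptive,
  scheduler_opts: [classifier: classifier]
)
```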

Summary

Functions

Returns the classification for a runnable given the current scheduler state.

Returns the current node profile for a given node hash, or nil if untracked.

Returns a summary of current profiling state for observability.

Functions

classify_node(runnable, state)

@spec classify_node(Runic.Workflow.Runnable.t(), map()) :: atom()

Returns the classification for a runnable given the current scheduler state.

During warmup (insufficient samples), returns :batchable. After warmup, evaluates registered capabilities in priority order.

get_profile(state, node_hash)

@spec get_profile(map(), term()) ::
  Runic.Runner.Scheduler.Adaptive.NodeProfile.t() | nil

Returns the current node profile for a given node hash, or nil if untracked.

profile_summary(state)

@spec profile_summary(map()) :: %{
  total_tracked: non_neg_integer(),
  classifications: %{required(atom()) => non_neg_integer()}
}

Returns a summary of current profiling state for observability.

Useful for debugging and monitoring adaptive scheduler behavior.
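For example, a monitoring hook might poll the summary periodically. How you obtain the scheduler state is deployment-specific, so `scheduler_state` below is only a placeholder; the shape of the returned map follows the spec above.

```elixir
# scheduler_state is a placeholder for however your deployment
# exposes the Adaptive scheduler's state.
summary = Runic.Runner.Scheduler.Adaptive.profile_summary(scheduler_state)

# The result is a map with :total_tracked and per-classification counts,
# e.g. %{total_tracked: 12, classifications: %{batchable: 8, fast: 3, unreliable: 1}}
:telemetry.execute([:my_app, :scheduler, :profile], summary, %{})
```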