Self-tuning scheduler that adjusts dispatch strategy based on runtime profiling.
Tracks per-node execution statistics via on_complete/3 callbacks and classifies
nodes into capabilities for optimal dispatch grouping. During a warmup period
(insufficient samples), falls back to structural analysis only — equivalent to
ChainBatching / FlowBatch behavior.
Capabilities
The scheduler maintains a registry of known dispatch capabilities, evaluated in priority order (lowest priority number first):
- `:fast` — average duration below `fast_threshold_ms`. Dispatched individually to avoid batching overhead. Nodes classified as fast are ideal candidates for the `:inline` executor via `SchedulerPolicy`.
- `:unreliable` — error rate above `error_rate_threshold`. Dispatched individually to contain failure blast radius and prevent chain contamination.
- `:batchable` — default classification. Eligible for chain batching (sequential Promises) and parallel batching (parallel Promises via Flow).
Nodes with insufficient samples (below warmup_samples) are classified as
:batchable and participate in structural chain/parallel detection without
profiling-based overrides.
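The priority-ordered matching described above can be sketched as follows. This is an illustrative reconstruction, not the actual implementation: the capability registry, field names, and thresholds here are assumptions that mirror the documented defaults.

```elixir
defmodule CapabilitySketch do
  # Hypothetical thresholds mirroring the documented defaults.
  @fast_threshold_ms 1.0
  @error_rate_threshold 0.1
  @warmup_samples 3

  # During warmup, structural analysis only: always :batchable.
  def classify(%{sample_count: n}) when n < @warmup_samples, do: :batchable

  def classify(profile) do
    capabilities()
    # Lowest priority number is evaluated first; first match wins.
    |> Enum.sort_by(fn {_name, priority, _match?} -> priority end)
    |> Enum.find_value(:batchable, fn {name, _priority, match?} ->
      if match?.(profile), do: name
    end)
  end

  defp capabilities do
    [
      {:fast, 10, fn p -> p.avg_duration_ms < @fast_threshold_ms end},
      {:unreliable, 20, fn p -> p.error_rate > @error_rate_threshold end}
    ]
  end
end
```

Because evaluation stops at the first match, a node that is both fast and unreliable classifies as `:fast` under these example priorities.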
Profiling
Duration tracking uses an exponential moving average (EMA) to weight recent
observations more heavily than historical ones. The ema_alpha parameter
controls the smoothing factor — higher values make the average more responsive
to recent changes.
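A minimal sketch of the EMA update, assuming the conventional formula `ema' = alpha * observation + (1 - alpha) * ema`; the module and function names are illustrative, and `@ema_alpha` mirrors the documented default of `0.3`:

```elixir
defmodule EmaSketch do
  @ema_alpha 0.3

  # First observation seeds the average directly.
  def update(nil, duration_ms), do: duration_ms * 1.0

  # Subsequent observations are blended in, weighted by alpha.
  def update(ema, duration_ms),
    do: @ema_alpha * duration_ms + (1 - @ema_alpha) * ema
end
```

With `alpha = 0.3`, an existing average of 10.0 ms and a new 20.0 ms observation blend to 13.0 ms, so outliers move the average gradually rather than dominating it.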
Error rates are computed as error_count / sample_count — a simple running ratio.
For Promise dispatch units, the total duration is distributed equally across all node hashes in the promise. This is a rough estimate that converges as nodes are also observed via individual dispatch.
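The equal-split attribution and running error ratio can be sketched like this. Field and function names are assumptions for illustration, and the EMA update of the duration share is elided for brevity:

```elixir
defmodule ProfileSketch do
  # Split a promise's total duration equally across its node hashes and
  # bump each node's sample/error counters.
  def record_promise(profiles, node_hashes, total_duration_ms, error?) do
    share = total_duration_ms / length(node_hashes)

    Enum.reduce(node_hashes, profiles, fn hash, acc ->
      Map.update(acc, hash, new_profile(share, error?), fn p ->
        %{p |
          sample_count: p.sample_count + 1,
          error_count: p.error_count + if(error?, do: 1, else: 0)}
      end)
    end)
  end

  # Simple running ratio, as described above.
  def error_rate(%{error_count: e, sample_count: n}) when n > 0, do: e / n

  defp new_profile(share, error?),
    do: %{sample_count: 1, error_count: if(error?, do: 1, else: 0), avg_duration_ms: share}
end
```

A node observed once inside a two-node promise and once individually with an error ends up with two samples and one error, i.e. an error rate of 0.5.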
Options
- `:fast_threshold_ms` — duration threshold for `:fast` classification (default: `1.0`)
- `:error_rate_threshold` — error rate threshold for `:unreliable` (default: `0.1`)
- `:warmup_samples` — minimum samples before profiling influences decisions (default: `3`)
- `:ema_alpha` — EMA smoothing factor, 0..1; higher = more reactive (default: `0.3`)
- `:min_chain_length` — minimum chain length for sequential Promises (default: `2`)
- `:min_batch_size` — minimum batch size for parallel Promises (default: `4`)
- `:flow_stages` — max Flow stages for parallel Promises (default: `System.schedulers_online()`)
- `:flow_max_demand` — Flow `max_demand` per stage (default: `1`)
- `:capabilities` — list of `%Capability{}` structs to override default classifications
- `:classifier` — custom `(NodeProfile.t(), Runnable.t() -> atom())` function that bypasses capability matching entirely
Example
```elixir
# Adaptive scheduler with aggressive inline threshold
Runic.Runner.start_workflow(runner, :my_workflow, workflow,
  scheduler: Runic.Runner.Scheduler.Adaptive,
  scheduler_opts: [
    fast_threshold_ms: 5.0,
    warmup_samples: 5,
    min_chain_length: 3
  ]
)
```
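The `:classifier` option documented above can replace capability matching entirely. The decision logic below is purely illustrative (the `error_rate` cutoff is an assumption, not a recommended value):

```elixir
# Hypothetical custom classifier bypassing capability matching
Runic.Runner.start_workflow(runner, :my_workflow, workflow,
  scheduler: Runic.Runner.Scheduler.Adaptive,
  scheduler_opts: [
    classifier: fn profile, _runnable ->
      if profile.error_rate > 0.25, do: :unreliable, else: :batchable
    end
  ]
)
```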
Functions
@spec classify_node(Runic.Workflow.Runnable.t(), map()) :: atom()
Returns the classification for a runnable given the current scheduler state.
During warmup (insufficient samples), returns :batchable.
After warmup, evaluates registered capabilities in priority order.
@spec get_profile(map(), term()) :: Runic.Runner.Scheduler.Adaptive.NodeProfile.t() | nil
Returns the current node profile for a given node hash, or nil if untracked.
@spec profile_summary(map()) :: %{
  total_tracked: non_neg_integer(),
  classifications: %{required(atom()) => non_neg_integer()}
}
Returns a summary of current profiling state for observability.
Useful for debugging and monitoring adaptive scheduler behavior.