Runic
Runic is a tool for modeling programs as data driven workflows that can be composed together at runtime.
Runic components connect together in a Runic.Workflow supporting lazily evaluated concurrent execution.
Runic Workflows are modeled as a decorated dataflow graph (a DAG - "directed acyclic graph") compiled from components such as steps, rules, pipelines, and state machines and more allowing coordinated interaction of disparate parts.
Installation
If available in Hex, the package can be installed
by adding runic to your list of dependencies in mix.exs:
def deps do
[
{:runic, "~> 0.1.0-alpha"}
]
endDocumentation can be generated with ExDoc and published on HexDocs. Once published, the docs can be found at https://hexdocs.pm/runic.
Concepts
Data flow dependencies between Lambda expressions, common in ETL pipelines, can be built with %Step{} components.
A Lambda Steps is a simple input -> output function.
require Runic
step = Runic.step(fn x -> x + 1 end)Steps are composable in a workflow:
workflow = Runic.workflow(
name: "example pipeline workflow",
steps: [
Runic.step(fn x -> x + 1 end), #A
Runic.step(fn x -> x * 2 end), #B
Runic.step(fn x -> x - 1 end) #C
]
)This produces a workflow graph where R is the entrypoint or "root" of the tree:
graph TD;
R-->A;
R-->B;
R-->C;In Runic, inputs flow through a workflow as a %Fact{}. During workflow evaluation various steps are traversed to and invoked producing more Facts.
alias Runic.Workflow
workflow
|> Workflow.react_until_satisfied(2)
|> Workflow.raw_productions()
[3, 4, 1]A core benefit Runic workflows are modeling pipelines that aren't just linear. For example:
defmodule TextProcessing do
def tokenize(text) do
text
|> String.downcase()
|> String.split(~R/[^[:alnum:]\-]/u, trim: true)
end
def count_words(list_of_words) do
list_of_words
|> Enum.reduce(Map.new(), fn word, map ->
Map.update(map, word, 1, &(&1 + 1))
end)
end
def count_uniques(word_count) do
Enum.count(word_count)
end
def first_word(list_of_words) do
List.first(list_of_words)
end
def last_word(list_of_words) do
List.last(list_of_words)
end
endNotice we have 3 functions that expect a list_of_words. In Elixir if we wanted to evaluate each output we can pipe them together the pipeline |> operator...
import TextProcessing
word_count =
"anybody want a peanut?"
|> tokenize()
|> count_words()
first_word =
"anybody want a peanut?"
|> tokenize()
|> first_word()
last_word =
"anybody want a peanut?"
|> tokenize()
|> last_word()However we're now evaluating linearly: using the common tokenize/1 function 3 times for the same input text.
This could be problematic if tokenize/1 is expensive - we'd prefer to run tokenize/1 just once and then fed into the rest of our pipeline.
With Runic we can compose all of these steps into one workflow and evaluate them together.
text_processing_workflow =
Runic.workflow(
name: "basic text processing example",
steps: [
{Runic.step(&tokenize/1),
[
{Runic.step(&count_words/1),
[
Runic.step(&count_uniques/1)
]},
Runic.step(&first_word/1),
Runic.step(&last_word/1)
]}
]
)Our text processing workflow graph now looks something like this:
graph TD;
R-->tokenize;
tokenize-->first_word;
tokenize-->last_word;
tokenize-->count_words;
count_words-->count_uniques;Now Runic can traverse over the graph of dataflow connections only evaluating tokenize/1 once for all three dependent steps.
alias Runic.Workflow
text_processing_workflow
|> Workflow.react_until_satisfied("anybody want a peanut?")
|> Workflow.raw_productions()
[
["anybody", "want", "a", "peanut"],
"anybody",
"peanut",
4,
%{"a" => 1, "anybody" => 1, "peanut" => 1, "want" => 1}
]Beyond steps, Runic has support for Rules, Joins, State Machines, FSMs, Aggregates, Sagas, and ProcessManagers for more complex control flow and stateful evaluation.
The Runic.Workflow.Invokable protocol is what allows for extension of Runic's runtime supporting nodes with different execution properties and evaluation.
The Runic.Component protocol supports extension of modeling new components that can be added and connected with other components in Runic workflows.
Runtime Workflow Composition
Workflows can be composed dynamically at runtime:
require Runic
alias Runic.Workflow
# Using Workflow.add/3 for dynamic composition
workflow = Runic.workflow()
|> Workflow.add(Runic.step(fn x -> x + 1 end, name: :add))
|> Workflow.add(Runic.step(fn x -> x * 2 end, name: :double), to: :add)
# Merge two workflows together
workflow1 = Runic.workflow(steps: [Runic.step(fn x -> x + 1 end)])
workflow2 = Runic.workflow(steps: [Runic.step(fn x -> x * 2 end)])
combined = Workflow.merge(workflow1, workflow2)
# Join multiple parent nodes
workflow = workflow
|> Workflow.add(Runic.step(fn a, b -> a + b end, name: :join), to: [:branch_a, :branch_b])See Runic.Workflow module documentation for adding components to workflows and running them.
Three-Phase Execution Model
Runic's Invokable protocol enforces a three-phase execution model designed for parallel execution and external scheduling:
- Prepare - Extract minimal context from the workflow into
%Runnable{}structs - Execute - Run runnables containing work functions and their inputs in isolation (can be parallelized)
- Apply - Reduce results back into the workflow so next steps can be determined
Parallel Execution
For workflows where nodes can execute concurrently:
alias Runic.Workflow
# Execute runnables in parallel with configurable concurrency
workflow
|> Workflow.react_until_satisfied(input, async: true, max_concurrency: 8) # Task.async_stream options
|> Workflow.raw_productions()External Scheduler Integration
For custom schedulers, worker pools, or distributed execution:
defmodule MyApp.WorkflowScheduler do
use GenServer
alias Runic.Workflow
alias Runic.Workflow.Invokable
# Phase 1: Prepare runnables for dispatch
def handle_cast({:run, input}, %{workflow: workflow} = state) do
workflow =
workflow
|> Workflow.plan_eagerly(input)
|> dispatch_tasks()
{:noreply, %{state | workflow: workflow}}
end
# Phase 2: Execute (dispatch to async worker pool, queue, external service, etc.)
defp dispatch_tasks(workflow) do
{workflow, runnables} = Workflow.prepare_for_dispatch(workflow)
Enum.map(runnables, fn runnable ->
Task.async(fn ->
# consider logging, error handling, retries here
Invokable.execute(runnable.node, runnable)
end)
end)
workflow
end
# Phase 3: Apply results back to workflow by handling async task callbacks with excecuted runnable
def handle_info({ref, executed_runnable}, %{workflow: workflow} = state) do
new_workflow =
Enum.reduce(executed, workflow, fn {:ok, runnable}, wrk ->
Workflow.apply_runnable(wrk, runnable)
end)
workflow =
if Workflow.is_runnable?(new_workflow) do
dispatch_tasks(workflow)
end
{:noreply, %{state | workflow: workflow}}
end
endKey APIs for external scheduling:
Workflow.prepare_for_dispatch/1- Returns{workflow, [%Runnable{}]}for dispatchWorkflow.apply_runnable/2- Applies a completed runnable back to the workflowInvokable.execute/2- Executes a runnable in isolation (no workflow access)
In summary, the Runic module provides high level functions and macros for building Runic Components
such as Steps, Rules, Workflows, and Accumulators.
The Runic.Workflow module is for connecting components together and running them with inputs.
Runic was designed to be used with custom process topologies and libraries such as GenStage, Broadway, and Flow without coupling you to one runtime model or a limited set of adapters.
Runic has first class support for dynamic runtime composition of workflows.
Runic is useful in problems where a developer cannot know upfront the logic or data flow in compiled code such as expert systems, user DSLs like Excel spreadsheets, low/no-code tools, or dynamic data pipelines.
If the runtime modification of a workflow or complex parallel dataflow evaluation isn't something your use case requires you might not need Runic.
Runic Workflows are essentially a dataflow based virtual machine running within Elixir and will not be faster than compiled Elixir code. If you know the flow of the program upfront during development you might not need Runic.
Runtime Context
Components can declare dependencies on external runtime values using context/1:
# Steps can reference external values
step = Runic.step(fn _x -> context(:api_key) end, name: :call_llm)
# Rules can use context in conditions
rule = Runic.rule name: :gated do
given(val: v)
where(v > context(:threshold))
then(fn %{val: v} -> {:ok, v} end)
end
# Accumulators, map, and reduce also support context/1
acc = Runic.accumulator(0, fn x, s -> s + x * context(:factor) end, name: :scaled)
map = Runic.map(fn x -> x * context(:multiplier) end, name: :mult_map)
# Provide defaults for optional context keys
step = Runic.step(fn _x -> context(:model, default: "gpt-4") end, name: :call_llm)
# Provide values at runtime
workflow
|> Workflow.put_run_context(%{
call_llm: %{api_key: "sk-..."},
_global: %{workspace_id: "ws1"}
})
|> Workflow.react_until_satisfied(input)Context values are scoped by component name, not part of the workflow hash, and not serialized — making them safe for secrets and connection handles. Keys with defaults are satisfied without explicit run_context entries. See Workflow.required_context_keys/1 and Workflow.validate_run_context/2 for introspection.
Scheduler Policies
Runic workflows support declarative per-node scheduling policies for retries, timeouts, backoff, fallbacks, and failure handling — without modifying the Invokable protocol or existing component structs.
Policies are stored on the workflow as a list of {matcher, policy_map} rules resolved at execution time. The first matching rule wins:
alias Runic.Workflow
alias Runic.Workflow.SchedulerPolicy
workflow =
workflow
|> Workflow.add_scheduler_policy(:call_llm, %{
max_retries: 3,
backoff: :exponential,
timeout_ms: 30_000
})
|> Workflow.append_scheduler_policy(:default, %{timeout_ms: 10_000})
# Policies are applied automatically during react/react_until_satisfied
workflow |> Workflow.react_until_satisfied(input)
# Runtime overrides can be passed as options (prepended with higher priority)
workflow |> Workflow.react_until_satisfied(input,
scheduler_policies: [{:flaky_step, %{max_retries: 5}}]
)Policy matchers support exact name atoms, regex ({:name, ~r/^llm_/}), type matching ({:type, Step}), custom predicates (fn node -> ... end), and :default catch-alls. Backoff strategies include :none, :linear, :exponential, and :jitter.
See Runic.Workflow.SchedulerPolicy for the full policy struct and matcher documentation, and Runic.Workflow.PolicyDriver for execution driver details.
Built-in Runner
Runic.Runner provides batteries-included workflow execution infrastructure: a supervision tree with a DynamicSupervisor for workers, Task.Supervisor for fault-isolated dispatch, Registry for lookup, and pluggable persistence via Runic.Runner.Store.
# Start a runner in your supervision tree
{:ok, _} = Runic.Runner.start_link(name: MyApp.Runner)
# Start and run a workflow
{:ok, _pid} = Runic.Runner.start_workflow(MyApp.Runner, :order_123, workflow)
:ok = Runic.Runner.run(MyApp.Runner, :order_123, input)
# Query results
{:ok, results} = Runic.Runner.get_results(MyApp.Runner, :order_123)
# Resume from persisted state after a crash
{:ok, _pid} = Runic.Runner.resume(MyApp.Runner, :order_123)Workers automatically integrate with scheduler policies, dispatch runnables to supervised tasks with configurable max_concurrency, and support checkpointing strategies (:every_cycle, :on_complete, {:every_n, n}, :manual).
Built-in store adapters: Runic.Runner.Store.ETS (default, in-memory) and Runic.Runner.Store.Mnesia (disk-persistent, distributed). Custom adapters implement the Runic.Runner.Store behaviour.
When no :store is configured, the Runner starts its built-in ETS store automatically. When :store is configured explicitly, the Runner assumes that store is already supervised elsewhere, including the built-in ETS and Mnesia adapters.
Telemetry events are emitted under [:runic, :runner, ...] for workflow lifecycle, runnable dispatch/completion, and store operations. See Runic.Runner.Telemetry for the full event catalog.
Guides
For quick reference and best practices:
- Cheatsheet - Quick reference for all core APIs
- Usage Rules - Core concepts, do's/don'ts, and patterns
- Protocols - Extending Runic with custom components and execution behavior
- Building a Workflow Scheduler - From simple spawned processes to production GenServer schedulers
- Durable Execution - Persistence, crash recovery, and checkpointing with the Runner
- State-Based Components - FSMs, Aggregates, Sagas, ProcessManagers and when to use each
Summary
Functions
Creates an %Accumulator{}: maintains cumulative state across individual inputs.
Creates an %Aggregate{}: a CQRS/ES aggregate that validates commands against
current state, produces domain events, and folds events into state.
Returns a list of all %Fact{} structs produced by a component across all invocations.
Returns a list of all raw values produced by a component across all invocations.
Creates a %Condition{}: a standalone conditional expression.
References an external runtime value by key inside Runic macros.
References an external runtime value by key with a default fallback.
Returns the number of facts produced by a given component in the workflow.
Creates a %FSM{}: a finite state machine with discrete states and guarded transitions.
Returns the most recent %Fact{} struct produced by a component.
Returns the most recent raw value produced by a component.
Creates a %Map{}: applies a transformation to each element of an enumerable.
Creates a %ProcessManager{}: a CQRS-oriented process manager that reacts to
domain events, maintains coordination state, and emits commands.
Creates a %Reduce{}: aggregates multiple facts into a single accumulated result.
Creates a %Rule{}: a conditional reaction for pattern-matched execution.
Creates a %Saga{}: a sequence of transaction steps with compensating actions.
Creates a %StateMachine{}: stateful workflows with reducers and conditional reactors.
Used inside Runic macros such as rules to reference the state of another component such as an accumulator or reduce.
Creates a %Step{}: a basic lambda expression that can be added to a workflow.
Evaluates to true in a condition if the specified step has ever been ran.
Evaluates to true if the specified step has been executed for the given input fact.
Converts a Runic component into a %Workflow{} via the Transmutable protocol.
Creates a %Workflow{} from component options.
Functions
Creates an %Accumulator{}: maintains cumulative state across individual inputs.
Unlike reduce/3 which aggregates over collections, accumulators process
single values and maintain running state across multiple workflow invocations.
This makes them ideal for running totals, counters, and stateful computations.
Basic Usage
iex> require Runic
iex> alias Runic.Workflow
iex> acc = Runic.accumulator(0, fn x, state -> state + x end, name: :running_sum)
iex> workflow = Workflow.new() |> Workflow.add(acc)
iex> results = workflow |> Workflow.plan_eagerly(5) |> Workflow.react_until_satisfied() |> Workflow.raw_productions()
iex> 5 in results
trueOptions
:name- Identifier for the accumulator (useful for referencing in rules):inputs/:outputs- Reserved for future schema-based type compatibility
Difference from Reduce
| Accumulator | Reduce |
|---|---|
| Single value per invocation | Aggregates over enumerables |
| State persists across invocations | One-shot aggregation |
| For running totals/counters | For map-reduce patterns |
Building State Machines
Connect rules to accumulators to create state-machine-like behavior:
require Runic
alias Runic.Workflow
counter = Runic.accumulator(0, fn x, acc -> acc + x end, name: :counter)
threshold_rule = Runic.rule(
condition: fn {state, _input} -> state > 100 end,
reaction: fn _ -> :threshold_exceeded end
)Captured Variables
iex> require Runic
iex> alias Runic.Workflow
iex> multiplier = 2
iex> acc = Runic.accumulator(0, fn x, state -> state + x * ^multiplier end, name: :scaled_sum)
iex> acc.closure.bindings[:multiplier]
2
Creates an %Aggregate{}: a CQRS/ES aggregate that validates commands against
current state, produces domain events, and folds events into state.
Compiles to an Accumulator (event fold) plus Rules (command handlers).
Usage
require Runic
agg = Runic.aggregate name: :counter do
state 0
command :increment do
emit fn _state -> {:incremented, 1} end
end
command :decrement do
where fn state -> state > 0 end
emit fn _state -> {:decremented, 1} end
end
event {:incremented, n}, state do
state + n
end
event {:decremented, n}, state do
state - n
end
endOptions
:name- Identifier for the aggregate (required)
DSL
state initial_value- Sets the initial aggregate statecommand :name do ... end- Defines a command handlerwhere fn state -> bool end- Optional guard on current stateemit fn state -> event end- Produces domain events
event pattern, state_var do body end- Defines an event handler (reducer clause)
Returns a list of all %Fact{} structs produced by a component across all invocations.
Unlike all_values_of/1, this returns full %Fact{} structs with metadata.
Examples
require Runic
Runic.rule name: :multi_fact_check do
given(x: x)
where(length(all_facts_of(:events)) > 0)
then(fn %{x: x} -> {:has_events, x} end)
end
Returns a list of all raw values produced by a component across all invocations.
Useful for aggregation in where or then clauses, e.g. summing all scores.
Examples
In a where clause:
require Runic
Runic.rule name: :sum_check do
given(x: x)
where(Enum.sum(all_values_of(:scores)) > 100)
then(fn %{x: x} -> {:high_score, x} end)
endIn a then clause:
require Runic
Runic.rule name: :sum_all_scores do
given(x: _x)
then(fn _bindings -> Enum.sum(all_values_of(:scores)) end)
end
Creates a %Condition{}: a standalone conditional expression.
Conditions represent the left-hand side (predicate) of a rule. They can be reused across multiple rules when the same condition is expensive or shared.
Basic Usage
iex> require Runic
iex> cond = Runic.condition(fn x -> x > 10 end)
iex> cond.work.(15)
true
iex> cond.work.(5)
falseWith Module Function Capture
iex> require Runic
iex> cond = Runic.condition({Kernel, :is_integer, 1})
iex> cond.work.(42)
trueWith Name Option
iex> require Runic
iex> cond = Runic.condition(fn x -> x > 10 end, name: :big_number)
iex> cond.name
:big_numberCaptured Variables with ^
Use the pin operator ^ to capture outer scope variables:
iex> require Runic
iex> threshold = 10
iex> cond = Runic.condition(fn x -> x > ^threshold end)
iex> cond.closure.bindings[:threshold]
10
iex> cond.work.(15)
trueUse Cases
- Expensive checks: When a condition involves costly operations (e.g., API calls, database queries), define it once and reference it in multiple rules
- Stateful conditions: Use
state_of/1to create conditions that depend on accumulator or state machine state - Reusability: Share predicates across rules for consistency
Note: Conditions should be pure and deterministic - they should not execute side effects.
References an external runtime value by key inside Runic macros.
Used inside step, condition, rule, accumulator, map, and reduce
macros to declare a dependency on a value provided via Workflow.put_run_context/2
or the :run_context option on react_until_satisfied/3.
Values are scoped by component name and resolved during the prepare phase.
The _global key in run_context is merged into every component's context.
Resolves to nil when the key is not present in run_context.
Use context/2 to provide a default instead.
Examples
require Runic
alias Runic.Workflow
step = Runic.step(fn _x -> context(:api_key) end, name: :call_llm)
rule =
Runic.rule name: :gated do
given(val: v)
where(v > context(:threshold))
then(fn %{val: v} -> {:ok, v} end)
end
acc = Runic.accumulator(0, fn x, s -> s + x * context(:factor) end, name: :scaled)
workflow =
Workflow.new()
|> Workflow.add(step)
|> Workflow.put_run_context(%{call_llm: %{api_key: "sk-..."}})Dot access is supported for map-valued context keys:
Runic.step(fn x -> x + context(:config).pool_size end, name: :pooled)
References an external runtime value by key with a default fallback.
Behaves like context/1 but uses the provided default when the key is not
present in run_context. The default can be a literal value or a zero-arity
function that is called lazily when needed.
Keys with defaults are not reported as missing by Workflow.validate_run_context/2
and appear as {:optional, default} in Workflow.required_context_keys/1.
Defaults are embedded in the compiled closure and participate in content hashing — two components with different defaults produce different hashes.
Examples
require Runic
# Literal default
step = Runic.step(fn _x -> context(:model, default: "gpt-4") end, name: :call_llm)
# Function default — called lazily when key is missing
step = Runic.step(
fn _x -> context(:api_key, default: fn -> System.get_env("API_KEY") end) end,
name: :call_llm
)
# In rule where clauses
rule =
Runic.rule name: :default_rule do
given(val: v)
where(v > context(:threshold, default: 100))
then(fn %{val: v} -> {:over, v} end)
end
# In accumulator reducers
acc = Runic.accumulator(0, fn x, s -> s + x * context(:factor, default: 1) end,
name: :scaled
)
Returns the number of facts produced by a given component in the workflow.
Used inside where clauses of rules to gate on how many facts a component has produced.
Examples
require Runic
Runic.rule name: :batch_ready do
given(x: x)
where(fact_count(:items) >= 3)
then(fn %{x: x} -> {:process_batch, x} end)
end
Creates a %FSM{}: a finite state machine with discrete states and guarded transitions.
FSMs compile to an Accumulator (holding the current state atom) plus Rules
(one per transition, using state_of() to gate on current state). Entry actions
are additional rules that fire on state changes.
Example
require Runic
fsm = Runic.fsm name: :traffic_light do
initial_state :red
state :red do
on :timer, to: :green
on :emergency, to: :red
on_entry fn -> {:notify, :traffic_stopped} end
end
state :green do
on :timer, to: :yellow
on :emergency, to: :red
end
state :yellow do
on :timer, to: :red
on :emergency, to: :red
end
endEach transition compiles to a named Rule: :"fsm_name_from_state_on_event".
Returns the most recent %Fact{} struct produced by a component.
Unlike latest_value_of/1, this returns the full %Fact{} struct including
metadata such as hash and ancestry, not just the raw value.
Examples
require Runic
Runic.rule name: :check_latest_fact do
given(x: x)
where(latest_fact_of(:processor) != nil)
then(fn %{x: x} -> {:ok, x} end)
end
Returns the most recent raw value produced by a component.
Useful in where clauses to compare against the latest output, or in then
clauses to incorporate another component's latest result.
Examples
In a where clause:
require Runic
Runic.rule name: :high_temp_alert do
given(x: x)
where(latest_value_of(:sensor) > 100)
then(fn %{x: x} -> {:alert, x} end)
endIn a then clause:
require Runic
Runic.rule name: :echo_latest do
given(x: x)
then(fn %{x: x} -> {:latest, x, latest_value_of(:sensor)} end)
end
Creates a %Map{}: applies a transformation to each element of an enumerable.
Map operations fan-out an enumerable input into individual elements, apply the transformation to each, and can be followed by a reduce to fan-in results.
Basic Usage
iex> require Runic
iex> alias Runic.Workflow
iex> map_op = Runic.map(fn x -> x * 2 end, name: :double)
iex> workflow = Workflow.new() |> Workflow.add(map_op)
iex> results = workflow |> Workflow.plan_eagerly([1, 2, 3]) |> Workflow.react_until_satisfied() |> Workflow.raw_productions()
iex> Enum.sort(results)
[2, 4, 6]Options
:name- Identifier for referencing inreduce/3via:mapoption:inputs/:outputs- Reserved for future schema-based type compatibility
With Pipeline
Map can contain nested pipelines:
require Runic
Runic.map(
{Runic.step(fn x -> x * 2 end, name: :double),
[Runic.step(fn x -> x + 1 end, name: :add_one)]}
)Map-Reduce Pattern
Connect a reduce to collect mapped results. The reduce's :map option links
it to the upstream map:
iex> require Runic
iex> alias Runic.Workflow
iex> map_op = Runic.map(fn x -> x * 2 end, name: :double)
iex> reduce_op = Runic.reduce(0, fn x, acc -> x + acc end, name: :sum, map: :double)
iex> workflow = Workflow.new()
...> |> Workflow.add(map_op)
...> |> Workflow.add(reduce_op, to: :double)
iex> results = workflow
...> |> Workflow.plan_eagerly([1, 2, 3])
...> |> Workflow.react_until_satisfied()
...> |> Workflow.raw_productions(:sum)
iex> 12 in results
trueHow It Works
Internally, map uses a FanOut component that splits the enumerable into
individual facts. Each element is processed independently, enabling
parallel execution with the :async option on react/2.
Creates a %ProcessManager{}: a CQRS-oriented process manager that reacts to
domain events, maintains coordination state, and emits commands.
Unlike Saga, ProcessManagers are event-driven and reactive rather than sequential. They subscribe to event patterns from multiple sources and decide what commands to issue based on accumulated state.
Compiles to an Accumulator (coordination state) plus Rules (event handlers).
Example
require Runic
pm = Runic.process_manager name: :fulfillment do
state %{order_id: nil, paid: false, shipped: false}
on {:order_submitted, order_id} do
update %{order_id: order_id}
emit {:charge_payment, order_id}
end
on {:payment_received, _} do
update %{paid: true}
end
on {:shipment_created, _} do
update %{shipped: true}
end
complete? fn state -> state.shipped end
endDSL
state initial_value- Sets the initial process manager stateon event_pattern do ... end- Defines an event handlerupdate map- Merges updates into the process stateemit value- Produces a command fact as output
complete? fn state -> bool end- Completion check (fires when state satisfies predicate)timeout :name, duration do ... end- Declares a timeout (scheduling is the Runner's responsibility)
Options
:name- Identifier for the process manager (required)
Creates a %Reduce{}: aggregates multiple facts into a single accumulated result.
Reduce operations fan-in results from a map operation or process an enumerable
from a parent step. Unlike accumulator/3 which processes single values
cumulatively, reduce/3 aggregates over collections.
Basic Usage
Like Enum.reduce/3, process an enumerable from a parent step:
require Runic
alias Runic.Workflow
workflow = Runic.workflow(
name: :sum_range,
steps: [
{Runic.step(fn -> [1, 2, 3, 4, 5] end, name: :generate),
[Runic.reduce(0, fn x, acc -> x + acc end, name: :sum)]}
]
)Options
:name- Identifier for the reduce component:map- Name of an upstream map component for fan-in (lazy evaluation):inputs/:outputs- Reserved for future schema-based type compatibility
Map-Reduce Pattern (Lazy Evaluation)
When :map is specified, reduce waits for all mapped elements before aggregating.
This enables lazy/parallel execution of the map phase:
iex> require Runic
iex> alias Runic.Workflow
iex> map_op = Runic.map(fn x -> x * 2 end, name: :double)
iex> reduce_op = Runic.reduce(0, fn x, acc -> x + acc end, name: :sum, map: :double)
iex> workflow = Workflow.new()
...> |> Workflow.add(map_op)
...> |> Workflow.add(reduce_op, to: :double)
iex> results = workflow
...> |> Workflow.plan_eagerly([1, 2, 3])
...> |> Workflow.react_until_satisfied()
...> |> Workflow.raw_productions(:sum)
iex> 12 in results
trueNested Pipeline with Reduce
Reduce can follow a pipeline after the map:
require Runic
Runic.workflow(
steps: [
{Runic.map(fn x -> x * 2 end, name: :double),
[{Runic.step(fn x -> x + 1 end, name: :add_one),
[Runic.reduce(0, fn x, acc -> x + acc end, name: :sum, map: :double)]}]}
]
)Important Notes
- Reduce operations are inherently sequential and cannot be parallelized unless your reducer has CRDT (commutative) properties
- Without
:map, reduce processes the enumerable eagerly in one invocation - With
:map, reduce waits for all fan-out elements before reducing
Creates a %Rule{}: a conditional reaction for pattern-matched execution.
Rules have two phases: a condition (left-hand side) that must match, and a reaction (right-hand side) that executes when matched. This separation enables efficient evaluation of many rules together.
Basic Usage
iex> require Runic
iex> alias Runic.Workflow.Rule
iex> rule = Runic.rule(fn x when is_integer(x) and x > 0 -> :positive end)
iex> Rule.check(rule, 5)
true
iex> Rule.check(rule, -1)
false
iex> Rule.run(rule, 5)
:positiveGuard Clauses
Rules support full Elixir guard expressions:
iex> require Runic
iex> alias Runic.Workflow.Rule
iex> rule = Runic.rule(fn x when is_binary(x) and byte_size(x) > 5 -> :long_string end)
iex> Rule.check(rule, "hello!")
true
iex> Rule.check(rule, "hi")
falsePattern Matching
iex> require Runic
iex> alias Runic.Workflow.Rule
iex> rule = Runic.rule(fn %{status: :pending} -> :process end)
iex> Rule.check(rule, %{status: :pending, id: 1})
true
iex> Rule.check(rule, %{status: :done})
falseSeparated Condition and Reaction
For expensive conditions shared by multiple rules, or clearer organization:
iex> require Runic
iex> alias Runic.Workflow.Rule
iex> rule = Runic.rule(
...> name: :expensive_check,
...> condition: fn x -> rem(x, 2) == 0 end,
...> reaction: fn x -> x * 2 end
...> )
iex> Rule.check(rule, 4)
true
iex> Rule.run(rule, 4)
8Also supports :if / :do aliases:
Runic.rule(
name: :my_rule,
if: fn x -> x > 10 end,
do: fn x -> :large end
)Multi-Arity Rules
Rules can require multiple inputs (provided as a list):
iex> require Runic
iex> alias Runic.Workflow.Rule
iex> rule = Runic.rule(fn a, b when is_integer(a) and is_integer(b) -> a + b end)
iex> Rule.check(rule, [3, 4])
true
iex> Rule.run(rule, [3, 4])
7In Workflows
Rules are evaluated in the planning/match phase before execution:
iex> require Runic
iex> alias Runic.Workflow
iex> workflow = Runic.workflow(
...> name: :classifier,
...> rules: [
...> Runic.rule(fn x when x > 100 -> :xlarge end, name: :xlarge),
...> Runic.rule(fn x when x > 10 and x <= 100 -> :large end, name: :large),
...> Runic.rule(fn x when x > 0 and x <= 10 -> :small end, name: :small)
...> ]
...> )
iex> workflow |> Workflow.plan_eagerly(50) |> Workflow.react_until_satisfied() |> Workflow.raw_productions()
[:large]Given/Where/Then DSL
For complex rules with pattern destructuring, use the explicit DSL with three clauses:
given— a pattern-matching clause that destructures the input and binds variables. The binding name becomes the key in the bindings map passed tothen.where— a boolean expression evaluated at runtime. Unlikewhenguards,wheresupports any Elixir expression including function calls likeString.starts_with?/2,Enum.member?/2, etc.then— a function that receives the bindings map and produces a result. The bindings fromgivenare available as keys.
Note: Use where instead of when because when is a reserved Elixir keyword.
Do/End Block Form
require Runic
Runic.rule do
given(order: %{status: status, total: total})
where(status == :pending and total > 100)
then(fn %{order: order, total: total} -> {:apply_discount, order, total * 0.9} end)
endNamed Do/End Block Form
Pass options before the do block to name the rule:
require Runic
Runic.rule name: :premium_discount do
given(order: %{customer: %{tier: tier}, total: total})
where(tier == :premium and total > 50)
then(fn %{order: order} -> {:apply_discount, order} end)
endKeyword Form
The same rule can be written as a keyword list:
require Runic
Runic.rule(
name: :threshold_check,
given: [value: v],
where: v > 100,
then: fn %{value: v} -> {:over_threshold, v} end
)Direct map patterns are also supported in the keyword form:
Runic.rule(
name: :map_pattern,
given: %{x: x, y: y},
where: x + y > 10,
then: fn %{x: x, y: y} -> {:sum, x + y} end
)Non-Guard Expressions in where
Unlike when guards, where supports any boolean expression:
require Runic
Runic.rule do
given(name: name)
where(String.starts_with?(name, "prefix_"))
then(fn %{name: n} -> {:matched, n} end)
endCapturing External Variables with ^
Use the pin operator ^ to capture variables from the surrounding scope.
This is essential when dynamically constructing rules in loops or functions:
require Runic
alias Runic.Workflow
threshold = 100
rule =
Runic.rule do
given(value: v)
where(v > ^threshold)
then(fn %{value: v} -> {:over_threshold, v} end)
end
Workflow.new()
|> Workflow.add(rule)
|> Workflow.react_until_satisfied(150)
|> Workflow.raw_productions()
# => [{:over_threshold, 150}]The ^ pin also works in the keyword form and in condition/reaction style:
some_values = [:potato, :ham, :tomato]
Runic.rule(
name: "escaped rule",
condition: fn val when is_atom(val) -> true end,
reaction: fn val ->
Enum.map(^some_values, fn x -> {val, x} end)
end
)
Creates a %Saga{}: a sequence of transaction steps with compensating actions.
Sagas are explicit forward-then-compensate pipelines. Each transaction step must have a corresponding compensate block. On failure, completed steps are compensated in reverse order.
Compiles to an Accumulator (tracking saga state), forward Rules (one per transaction step), and compensation Rules (one per compensate block).
Usage
require Runic
saga = Runic.saga name: :fulfillment do
transaction :reserve_inventory do
fn _input -> {:ok, :reserved} end
end
compensate :reserve_inventory do
fn %{reserve_inventory: _} -> :released end
end
transaction :charge_payment do
fn %{reserve_inventory: _} -> {:ok, :charged} end
end
compensate :charge_payment do
fn %{charge_payment: _} -> :refunded end
end
on_complete fn results -> {:saga_completed, results} end
on_abort fn reason, compensated -> {:saga_aborted, reason, compensated} end
end
Creates a %StateMachine{}: stateful workflows with reducers and conditional reactors.
State machines combine an accumulator with rules that react to state changes. The reducer processes inputs in context of accumulated state, and reactors conditionally execute based on the new state.
Basic Usage
iex> require Runic
iex> alias Runic.Workflow
iex> counter = Runic.state_machine(
...> name: :counter,
...> init: 0,
...> reducer: fn x, acc -> acc + x end
...> )
iex> workflow = Workflow.new() |> Workflow.add(counter)
iex> results = workflow |> Workflow.plan_eagerly(5) |> Workflow.react_until_satisfied() |> Workflow.raw_productions()
iex> Enum.sort(results)
[0, 5]Options
:name- Identifier for the state machine (required for referencing):init- Initial state (literal value, function, or{M, F, A}tuple):reducer- Function(input, state) -> new_statefor state transitions:reactors- List of rules that react to state changes:inputs/:outputs- Reserved for future schema-based type compatibility
With Reactors
Reactors are rules that fire when the accumulated state matches their conditions:
require Runic
alias Runic.Workflow
threshold_sm = Runic.state_machine(
name: :threshold_monitor,
init: 0,
reducer: fn x, acc -> acc + x end,
reactors: [
fn state when state > 100 -> :threshold_exceeded end,
fn state when state > 50 -> :warning end
]
)Lock/Unlock Example
require Runic
lock = Runic.state_machine(
name: :lock,
init: %{code: "secret", state: :locked},
reducer: fn
:lock, state ->
%{state | state: :locked}
{:unlock, code}, %{code: code, state: :locked} = state ->
%{state | state: :unlocked}
_, state ->
state
end,
reactors: [
fn %{state: :unlocked} -> :access_granted end,
fn %{state: :locked} -> :access_denied end
]
)Block DSL with handle/react (Form 2)
For state machines with complex state and event-driven transitions, the
block DSL provides a more expressive form. Each handle clause bundles
an event match, input pattern, state binding, and state transformation
into a named, addressable sub-component. react clauses observe state
without modifying it.
Runic.state_machine name: :cart, init: %{items: [], total: 0} do
handle :add_item, %{item: item}, state do
%{state | items: [item | state.items], total: state.total + item.price}
end
handle :checkout, _, state when state.items != [] do
%{state | status: :checked_out}
end
react :high_value do
fn %{total: t} when t > 1000 -> {:vip_alert, t} end
end
endhandle clause semantics
handle event_pattern, input_match, state_var [when state_guard] do
body # must return next state
endevent_pattern— atom or pattern matched against the incoming fact's event type discriminator.input_match— pattern match on the event payload / fact value.state_var— binds the current state viastate_of(:sm_name)meta_ref.when state_guard— optional guard on current state.body— returns the next state value, fed to the accumulator.
Each handle compiles to a named Rule:
:"<sm_name>_<event_pattern>" (e.g., :cart_add_item).
react clause semantics
react name do
fn state_pattern -> output end
end- Name is explicitly required (the atom after
react). - Compiles to a Rule with a
state_of()condition and a step that produces an output fact. - Does not modify state — observation only.
Both forms produce identical %StateMachine{} structs. The handle
block is sugar for splitting a multi-clause reducer into individually
named rules.
Captured Variables
Use ^ for runtime values in reducers and reactors:
multiplier = 2
Runic.state_machine(
name: :scaled_sum,
init: 0,
reducer: fn x, acc -> acc + x * ^multiplier end
)
Used inside Runic macros such as rules to reference the state of another component such as an accumulator or reduce.
Expands in conjunction with the rest of the expression of the rule's expression to evaluate against the last known state of the component.
Examples
require Runic
alias Runic.Workflow
counter = Runic.accumulator(0, fn x, acc -> acc + x end, name: :counter)
threshold_rule =
Runic.rule name: :threshold_check do
given(x: x)
where(state_of(:counter) > 5)
then(fn %{x: x} -> {:above_threshold, x} end)
end
workflow =
Workflow.new()
|> Workflow.add(counter)
|> Workflow.add(threshold_rule, to: :counter)
Creates a %Step{}: a basic lambda expression that can be added to a workflow.
Steps are the fundamental building blocks of Runic workflows, representing input → output transformations. Each step wraps a function and can be composed with other steps to form data processing pipelines.
Basic Usage
iex> require Runic
iex> step = Runic.step(fn x -> x * 2 end)
iex> step.work.(5)
10Arities
Steps support 0, 1, or 2-arity functions:
iex> require Runic
iex> zero_arity = Runic.step(fn -> 42 end)
iex> zero_arity.work.()
42
iex> require Runic
iex> one_arity = Runic.step(fn x -> x + 1 end)
iex> one_arity.work.(10)
11
iex> require Runic
iex> two_arity = Runic.step(fn a, b -> a + b end)
iex> two_arity.work.(3, 4)
7Note: 2-arity steps only execute when the workflow receives a 2-element list as input.
Captured Variables with ^
Use the pin operator ^ to capture outer scope variables. This is essential for:
Content-addressable hashing (each bound value produces unique hashes)
Serialization with
build_log/1and recovery withfrom_log/1Dynamic workflow construction at runtime
iex> require Runic iex> multiplier = 3 iex> step = Runic.step(fn x -> x * ^multiplier end) iex> step.closure.bindings[:multiplier] 3 iex> step.work.(10) 30
Without ^, Elixir's normal closure mechanism captures the variable, but Runic
cannot track, hash, or serialize it - the workflow will fail after persistence.
Captured Functions
Steps can wrap module functions using capture syntax:
iex> require Runic
iex> step = Runic.step(&String.upcase/1)
iex> step.work.("hello")
"HELLO"Options
:name- An atom or string identifier for referencing this step in workflows:work- The function to execute (alternative to passing as first argument):inputs- Reserved for future schema-based type compatibility:outputs- Reserved for future schema-based type compatibilityiex> require Runic iex> step = Runic.step(fn x -> x * 2 end, name: :doubler) iex> step.name :doubler
iex> require Runic iex> step = Runic.step(name: :tripler, work: fn x -> x * 3 end) iex> step.name :tripler
In Workflows
Steps can be connected in pipelines using the workflow DSL:
iex> require Runic
iex> alias Runic.Workflow
iex> workflow = Runic.workflow(
...> name: "pipeline",
...> steps: [
...> {Runic.step(fn x -> x + 1 end, name: :add_one),
...> [Runic.step(fn x -> x * 2 end, name: :double)]}
...> ]
...> )
iex> results = workflow |> Workflow.react_until_satisfied(5) |> Workflow.raw_productions()
iex> Enum.sort(results)
[6, 12]
Evaluates to true in a condition if the specified step has ever been ran.
Note that this evaluates to true globally for any prior execution of the workflow, not just within the current invocation.
Examples
require Runic
alias Runic.Workflow
rule =
Runic.rule name: :after_validation do
given(x: x)
where(step_ran?(:validator))
then(fn %{x: x} -> {:validated, x} end)
end
Evaluates to true if the specified step has been executed for the given input fact.
Considers only input facts for a generation of invokations fed into the root of the workflow.
Examples
require Runic
alias Runic.Workflow
rule =
Runic.rule name: :scoped_check do
given(x: x)
where(step_ran?(:validator, x))
then(fn %{x: x} -> {:validated, x} end)
end
Converts a Runic component into a %Workflow{} via the Transmutable protocol.
Components like steps, rules, state machines, etc. can be transmuted into standalone workflows for evaluation or composition.
Examples
iex> require Runic
iex> step = Runic.step(fn x -> x * 2 end)
iex> workflow = Runic.transmute(step)
iex> workflow.__struct__
Runic.Workflow
iex> require Runic
iex> rule = Runic.rule(fn x when x > 0 -> :positive end)
iex> workflow = Runic.transmute(rule)
iex> workflow.__struct__
Runic.WorkflowUse Cases
- Preparing components for standalone evaluation
- Converting natural representations (e.g., a single rule) into evaluable workflows
- Composing heterogeneous components by first converting to workflows, then merging
Creates a %Workflow{} from component options.
Workflows are directed acyclic graphs (DAGs) of steps, rules, and other components connected through dataflow semantics. They enable lazy or eager evaluation and can be composed, persisted, and distributed.
Basic Usage
iex> require Runic
iex> alias Runic.Workflow
iex> workflow = Runic.workflow(
...> name: :simple,
...> steps: [Runic.step(fn x -> x * 2 end)]
...> )
iex> workflow |> Workflow.react_until_satisfied(5) |> Workflow.raw_productions()
[10]Options
:name- Identifier for the workflow (atom or string):steps- List of steps, with optional pipeline syntax for parent-child relationships:rules- List of conditional rules to add:before_hooks- Debug hooks called before step execution:after_hooks- Debug hooks called after step execution:input_ports- Port contract for workflow boundary inputs (keyword list of port schemas):output_ports- Port contract for workflow boundary outputs (keyword list of port schemas)
Pipeline Syntax
Use tuples {parent, [children]} to define step dependencies:
iex> require Runic
iex> alias Runic.Workflow
iex> pipeline = Runic.workflow(
...> name: :pipeline,
...> steps: [
...> {Runic.step(fn x -> x + 1 end, name: :add_one),
...> [Runic.step(fn x -> x * 2 end, name: :double),
...> Runic.step(fn x -> x * 3 end, name: :triple)]}
...> ]
...> )
iex> results = pipeline |> Workflow.react_until_satisfied(5) |> Workflow.raw_productions()
iex> Enum.sort(results)
[6, 12, 18]With Rules
iex> require Runic
iex> alias Runic.Workflow
iex> workflow = Runic.workflow(
...> name: :rule_example,
...> rules: [
...> Runic.rule(fn x when is_integer(x) and x > 10 -> :large end),
...> Runic.rule(fn x when is_integer(x) and x <= 10 -> :small end)
...> ]
...> )
iex> workflow |> Workflow.plan_eagerly(15) |> Workflow.react_until_satisfied() |> Workflow.raw_productions()
[:large]Hooks
Hooks receive (step, workflow, fact) and must return the workflow.
Use them for debugging, logging, or dynamic workflow modification:
require Runic
alias Runic.Workflow
Runic.workflow(
name: :with_hooks,
steps: [Runic.step(fn x -> x * 2 end, name: :double)],
after_hooks: [
double: [
fn _step, workflow, fact ->
IO.puts("Produced: #{inspect(fact.value)}")
workflow
end
]
]
)Boundary Ports
Declare input and output ports to make a workflow composable as a typed component. Workflows without ports return empty contracts and are connectable to anything.
require Runic
alias Runic.Workflow
workflow = Runic.workflow(
name: :price_calculator,
steps: [
{Runic.step(fn order -> order.items end, name: :parse_order),
[Runic.step(fn items -> Enum.sum(Enum.map(items, & &1.price)) end, name: :calculate_total)]}
],
input_ports: [
order: [type: :map, doc: "Order to price", to: :parse_order]
],
output_ports: [
total: [type: :float, doc: "Calculated total", from: :calculate_total]
]
)
# Ports are surfaced via the Component protocol
Runic.Component.inputs(workflow)
# => [order: [type: :map, doc: "Order to price", to: :parse_order]]Port options: :type, :doc, :cardinality, :required, :to (input binding), :from (output binding).
The :to and :from options reference internal component names and are validated at build time.