Runic (Runic v0.1.0-alpha.7)

Runic

Runic is a tool for modeling programs as data-driven workflows that can be composed together at runtime.

Runic components connect together in a Runic.Workflow supporting lazily evaluated concurrent execution.

Runic Workflows are modeled as a decorated dataflow graph (a DAG, or "directed acyclic graph") compiled from components such as steps, rules, pipelines, state machines, and more, allowing coordinated interaction of disparate parts.

Installation

If available in Hex, the package can be installed by adding runic to your list of dependencies in mix.exs:

def deps do
  [
    {:runic, "~> 0.1.0-alpha"}
  ]
end

Documentation can be generated with ExDoc and published on HexDocs. Once published, the docs can be found at https://hexdocs.pm/runic.

Concepts

Data flow dependencies between Lambda expressions, common in ETL pipelines, can be built with %Step{} components.

A Lambda Step is a simple input -> output function.

require Runic

step = Runic.step(fn x -> x + 1 end)

Steps are composable in a workflow:

workflow = Runic.workflow(
  name: "example pipeline workflow",
  steps: [
    Runic.step(fn x -> x + 1 end), #A
    Runic.step(fn x -> x * 2 end), #B
    Runic.step(fn x -> x - 1 end) #C
  ]
)

This produces a workflow graph where R is the entrypoint or "root" of the tree:

graph TD;
    R-->A;
    R-->B;
    R-->C;

In Runic, inputs flow through a workflow as a %Fact{}. During workflow evaluation, steps are traversed and invoked, producing more Facts.

alias Runic.Workflow

workflow
|> Workflow.react_until_satisfied(2)
|> Workflow.raw_productions()

[3, 4, 1]

A core benefit of Runic workflows is modeling pipelines that aren't just linear. For example:

defmodule TextProcessing do
  def tokenize(text) do
    text
    |> String.downcase()
    |> String.split(~R/[^[:alnum:]\-]/u, trim: true)
  end

  def count_words(list_of_words) do
    list_of_words
    |> Enum.reduce(Map.new(), fn word, map ->
      Map.update(map, word, 1, &(&1 + 1))
    end)
  end

  def count_uniques(word_count) do
    Enum.count(word_count)
  end

  def first_word(list_of_words) do
    List.first(list_of_words)
  end

  def last_word(list_of_words) do
    List.last(list_of_words)
  end
end

Notice we have three functions that expect a list_of_words. In plain Elixir, if we wanted to evaluate each output, we could chain the functions together with the pipe operator |>:

import TextProcessing

word_count = 
  "anybody want a peanut?"
  |> tokenize()
  |> count_words()

first_word = 
  "anybody want a peanut?"
  |> tokenize()
  |> first_word()

last_word = 
  "anybody want a peanut?"
  |> tokenize()
  |> last_word()

However, we're now evaluating linearly, calling the common tokenize/1 function three times for the same input text.

This could be problematic if tokenize/1 is expensive: we'd prefer to run tokenize/1 just once and then feed its result into the rest of our pipeline.
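For comparison, the duplication can also be removed by hand in plain Elixir by binding the tokens once and fanning out manually. The following is a minimal sketch; the module name SharedTokens and the call counter (kept in the process dictionary purely to show how often tokenize/1 runs) are illustrative assumptions and not part of Runic.

```elixir
defmodule SharedTokens do
  # Same tokenizer as TextProcessing.tokenize/1, plus a call counter for illustration only.
  def tokenize(text) do
    Process.put(:tokenize_calls, Process.get(:tokenize_calls, 0) + 1)

    text
    |> String.downcase()
    |> String.split(~r/[^[:alnum:]\-]/u, trim: true)
  end

  # Tokenize once, then hand the same token list to each dependent computation.
  def analyze(text) do
    tokens = tokenize(text)

    %{
      word_count: Enum.frequencies(tokens),
      first_word: List.first(tokens),
      last_word: List.last(tokens)
    }
  end
end

result = SharedTokens.analyze("anybody want a peanut?")
IO.inspect(result)
IO.inspect(Process.get(:tokenize_calls), label: "tokenize calls")
```

This works when the fan-out is known while writing the code. A workflow expresses the same sharing as data, so the shape of the fan-out can change at runtime.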

With Runic we can compose all of these steps into one workflow and evaluate them together.

text_processing_workflow = 
  Runic.workflow(
    name: "basic text processing example",
    steps: [
      {Runic.step(&tokenize/1),
        [
          {Runic.step(&count_words/1),
          [
            Runic.step(&count_uniques/1)
          ]},
          Runic.step(&first_word/1),
          Runic.step(&last_word/1)
        ]}
    ]
  )

Our text processing workflow graph now looks something like this:

graph TD;
    R-->tokenize;
    tokenize-->first_word;
    tokenize-->last_word;
    tokenize-->count_words;
    count_words-->count_uniques;

Now Runic can traverse over the graph of dataflow connections only evaluating tokenize/1 once for all three dependent steps.

alias Runic.Workflow

text_processing_workflow 
|> Workflow.react_until_satisfied("anybody want a peanut?") 
|> Workflow.raw_productions()

[
  ["anybody", "want", "a", "peanut"], 
  "anybody", 
  "peanut", 
  4,
  %{"a" => 1, "anybody" => 1, "peanut" => 1, "want" => 1}
]

Beyond steps, Runic has support for Rules, Joins, State Machines, FSMs, Aggregates, Sagas, and ProcessManagers for more complex control flow and stateful evaluation.

The Runic.Workflow.Invokable protocol allows Runic's runtime to be extended with nodes that have different execution and evaluation properties.

The Runic.Component protocol supports modeling new components that can be added to and connected with other components in Runic workflows.

Runtime Workflow Composition

Workflows can be composed dynamically at runtime:

require Runic
alias Runic.Workflow

# Using Workflow.add/3 for dynamic composition
workflow = Runic.workflow()
  |> Workflow.add(Runic.step(fn x -> x + 1 end, name: :add))
  |> Workflow.add(Runic.step(fn x -> x * 2 end, name: :double), to: :add)

# Merge two workflows together
workflow1 = Runic.workflow(steps: [Runic.step(fn x -> x + 1 end)])
workflow2 = Runic.workflow(steps: [Runic.step(fn x -> x * 2 end)])
combined = Workflow.merge(workflow1, workflow2)

# Join multiple parent nodes (assumes steps named :branch_a and :branch_b were already added)
workflow = workflow
  |> Workflow.add(Runic.step(fn a, b -> a + b end, name: :join), to: [:branch_a, :branch_b])

See the Runic.Workflow module documentation for adding components to workflows and running them.

Three-Phase Execution Model

Runic's Invokable protocol enforces a three-phase execution model designed for parallel execution and external scheduling:

  1. Prepare - Extract minimal context from the workflow into %Runnable{} structs
  2. Execute - Run runnables containing work functions and their inputs in isolation (can be parallelized)
  3. Apply - Reduce results back into the workflow so next steps can be determined
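The three phases can be illustrated without Runic at all. The sketch below uses a toy state map and plain functions; ThreePhaseSketch, its state shape, and the runnable maps are hypothetical stand-ins and do not reflect Runic's actual %Runnable{} struct or protocol functions.

```elixir
defmodule ThreePhaseSketch do
  # Phase 1: Prepare. Extract only what each unit of work needs from the state.
  def prepare(%{pending: pending} = state) do
    runnables =
      Enum.map(pending, fn {name, fun, input} -> %{name: name, fun: fun, input: input} end)

    {%{state | pending: []}, runnables}
  end

  # Phase 2: Execute. Each runnable runs in isolation, so the work can be parallelized.
  def execute(runnables) do
    runnables
    |> Task.async_stream(fn r -> Map.put(r, :result, r.fun.(r.input)) end, max_concurrency: 4)
    |> Enum.map(fn {:ok, executed} -> executed end)
  end

  # Phase 3: Apply. Fold the results back into the state sequentially.
  def apply_results(state, executed) do
    Enum.reduce(executed, state, fn r, acc ->
      %{acc | results: Map.put(acc.results, r.name, r.result)}
    end)
  end
end

state = %{
  pending: [{:add, &(&1 + 1), 2}, {:double, &(&1 * 2), 2}],
  results: %{}
}

{state, runnables} = ThreePhaseSketch.prepare(state)
executed = ThreePhaseSketch.execute(runnables)
state = ThreePhaseSketch.apply_results(state, executed)
IO.inspect(state.results)
```

Because only the execute phase touches user functions, it is the only phase that needs to be scheduled, retried, or moved to another process.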

Parallel Execution

For workflows where nodes can execute concurrently:

alias Runic.Workflow

# Execute runnables in parallel with configurable concurrency
workflow
|> Workflow.react_until_satisfied(input, async: true, max_concurrency: 8) # Task.async_stream options
|> Workflow.raw_productions()

External Scheduler Integration

For custom schedulers, worker pools, or distributed execution:

defmodule MyApp.WorkflowScheduler do
  use GenServer
  alias Runic.Workflow
  alias Runic.Workflow.Invokable

  # Keep the workflow in the GenServer state (assumes the workflow is passed as the init argument)
  def init(workflow), do: {:ok, %{workflow: workflow}}

  # Phase 1: Prepare runnables for dispatch
  def handle_cast({:run, input}, %{workflow: workflow} = state) do
    workflow =
      workflow
      |> Workflow.plan_eagerly(input)
      |> dispatch_tasks()

    {:noreply, %{state | workflow: workflow}}
  end

  # Phase 2: Execute (dispatch to async worker pool, queue, external service, etc.)
  defp dispatch_tasks(workflow) do
    {workflow, runnables} = Workflow.prepare_for_dispatch(workflow)

    Enum.each(runnables, fn runnable ->
      Task.async(fn ->
        # consider logging, error handling, retries here
        Invokable.execute(runnable.node, runnable)
      end)
    end)

    workflow
  end

  # Phase 3: Apply each executed runnable back to the workflow as its task reply arrives
  def handle_info({ref, executed_runnable}, %{workflow: workflow} = state)
      when is_reference(ref) do
    # The task has replied, so drop its monitor and flush the pending :DOWN message
    Process.demonitor(ref, [:flush])

    new_workflow = Workflow.apply_runnable(workflow, executed_runnable)

    workflow =
      if Workflow.is_runnable?(new_workflow) do
        dispatch_tasks(new_workflow)
      else
        new_workflow
      end

    {:noreply, %{state | workflow: workflow}}
  end
end

Key APIs for external scheduling:

  • Workflow.prepare_for_dispatch/1 - Returns {workflow, [%Runnable{}]} for dispatch
  • Workflow.apply_runnable/2 - Applies a completed runnable back to the workflow
  • Invokable.execute/2 - Executes a runnable in isolation (no workflow access)

In summary, the Runic module provides high level functions and macros for building Runic Components such as Steps, Rules, Workflows, and Accumulators.

The Runic.Workflow module is for connecting components together and running them with inputs.

Runic was designed to be used with custom process topologies and libraries such as GenStage, Broadway, and Flow without coupling you to one runtime model or a limited set of adapters.

Runic has first class support for dynamic runtime composition of workflows.

Runic is useful in problems where a developer cannot know upfront the logic or data flow in compiled code such as expert systems, user DSLs like Excel spreadsheets, low/no-code tools, or dynamic data pipelines.

If the runtime modification of a workflow or complex parallel dataflow evaluation isn't something your use case requires, you might not need Runic.

Runic Workflows are essentially a dataflow-based virtual machine running within Elixir and will not be faster than compiled Elixir code. If you know the flow of the program upfront during development, you might not need Runic.

Runtime Context

Components can declare dependencies on external runtime values using context/1:

# Steps can reference external values
step = Runic.step(fn _x -> context(:api_key) end, name: :call_llm)

# Rules can use context in conditions
rule = Runic.rule name: :gated do
  given(val: v)
  where(v > context(:threshold))
  then(fn %{val: v} -> {:ok, v} end)
end

# Accumulators, map, and reduce also support context/1
acc = Runic.accumulator(0, fn x, s -> s + x * context(:factor) end, name: :scaled)
map = Runic.map(fn x -> x * context(:multiplier) end, name: :mult_map)

# Provide defaults for optional context keys
step = Runic.step(fn _x -> context(:model, default: "gpt-4") end, name: :call_llm)

# Provide values at runtime
workflow
|> Workflow.put_run_context(%{
  call_llm: %{api_key: "sk-..."},
  _global: %{workspace_id: "ws1"}
})
|> Workflow.react_until_satisfied(input)

Context values are scoped by component name, not part of the workflow hash, and not serialized — making them safe for secrets and connection handles. Keys with defaults are satisfied without explicit run_context entries. See Workflow.required_context_keys/1 and Workflow.validate_run_context/2 for introspection.

Scheduler Policies

Runic workflows support declarative per-node scheduling policies for retries, timeouts, backoff, fallbacks, and failure handling — without modifying the Invokable protocol or existing component structs.

Policies are stored on the workflow as a list of {matcher, policy_map} rules resolved at execution time. The first matching rule wins:

alias Runic.Workflow
alias Runic.Workflow.SchedulerPolicy

workflow =
  workflow
  |> Workflow.add_scheduler_policy(:call_llm, %{
    max_retries: 3,
    backoff: :exponential,
    timeout_ms: 30_000
  })
  |> Workflow.append_scheduler_policy(:default, %{timeout_ms: 10_000})

# Policies are applied automatically during react/react_until_satisfied
workflow |> Workflow.react_until_satisfied(input)

# Runtime overrides can be passed as options (prepended with higher priority)
workflow |> Workflow.react_until_satisfied(input,
  scheduler_policies: [{:flaky_step, %{max_retries: 5}}]
)

Policy matchers support exact name atoms, regex ({:name, ~r/^llm_/}), type matching ({:type, Step}), custom predicates (fn node -> ... end), and :default catch-alls. Backoff strategies include :none, :linear, :exponential, and :jitter.
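To make "the first matching rule wins" concrete, here is a minimal sketch of first-match resolution in plain Elixir. PolicySketch, the node maps with :name and :type keys, and the choice to return a single policy without merging are all assumptions made for illustration; the actual resolution logic lives in Runic.Workflow.SchedulerPolicy and may differ.

```elixir
defmodule PolicySketch do
  # Returns true when a matcher applies to a node (here a plain map with :name and :type).
  def matches?(:default, _node), do: true
  def matches?(name, node) when is_atom(name), do: node.name == name
  def matches?({:name, %Regex{} = regex}, node), do: Regex.match?(regex, Atom.to_string(node.name))
  def matches?({:type, type}, node), do: node.type == type
  def matches?(fun, node) when is_function(fun, 1), do: fun.(node)

  # Walks the rules in order and returns the policy of the first match, or %{} if none match.
  def resolve(rules, node) do
    Enum.find_value(rules, %{}, fn {matcher, policy} ->
      if matches?(matcher, node), do: policy
    end)
  end
end

rules = [
  {:call_llm, %{max_retries: 3, backoff: :exponential, timeout_ms: 30_000}},
  {{:name, ~r/^llm_/}, %{max_retries: 2}},
  {:default, %{timeout_ms: 10_000}}
]

IO.inspect(PolicySketch.resolve(rules, %{name: :call_llm, type: :step}))
IO.inspect(PolicySketch.resolve(rules, %{name: :llm_summarize, type: :step}))
IO.inspect(PolicySketch.resolve(rules, %{name: :tokenize, type: :step}))
```

Ordering matters under this scheme: a :default rule placed first would shadow every rule after it, which is why the catch-all is appended last in the example above.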

See Runic.Workflow.SchedulerPolicy for the full policy struct and matcher documentation, and Runic.Workflow.PolicyDriver for execution driver details.

Built-in Runner

Runic.Runner provides batteries-included workflow execution infrastructure: a supervision tree with a DynamicSupervisor for workers, Task.Supervisor for fault-isolated dispatch, Registry for lookup, and pluggable persistence via Runic.Runner.Store.

# Start a runner in your supervision tree
{:ok, _} = Runic.Runner.start_link(name: MyApp.Runner)

# Start and run a workflow
{:ok, _pid} = Runic.Runner.start_workflow(MyApp.Runner, :order_123, workflow)
:ok = Runic.Runner.run(MyApp.Runner, :order_123, input)

# Query results
{:ok, results} = Runic.Runner.get_results(MyApp.Runner, :order_123)

# Resume from persisted state after a crash
{:ok, _pid} = Runic.Runner.resume(MyApp.Runner, :order_123)

Workers automatically integrate with scheduler policies, dispatch runnables to supervised tasks with configurable max_concurrency, and support checkpointing strategies (:every_cycle, :on_complete, {:every_n, n}, :manual).

Built-in store adapters: Runic.Runner.Store.ETS (default, in-memory) and Runic.Runner.Store.Mnesia (disk-persistent, distributed). Custom adapters implement the Runic.Runner.Store behaviour.

When no :store is configured, the Runner starts its built-in ETS store automatically. When :store is configured explicitly, the Runner assumes that store is already supervised elsewhere, including the built-in ETS and Mnesia adapters.

Telemetry events are emitted under [:runic, :runner, ...] for workflow lifecycle, runnable dispatch/completion, and store operations. See Runic.Runner.Telemetry for the full event catalog.

Summary

Functions

Creates an %Accumulator{}: maintains cumulative state across individual inputs.

Creates an %Aggregate{}: a CQRS/ES aggregate that validates commands against current state, produces domain events, and folds events into state.

Returns a list of all %Fact{} structs produced by a component across all invocations.

Returns a list of all raw values produced by a component across all invocations.

Creates a %Condition{}: a standalone conditional expression.

References an external runtime value by key inside Runic macros.

References an external runtime value by key with a default fallback.

Returns the number of facts produced by a given component in the workflow.

Creates a %FSM{}: a finite state machine with discrete states and guarded transitions.

Returns the most recent %Fact{} struct produced by a component.

Returns the most recent raw value produced by a component.

Creates a %Map{}: applies a transformation to each element of an enumerable.

Creates a %ProcessManager{}: a CQRS-oriented process manager that reacts to domain events, maintains coordination state, and emits commands.

Creates a %Reduce{}: aggregates multiple facts into a single accumulated result.

Creates a %Rule{}: a conditional reaction for pattern-matched execution.

Creates a %Saga{}: a sequence of transaction steps with compensating actions.

Creates a %StateMachine{}: stateful workflows with reducers and conditional reactors.

Used inside Runic macros such as rules to reference the state of another component such as an accumulator or reduce.

Creates a %Step{}: a basic lambda expression that can be added to a workflow.

Evaluates to true in a condition if the specified step has ever been run.

Evaluates to true if the specified step has been executed for the given input fact.

Converts a Runic component into a %Workflow{} via the Transmutable protocol.

Creates a %Workflow{} from component options.

Functions

accumulator(init, reducer_fun, opts \\ [])

(macro)

Creates an %Accumulator{}: maintains cumulative state across individual inputs.

Unlike reduce/3 which aggregates over collections, accumulators process single values and maintain running state across multiple workflow invocations. This makes them ideal for running totals, counters, and stateful computations.

Basic Usage

iex> require Runic
iex> alias Runic.Workflow
iex> acc = Runic.accumulator(0, fn x, state -> state + x end, name: :running_sum)
iex> workflow = Workflow.new() |> Workflow.add(acc)
iex> results = workflow |> Workflow.plan_eagerly(5) |> Workflow.react_until_satisfied() |> Workflow.raw_productions()
iex> 5 in results
true

Options

  • :name - Identifier for the accumulator (useful for referencing in rules)
  • :inputs / :outputs - Reserved for future schema-based type compatibility

Difference from Reduce

Accumulator                       | Reduce
----------------------------------|----------------------------
Single value per invocation       | Aggregates over enumerables
State persists across invocations | One-shot aggregation
For running totals/counters       | For map-reduce patterns
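The distinction has a rough analogue in Elixir's standard library, shown here as an illustration only (it does not use Runic): Enum.scan/3 yields a new running state for each incoming value, while Enum.reduce/3 folds a whole collection into one result.

```elixir
reducer = fn x, state -> state + x end

# Accumulator-like: one value arrives per invocation and the state carries over,
# so every invocation yields a new running total.
running_totals = Enum.scan([5, 10, 20], 0, reducer)
IO.inspect(running_totals)

# Reduce-like: a whole enumerable is folded in one shot into a single result.
total = Enum.reduce([5, 10, 20], 0, reducer)
IO.inspect(total)
```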

Building State Machines

Connect rules to accumulators to create state-machine-like behavior:

require Runic
alias Runic.Workflow

counter = Runic.accumulator(0, fn x, acc -> acc + x end, name: :counter)
threshold_rule = Runic.rule(
  condition: fn {state, _input} -> state > 100 end,
  reaction: fn _ -> :threshold_exceeded end
)

Captured Variables

iex> require Runic
iex> alias Runic.Workflow
iex> multiplier = 2
iex> acc = Runic.accumulator(0, fn x, state -> state + x * ^multiplier end, name: :scaled_sum)
iex> acc.closure.bindings[:multiplier]
2

aggregate(opts, list)

(macro)

Creates an %Aggregate{}: a CQRS/ES aggregate that validates commands against current state, produces domain events, and folds events into state.

Compiles to an Accumulator (event fold) plus Rules (command handlers).

Usage

require Runic

agg = Runic.aggregate name: :counter do
  state 0

  command :increment do
    emit fn _state -> {:incremented, 1} end
  end

  command :decrement do
    where fn state -> state > 0 end
    emit fn _state -> {:decremented, 1} end
  end

  event {:incremented, n}, state do
    state + n
  end

  event {:decremented, n}, state do
    state - n
  end
end

Options

  • :name - Identifier for the aggregate (required)

DSL

  • state initial_value - Sets the initial aggregate state
  • command :name do ... end - Defines a command handler
    • where fn state -> bool end - Optional guard on current state
    • emit fn state -> event end - Produces domain events
  • event pattern, state_var do body end - Defines an event handler (reducer clause)
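The split between command handlers and event handlers can be sketched in plain Elixir. CounterAggregateSketch below mirrors the :counter example by hand; it is a hypothetical illustration of the pattern, not the code that the aggregate macro generates.

```elixir
defmodule CounterAggregateSketch do
  # Command handlers: validate against the current state, then emit an event or reject.
  def handle(:increment, _state), do: {:ok, {:incremented, 1}}
  def handle(:decrement, state) when state > 0, do: {:ok, {:decremented, 1}}
  def handle(:decrement, _state), do: {:error, :rejected}

  # Event handlers: fold an event into the state.
  def apply_event({:incremented, n}, state), do: state + n
  def apply_event({:decremented, n}, state), do: state - n

  # Runs commands in order; rejected commands leave the state unchanged.
  def run(commands, initial \\ 0) do
    Enum.reduce(commands, initial, fn command, state ->
      case handle(command, state) do
        {:ok, event} -> apply_event(event, state)
        {:error, _reason} -> state
      end
    end)
  end
end

IO.inspect(CounterAggregateSketch.run([:decrement, :increment, :increment, :decrement]))
```

The first :decrement is rejected because the state is still 0, so only the two increments and the final decrement change the state.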

all_facts_of(component_name_or_hash)

Returns a list of all %Fact{} structs produced by a component across all invocations.

Unlike all_values_of/1, this returns full %Fact{} structs with metadata.

Examples

require Runic

Runic.rule name: :multi_fact_check do
  given(x: x)
  where(length(all_facts_of(:events)) > 0)
  then(fn %{x: x} -> {:has_events, x} end)
end

all_values_of(component_name_or_hash)

Returns a list of all raw values produced by a component across all invocations.

Useful for aggregation in where or then clauses, e.g. summing all scores.

Examples

In a where clause:

require Runic

Runic.rule name: :sum_check do
  given(x: x)
  where(Enum.sum(all_values_of(:scores)) > 100)
  then(fn %{x: x} -> {:high_score, x} end)
end

In a then clause:

require Runic

Runic.rule name: :sum_all_scores do
  given(x: _x)
  then(fn _bindings -> Enum.sum(all_values_of(:scores)) end)
end

condition(work)

(macro)

Creates a %Condition{}: a standalone conditional expression.

Conditions represent the left-hand side (predicate) of a rule. They can be reused across multiple rules when the same condition is expensive or shared.

Basic Usage

iex> require Runic
iex> cond = Runic.condition(fn x -> x > 10 end)
iex> cond.work.(15)
true
iex> cond.work.(5)
false

With Module Function Capture

iex> require Runic
iex> cond = Runic.condition({Kernel, :is_integer, 1})
iex> cond.work.(42)
true

With Name Option

iex> require Runic
iex> cond = Runic.condition(fn x -> x > 10 end, name: :big_number)
iex> cond.name
:big_number

Captured Variables with ^

Use the pin operator ^ to capture outer scope variables:

iex> require Runic
iex> threshold = 10
iex> cond = Runic.condition(fn x -> x > ^threshold end)
iex> cond.closure.bindings[:threshold]
10
iex> cond.work.(15)
true

Use Cases

  • Expensive checks: When a condition involves costly operations (e.g., API calls, database queries), define it once and reference it in multiple rules
  • Stateful conditions: Use state_of/1 to create conditions that depend on accumulator or state machine state
  • Reusability: Share predicates across rules for consistency

Note: Conditions should be pure and deterministic - they should not execute side effects.

condition(work, opts)

(macro)

context(key)

References an external runtime value by key inside Runic macros.

Used inside step, condition, rule, accumulator, map, and reduce macros to declare a dependency on a value provided via Workflow.put_run_context/2 or the :run_context option on react_until_satisfied/3.

Values are scoped by component name and resolved during the prepare phase. The _global key in run_context is merged into every component's context.

Resolves to nil when the key is not present in run_context. Use context/2 to provide a default instead.
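The scoping rules can be pictured as a lookup over a nested map. The sketch below is a plain-Elixir illustration; ContextSketch is hypothetical, and the choice that a component-scoped value overrides a _global value with the same key is an assumption that the text above does not confirm.

```elixir
defmodule ContextSketch do
  # Looks up `key` for `component`, layering component-scoped values over :_global.
  # Assumption: on a key conflict the component-scoped value wins.
  def resolve(run_context, component, key) do
    global = Map.get(run_context, :_global, %{})
    scoped = Map.get(run_context, component, %{})

    global
    |> Map.merge(scoped)
    |> Map.get(key)
  end
end

run_context = %{
  call_llm: %{api_key: "sk-..."},
  _global: %{workspace_id: "ws1"}
}

# Component-scoped key
IO.inspect(ContextSketch.resolve(run_context, :call_llm, :api_key))
# Key supplied through :_global
IO.inspect(ContextSketch.resolve(run_context, :call_llm, :workspace_id))
# Missing key resolves to nil
IO.inspect(ContextSketch.resolve(run_context, :call_llm, :threshold))
```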

Examples

require Runic
alias Runic.Workflow

step = Runic.step(fn _x -> context(:api_key) end, name: :call_llm)

rule =
  Runic.rule name: :gated do
    given(val: v)
    where(v > context(:threshold))
    then(fn %{val: v} -> {:ok, v} end)
  end

acc = Runic.accumulator(0, fn x, s -> s + x * context(:factor) end, name: :scaled)

workflow =
  Workflow.new()
  |> Workflow.add(step)
  |> Workflow.put_run_context(%{call_llm: %{api_key: "sk-..."}})

Dot access is supported for map-valued context keys:

Runic.step(fn x -> x + context(:config).pool_size end, name: :pooled)

context(key, opts)

References an external runtime value by key with a default fallback.

Behaves like context/1 but uses the provided default when the key is not present in run_context. The default can be a literal value or a zero-arity function that is called lazily when needed.

Keys with defaults are not reported as missing by Workflow.validate_run_context/2 and appear as {:optional, default} in Workflow.required_context_keys/1.

Defaults are embedded in the compiled closure and participate in content hashing — two components with different defaults produce different hashes.
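The fallback behaviour described above (literal default versus lazily called zero-arity function) can be sketched as follows. DefaultSketch is a hypothetical helper written for illustration and is not how Runic implements context/2 internally.

```elixir
defmodule DefaultSketch do
  # Returns the context value when the key is present; otherwise falls back to the default.
  # A zero-arity function default is only called when the key is missing.
  def fetch(context, key, opts) do
    case Map.fetch(context, key) do
      {:ok, value} -> value
      :error -> resolve_default(Keyword.get(opts, :default))
    end
  end

  defp resolve_default(fun) when is_function(fun, 0), do: fun.()
  defp resolve_default(value), do: value
end

# Key present: the default is ignored
IO.inspect(DefaultSketch.fetch(%{model: "custom"}, :model, default: "gpt-4"))
# Key missing: literal default
IO.inspect(DefaultSketch.fetch(%{}, :model, default: "gpt-4"))
# Key missing: function default is called lazily
IO.inspect(DefaultSketch.fetch(%{}, :api_key, default: fn -> "from-env" end))
```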

Examples

require Runic

# Literal default
step = Runic.step(fn _x -> context(:model, default: "gpt-4") end, name: :call_llm)

# Function default — called lazily when key is missing
step = Runic.step(
  fn _x -> context(:api_key, default: fn -> System.get_env("API_KEY") end) end,
  name: :call_llm
)

# In rule where clauses
rule =
  Runic.rule name: :default_rule do
    given(val: v)
    where(v > context(:threshold, default: 100))
    then(fn %{val: v} -> {:over, v} end)
  end

# In accumulator reducers
acc = Runic.accumulator(0, fn x, s -> s + x * context(:factor, default: 1) end,
  name: :scaled
)

fact_count(component_name_or_hash)

Returns the number of facts produced by a given component in the workflow.

Used inside where clauses of rules to gate on how many facts a component has produced.

Examples

require Runic

Runic.rule name: :batch_ready do
  given(x: x)
  where(fact_count(:items) >= 3)
  then(fn %{x: x} -> {:process_batch, x} end)
end

fsm(opts \\ [], list)

(macro)

Creates a %FSM{}: a finite state machine with discrete states and guarded transitions.

FSMs compile to an Accumulator (holding the current state atom) plus Rules (one per transition, using state_of() to gate on current state). Entry actions are additional rules that fire on state changes.

Example

require Runic

fsm = Runic.fsm name: :traffic_light do
  initial_state :red

  state :red do
    on :timer, to: :green
    on :emergency, to: :red
    on_entry fn -> {:notify, :traffic_stopped} end
  end

  state :green do
    on :timer, to: :yellow
    on :emergency, to: :red
  end

  state :yellow do
    on :timer, to: :red
    on :emergency, to: :red
  end
end

Each transition compiles to a named Rule: :"fsm_name_from_state_on_event".
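The "accumulator plus one rule per transition" idea can be approximated in plain Elixir with a transition table and a fold. TrafficLightSketch is a hypothetical illustration of the pattern for the traffic light above; it ignores entry actions and is not the code that the fsm macro generates.

```elixir
defmodule TrafficLightSketch do
  # Transition table: {current_state, event} => next_state
  @transitions %{
    {:red, :timer} => :green,
    {:red, :emergency} => :red,
    {:green, :timer} => :yellow,
    {:green, :emergency} => :red,
    {:yellow, :timer} => :red,
    {:yellow, :emergency} => :red
  }

  # One "rule" per transition: it applies only when current state and event both match.
  # Unknown events leave the state unchanged.
  def transition(state, event), do: Map.get(@transitions, {state, event}, state)

  # The "accumulator": fold a sequence of events into the current state.
  def run(events, initial \\ :red), do: Enum.reduce(events, initial, &transition(&2, &1))
end

IO.inspect(TrafficLightSketch.run([:timer, :timer, :emergency, :timer]))
```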

latest_fact_of(component_name_or_hash)

Returns the most recent %Fact{} struct produced by a component.

Unlike latest_value_of/1, this returns the full %Fact{} struct including metadata such as hash and ancestry, not just the raw value.

Examples

require Runic

Runic.rule name: :check_latest_fact do
  given(x: x)
  where(latest_fact_of(:processor) != nil)
  then(fn %{x: x} -> {:ok, x} end)
end

latest_value_of(component_name_or_hash)

Returns the most recent raw value produced by a component.

Useful in where clauses to compare against the latest output, or in then clauses to incorporate another component's latest result.

Examples

In a where clause:

require Runic

Runic.rule name: :high_temp_alert do
  given(x: x)
  where(latest_value_of(:sensor) > 100)
  then(fn %{x: x} -> {:alert, x} end)
end

In a then clause:

require Runic

Runic.rule name: :echo_latest do
  given(x: x)
  then(fn %{x: x} -> {:latest, x, latest_value_of(:sensor)} end)
end

map(expression, opts \\ [])

(macro)

Creates a %Map{}: applies a transformation to each element of an enumerable.

Map operations fan-out an enumerable input into individual elements, apply the transformation to each, and can be followed by a reduce to fan-in results.

Basic Usage

iex> require Runic
iex> alias Runic.Workflow
iex> map_op = Runic.map(fn x -> x * 2 end, name: :double)
iex> workflow = Workflow.new() |> Workflow.add(map_op)
iex> results = workflow |> Workflow.plan_eagerly([1, 2, 3]) |> Workflow.react_until_satisfied() |> Workflow.raw_productions()
iex> Enum.sort(results)
[2, 4, 6]

Options

  • :name - Identifier for referencing in reduce/3 via :map option
  • :inputs / :outputs - Reserved for future schema-based type compatibility

With Pipeline

Map can contain nested pipelines:

require Runic

Runic.map(
  {Runic.step(fn x -> x * 2 end, name: :double),
   [Runic.step(fn x -> x + 1 end, name: :add_one)]}
)

Map-Reduce Pattern

Connect a reduce to collect mapped results. The reduce's :map option links it to the upstream map:

iex> require Runic
iex> alias Runic.Workflow
iex> map_op = Runic.map(fn x -> x * 2 end, name: :double)
iex> reduce_op = Runic.reduce(0, fn x, acc -> x + acc end, name: :sum, map: :double)
iex> workflow = Workflow.new()
...>   |> Workflow.add(map_op)
...>   |> Workflow.add(reduce_op, to: :double)
iex> results = workflow
...>   |> Workflow.plan_eagerly([1, 2, 3])
...>   |> Workflow.react_until_satisfied()
...>   |> Workflow.raw_productions(:sum)
iex> 12 in results
true

How It Works

Internally, map uses a FanOut component that splits the enumerable into individual facts. Each element is processed independently, enabling parallel execution with the :async option on react/2.
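As a rough analogy in plain Elixir (not Runic's FanOut implementation), fan-out followed by fan-in looks like the sketch below. The use of Task.async_stream/3 and the max_concurrency value are illustrative choices.

```elixir
inputs = [1, 2, 3]

# Fan-out: each element is processed independently, so the work can run concurrently.
doubled =
  inputs
  |> Task.async_stream(fn x -> x * 2 end, max_concurrency: 4)
  |> Enum.map(fn {:ok, value} -> value end)

# Fan-in: fold the individual results into one value once all of them are available.
sum = Enum.reduce(doubled, 0, fn x, acc -> x + acc end)

IO.inspect(doubled)
IO.inspect(sum)
```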

process_manager(opts, list)

(macro)

Creates a %ProcessManager{}: a CQRS-oriented process manager that reacts to domain events, maintains coordination state, and emits commands.

Unlike Saga, ProcessManagers are event-driven and reactive rather than sequential. They subscribe to event patterns from multiple sources and decide what commands to issue based on accumulated state.

Compiles to an Accumulator (coordination state) plus Rules (event handlers).

Example

require Runic

pm = Runic.process_manager name: :fulfillment do
  state %{order_id: nil, paid: false, shipped: false}

  on {:order_submitted, order_id} do
    update %{order_id: order_id}
    emit {:charge_payment, order_id}
  end

  on {:payment_received, _} do
    update %{paid: true}
  end

  on {:shipment_created, _} do
    update %{shipped: true}
  end

  complete? fn state -> state.shipped end
end

DSL

  • state initial_value - Sets the initial process manager state
  • on event_pattern do ... end - Defines an event handler
    • update map - Merges updates into the process state
    • emit value - Produces a command fact as output
  • complete? fn state -> bool end - Completion check (fires when state satisfies predicate)
  • timeout :name, duration do ... end - Declares a timeout (scheduling is the Runner's responsibility)
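The on/update/emit/complete? pieces map onto a simple event fold. FulfillmentSketch below restates the :fulfillment example in plain Elixir as a hypothetical illustration; timeouts are omitted and the module does not reflect what the process_manager macro compiles to.

```elixir
defmodule FulfillmentSketch do
  @initial %{order_id: nil, paid: false, shipped: false}

  # Each handler returns {state_updates, emitted_commands}.
  def on({:order_submitted, order_id}), do: {%{order_id: order_id}, [{:charge_payment, order_id}]}
  def on({:payment_received, _}), do: {%{paid: true}, []}
  def on({:shipment_created, _}), do: {%{shipped: true}, []}
  def on(_other), do: {%{}, []}

  # Completion check over the coordination state.
  def complete?(state), do: state.shipped

  # Folds events into {state, commands}, merging updates and collecting emitted commands.
  def run(events) do
    Enum.reduce(events, {@initial, []}, fn event, {state, commands} ->
      {updates, emitted} = on(event)
      {Map.merge(state, updates), commands ++ emitted}
    end)
  end
end

events = [{:order_submitted, 42}, {:payment_received, 42}, {:shipment_created, 42}]
{state, commands} = FulfillmentSketch.run(events)
IO.inspect(state)
IO.inspect(commands)
IO.inspect(FulfillmentSketch.complete?(state))
```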

Options

  • :name - Identifier for the process manager (required)

reduce(acc, reducer_fun, opts \\ [])

(macro)

Creates a %Reduce{}: aggregates multiple facts into a single accumulated result.

Reduce operations fan-in results from a map operation or process an enumerable from a parent step. Unlike accumulator/3 which processes single values cumulatively, reduce/3 aggregates over collections.

Basic Usage

Like Enum.reduce/3, process an enumerable from a parent step:

require Runic
alias Runic.Workflow

workflow = Runic.workflow(
  name: :sum_range,
  steps: [
    {Runic.step(fn -> [1, 2, 3, 4, 5] end, name: :generate),
     [Runic.reduce(0, fn x, acc -> x + acc end, name: :sum)]}
  ]
)

Options

  • :name - Identifier for the reduce component
  • :map - Name of an upstream map component for fan-in (lazy evaluation)
  • :inputs / :outputs - Reserved for future schema-based type compatibility

Map-Reduce Pattern (Lazy Evaluation)

When :map is specified, reduce waits for all mapped elements before aggregating. This enables lazy/parallel execution of the map phase:

iex> require Runic
iex> alias Runic.Workflow
iex> map_op = Runic.map(fn x -> x * 2 end, name: :double)
iex> reduce_op = Runic.reduce(0, fn x, acc -> x + acc end, name: :sum, map: :double)
iex> workflow = Workflow.new()
...>   |> Workflow.add(map_op)
...>   |> Workflow.add(reduce_op, to: :double)
iex> results = workflow
...>   |> Workflow.plan_eagerly([1, 2, 3])
...>   |> Workflow.react_until_satisfied()
...>   |> Workflow.raw_productions(:sum)
iex> 12 in results
true

Nested Pipeline with Reduce

Reduce can follow a pipeline after the map:

require Runic

Runic.workflow(
  steps: [
    {Runic.map(fn x -> x * 2 end, name: :double),
     [{Runic.step(fn x -> x + 1 end, name: :add_one),
       [Runic.reduce(0, fn x, acc -> x + acc end, name: :sum, map: :double)]}]}
  ]
)

Important Notes

  • Reduce operations are inherently sequential and cannot be parallelized unless your reducer has CRDT (commutative) properties
  • Without :map, reduce processes the enumerable eagerly in one invocation
  • With :map, reduce waits for all fan-out elements before reducing
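To illustrate the first note: when the reducer is both associative and commutative (addition, for example), partial results can be computed per chunk in any order and then combined. The sketch below shows this in plain Elixir; ChunkedSum and its chunk size are hypothetical, and Runic itself is not involved.

```elixir
defmodule ChunkedSum do
  # Sums each chunk in its own task, then combines the partial sums.
  # Safe only because addition is associative and commutative, so neither the
  # chunk boundaries nor the completion order affect the result.
  def parallel_sum(enumerable, chunk_size \\ 1_000) do
    enumerable
    |> Stream.chunk_every(chunk_size)
    |> Task.async_stream(&Enum.sum/1, ordered: false)
    |> Enum.reduce(0, fn {:ok, partial}, acc -> acc + partial end)
  end
end

IO.inspect(ChunkedSum.parallel_sum(1..10_000))
IO.inspect(Enum.sum(1..10_000))
```

A reducer such as subtraction or list prepending would give different answers under this scheme, which is why a general reduce has to stay sequential.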

rule(opts_or_block)

(macro)

Creates a %Rule{}: a conditional reaction for pattern-matched execution.

Rules have two phases: a condition (left-hand side) that must match, and a reaction (right-hand side) that executes when matched. This separation enables efficient evaluation of many rules together.

Basic Usage

iex> require Runic
iex> alias Runic.Workflow.Rule
iex> rule = Runic.rule(fn x when is_integer(x) and x > 0 -> :positive end)
iex> Rule.check(rule, 5)
true
iex> Rule.check(rule, -1)
false
iex> Rule.run(rule, 5)
:positive

Guard Clauses

Rules support full Elixir guard expressions:

iex> require Runic
iex> alias Runic.Workflow.Rule
iex> rule = Runic.rule(fn x when is_binary(x) and byte_size(x) > 5 -> :long_string end)
iex> Rule.check(rule, "hello!")
true
iex> Rule.check(rule, "hi")
false

Pattern Matching

iex> require Runic
iex> alias Runic.Workflow.Rule
iex> rule = Runic.rule(fn %{status: :pending} -> :process end)
iex> Rule.check(rule, %{status: :pending, id: 1})
true
iex> Rule.check(rule, %{status: :done})
false

Separated Condition and Reaction

For expensive conditions shared by multiple rules, or clearer organization:

iex> require Runic
iex> alias Runic.Workflow.Rule
iex> rule = Runic.rule(
...>   name: :expensive_check,
...>   condition: fn x -> rem(x, 2) == 0 end,
...>   reaction: fn x -> x * 2 end
...> )
iex> Rule.check(rule, 4)
true
iex> Rule.run(rule, 4)
8

Also supports :if / :do aliases:

Runic.rule(
  name: :my_rule,
  if: fn x -> x > 10 end,
  do: fn _x -> :large end
)

Multi-Arity Rules

Rules can require multiple inputs (provided as a list):

iex> require Runic
iex> alias Runic.Workflow.Rule
iex> rule = Runic.rule(fn a, b when is_integer(a) and is_integer(b) -> a + b end)
iex> Rule.check(rule, [3, 4])
true
iex> Rule.run(rule, [3, 4])
7

In Workflows

Rules are evaluated in the planning/match phase before execution:

iex> require Runic
iex> alias Runic.Workflow
iex> workflow = Runic.workflow(
...>   name: :classifier,
...>   rules: [
...>     Runic.rule(fn x when x > 100 -> :xlarge end, name: :xlarge),
...>     Runic.rule(fn x when x > 10 and x <= 100 -> :large end, name: :large),
...>     Runic.rule(fn x when x > 0 and x <= 10 -> :small end, name: :small)
...>   ]
...> )
iex> workflow |> Workflow.plan_eagerly(50) |> Workflow.react_until_satisfied() |> Workflow.raw_productions()
[:large]

Given/Where/Then DSL

For complex rules with pattern destructuring, use the explicit DSL with three clauses:

  • given — a pattern-matching clause that destructures the input and binds variables. The binding name becomes the key in the bindings map passed to then.
  • where — a boolean expression evaluated at runtime. Unlike when guards, where supports any Elixir expression including function calls like String.starts_with?/2, Enum.member?/2, etc.
  • then — a function that receives the bindings map and produces a result. The bindings from given are available as keys.

Note: Use where instead of when because when is a reserved Elixir keyword.

Do/End Block Form

require Runic

Runic.rule do
  given(order: %{status: status, total: total})
  where(status == :pending and total > 100)
  then(fn %{order: order, total: total} -> {:apply_discount, order, total * 0.9} end)
end

Named Do/End Block Form

Pass options before the do block to name the rule:

require Runic

Runic.rule name: :premium_discount do
  given(order: %{customer: %{tier: tier}, total: total})
  where(tier == :premium and total > 50)
  then(fn %{order: order} -> {:apply_discount, order} end)
end

Keyword Form

The same rule can be written as a keyword list:

require Runic

Runic.rule(
  name: :threshold_check,
  given: [value: v],
  where: v > 100,
  then: fn %{value: v} -> {:over_threshold, v} end
)

Direct map patterns are also supported in the keyword form:

Runic.rule(
  name: :map_pattern,
  given: %{x: x, y: y},
  where: x + y > 10,
  then: fn %{x: x, y: y} -> {:sum, x + y} end
)

Non-Guard Expressions in where

Unlike when guards, where supports any boolean expression:

require Runic

Runic.rule do
  given(name: name)
  where(String.starts_with?(name, "prefix_"))
  then(fn %{name: n} -> {:matched, n} end)
end
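
As a rough mental model of the three clauses, the following plain-Elixir sketch wires a given, a where, and a then function together by hand. The module `GivenWhereThenSketch` and the map-shaped input are assumptions made for illustration; Runic's macro expansion is not shown here.

```elixir
defmodule GivenWhereThenSketch do
  # Hypothetical helper, not part of Runic.
  # given:    returns {:ok, bindings} when the input matches, :error otherwise.
  # where:    any boolean expression over the bindings (no guard restrictions).
  # then_fun: receives the bindings map and produces the result.
  def evaluate(input, given, where, then_fun) do
    with {:ok, bindings} <- given.(input),
         true <- where.(bindings) do
      {:ok, then_fun.(bindings)}
    else
      _ -> :no_match
    end
  end
end

given = fn
  %{name: name} when is_binary(name) -> {:ok, %{name: name}}
  _ -> :error
end

# String.starts_with?/2 is not allowed in a `when` guard, but it is an
# ordinary boolean expression here.
where = fn %{name: name} -> String.starts_with?(name, "prefix_") end
then_fun = fn %{name: n} -> {:matched, n} end

IO.inspect(GivenWhereThenSketch.evaluate(%{name: "prefix_a"}, given, where, then_fun))
# => {:ok, {:matched, "prefix_a"}}
```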

Capturing External Variables with ^

Use the pin operator ^ to capture variables from the surrounding scope. This is essential when dynamically constructing rules in loops or functions:

require Runic
alias Runic.Workflow

threshold = 100

rule =
  Runic.rule do
    given(value: v)
    where(v > ^threshold)
    then(fn %{value: v} -> {:over_threshold, v} end)
  end

Workflow.new()
|> Workflow.add(rule)
|> Workflow.react_until_satisfied(150)
|> Workflow.raw_productions()
# => [{:over_threshold, 150}]

The ^ pin also works in the keyword form and in condition/reaction style:

some_values = [:potato, :ham, :tomato]

Runic.rule(
  name: "escaped rule",
  condition: fn val when is_atom(val) -> true end,
  reaction: fn val ->
    Enum.map(^some_values, fn x -> {val, x} end)
  end
)

rule(opts, opts)

(macro)

saga(opts \\ [], list)

(macro)

Creates a %Saga{}: a sequence of transaction steps with compensating actions.

Sagas are explicit forward-then-compensate pipelines. Each transaction step must have a corresponding compensate block. On failure, completed steps are compensated in reverse order.

Compiles to an Accumulator (tracking saga state), forward Rules (one per transaction step), and compensation Rules (one per compensate block).

Usage

require Runic

saga = Runic.saga name: :fulfillment do
  transaction :reserve_inventory do
    fn _input -> {:ok, :reserved} end
  end
  compensate :reserve_inventory do
    fn %{reserve_inventory: _} -> :released end
  end

  transaction :charge_payment do
    fn %{reserve_inventory: _} -> {:ok, :charged} end
  end
  compensate :charge_payment do
    fn %{charge_payment: _} -> :refunded end
  end

  on_complete fn results -> {:saga_completed, results} end
  on_abort fn reason, compensated -> {:saga_aborted, reason, compensated} end
end
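
The forward-then-compensate behavior can be sketched in plain Elixir. This is an illustration of the idea only, not how Runic compiles a saga: `SagaSketch`, the `{name, transaction, compensate}` tuples, and the third `:ship_order` step are all hypothetical.

```elixir
defmodule SagaSketch do
  # Plain-Elixir illustration only; this is not how Runic compiles sagas.
  # Each step is a hypothetical {name, transaction_fun, compensate_fun} tuple.
  def run(steps, input), do: forward(steps, input, %{}, [])

  # Every transaction succeeded.
  defp forward([], _input, results, _completed), do: {:saga_completed, results}

  defp forward([{name, transaction, compensate} | rest], input, results, completed) do
    # The first step sees the raw input; later steps see the prior results.
    arg = if results == %{}, do: input, else: results

    case transaction.(arg) do
      {:ok, value} ->
        forward(rest, input, Map.put(results, name, value), [{name, compensate} | completed])

      {:error, reason} ->
        # `completed` is stored newest first, so mapping over it in order
        # runs the compensations in reverse order of execution.
        compensated = Enum.map(completed, fn {done, undo} -> {done, undo.(results)} end)
        {:saga_aborted, reason, compensated}
    end
  end
end

steps = [
  {:reserve_inventory, fn _input -> {:ok, :reserved} end, fn _ -> :released end},
  {:charge_payment, fn %{reserve_inventory: _} -> {:ok, :charged} end, fn _ -> :refunded end},
  {:ship_order, fn _ -> {:error, :carrier_down} end, fn _ -> :cancelled end}
]

IO.inspect(SagaSketch.run(steps, %{order_id: 1}))
# => {:saga_aborted, :carrier_down, [charge_payment: :refunded, reserve_inventory: :released]}
```

The failing step itself is not compensated in this sketch, because it never completed.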

state_machine(opts)

(macro)

Creates a %StateMachine{}: stateful workflows with reducers and conditional reactors.

State machines combine an accumulator with rules that react to state changes. The reducer processes each input in the context of the accumulated state, and reactors conditionally execute based on the new state.

Basic Usage

iex> require Runic
iex> alias Runic.Workflow
iex> counter = Runic.state_machine(
...>   name: :counter,
...>   init: 0,
...>   reducer: fn x, acc -> acc + x end
...> )
iex> workflow = Workflow.new() |> Workflow.add(counter)
iex> results = workflow |> Workflow.plan_eagerly(5) |> Workflow.react_until_satisfied() |> Workflow.raw_productions()
iex> Enum.sort(results)
[0, 5]

Options

  • :name - Identifier for the state machine (required for referencing)
  • :init - Initial state (literal value, function, or {M, F, A} tuple)
  • :reducer - Function (input, state) -> new_state for state transitions
  • :reactors - List of rules that react to state changes
  • :inputs / :outputs - Reserved for future schema-based type compatibility

With Reactors

Reactors are rules that fire when the accumulated state matches their conditions:

require Runic
alias Runic.Workflow

threshold_sm = Runic.state_machine(
  name: :threshold_monitor,
  init: 0,
  reducer: fn x, acc -> acc + x end,
  reactors: [
    fn state when state > 100 -> :threshold_exceeded end,
    fn state when state > 50 -> :warning end
  ]
)

Lock/Unlock Example

require Runic

lock = Runic.state_machine(
  name: :lock,
  init: %{code: "secret", state: :locked},
  reducer: fn
    :lock, state ->
      %{state | state: :locked}
    {:unlock, code}, %{code: code, state: :locked} = state ->
      %{state | state: :unlocked}
    _, state ->
      state
  end,
  reactors: [
    fn %{state: :unlocked} -> :access_granted end,
    fn %{state: :locked} -> :access_denied end
  ]
)
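
To see how the reducer clauses above behave, the same anonymous function can be folded over a list of events with `Enum.scan/3`, outside of any workflow. The event list is made up for illustration, and this sketch shows only the state transitions, not how Runic schedules reactors.

```elixir
reducer = fn
  :lock, state ->
    %{state | state: :locked}

  # `code` appears in both patterns, so this clause matches only when the
  # supplied code equals the stored one.
  {:unlock, code}, %{code: code, state: :locked} = state ->
    %{state | state: :unlocked}

  _, state ->
    state
end

init = %{code: "secret", state: :locked}
events = [{:unlock, "wrong"}, {:unlock, "secret"}, :lock]

# Enum.scan/3 keeps every intermediate state, one per event.
states = Enum.scan(events, init, reducer)
IO.inspect(Enum.map(states, & &1.state))
# => [:locked, :unlocked, :locked]
```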

Block DSL with handle/react (Form 2)

For state machines with complex state and event-driven transitions, the block DSL provides a more expressive form. Each handle clause bundles an event match, input pattern, state binding, and state transformation into a named, addressable sub-component. react clauses observe state without modifying it.

Runic.state_machine name: :cart, init: %{items: [], total: 0} do
  handle :add_item, %{item: item}, state do
    %{state | items: [item | state.items], total: state.total + item.price}
  end

  handle :checkout, _, state when state.items != [] do
    Map.put(state, :status, :checked_out)
  end

  react :high_value do
    fn %{total: t} when t > 1000 -> {:vip_alert, t} end
  end
end

handle clause semantics

handle event_pattern, input_match, state_var [when state_guard] do
  body  # must return next state
end

  • event_pattern — atom or pattern matched against the incoming fact's event type discriminator.
  • input_match — pattern match on the event payload / fact value.
  • state_var — binds the current state via state_of(:sm_name) meta_ref.
  • when state_guard — optional guard on current state.
  • body — returns the next state value, fed to the accumulator.

Each handle compiles to a named Rule: :"<sm_name>_<event_pattern>" (e.g., :cart_add_item).

react clause semantics

react name do
  fn state_pattern -> output end
end

  • Name is explicitly required (the atom after react).
  • Compiles to a Rule with a state_of() condition and a step that produces an output fact.
  • Does not modify state — observation only.

Both forms produce identical %StateMachine{} structs. The handle block is sugar for splitting a multi-clause reducer into individually named rules.

Captured Variables

Use ^ for runtime values in reducers and reactors:

multiplier = 2
Runic.state_machine(
  name: :scaled_sum,
  init: 0,
  reducer: fn x, acc -> acc + x * ^multiplier end
)

state_of(component_name_or_hash)

Used inside Runic macros such as rules to reference the state of another component such as an accumulator or reduce.

Expands together with the rest of the rule's expression so that it is evaluated against the last known state of the component.

Examples

require Runic
alias Runic.Workflow

counter = Runic.accumulator(0, fn x, acc -> acc + x end, name: :counter)

threshold_rule =
  Runic.rule name: :threshold_check do
    given(x: x)
    where(state_of(:counter) > 5)
    then(fn %{x: x} -> {:above_threshold, x} end)
  end

workflow =
  Workflow.new()
  |> Workflow.add(counter)
  |> Workflow.add(threshold_rule, to: :counter)

step(work)

(macro)

Creates a %Step{}: a basic lambda expression that can be added to a workflow.

Steps are the fundamental building blocks of Runic workflows, representing input → output transformations. Each step wraps a function and can be composed with other steps to form data processing pipelines.

Basic Usage

iex> require Runic
iex> step = Runic.step(fn x -> x * 2 end)
iex> step.work.(5)
10

Arities

Steps support 0, 1, or 2-arity functions:

iex> require Runic
iex> zero_arity = Runic.step(fn -> 42 end)
iex> zero_arity.work.()
42

iex> require Runic
iex> one_arity = Runic.step(fn x -> x + 1 end)
iex> one_arity.work.(10)
11

iex> require Runic
iex> two_arity = Runic.step(fn a, b -> a + b end)
iex> two_arity.work.(3, 4)
7

Note: 2-arity steps only execute when the workflow receives a 2-element list as input.
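
One way to picture this rule is a small dispatcher that spreads a list across a function's parameters only when the lengths line up. The `invoke` helper below is hypothetical and written in plain Elixir; it is not Runic's dispatch logic.

```elixir
two_arity = fn a, b -> a + b end

# Hypothetical dispatcher: spread a list input across the function's
# parameters only when the list length matches the arity.
invoke = fn fun, input ->
  {:arity, arity} = Function.info(fun, :arity)

  if is_list(input) and length(input) == arity do
    {:ok, apply(fun, input)}
  else
    :skipped
  end
end

IO.inspect(invoke.(two_arity, [3, 4]))
# => {:ok, 7}
IO.inspect(invoke.(two_arity, 3))
# => :skipped
```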

Captured Variables with ^

Use the pin operator ^ to capture outer scope variables. This is essential for:

  • Content-addressable hashing (each bound value produces unique hashes)
  • Serialization with build_log/1 and recovery with from_log/1
  • Dynamic workflow construction at runtime

iex> require Runic
iex> multiplier = 3
iex> step = Runic.step(fn x -> x * ^multiplier end)
iex> step.closure.bindings[:multiplier]
3
iex> step.work.(10)
30

Without ^, Elixir's normal closure mechanism captures the variable, but Runic cannot track, hash, or serialize it, so the workflow will fail after persistence.
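
The reason explicit bindings help can be sketched in plain Elixir: when the source of a function and its bound values are both kept as data, they can be hashed, serialized, and turned back into a callable function. `PinSketch` is a hypothetical module for illustration; Runic's actual hashing and log format are not shown here.

```elixir
defmodule PinSketch do
  # Illustration only; Runic's real hashing and log format are not shown here.

  # A content hash computed over the source text plus its bound values.
  def hash(source, bindings), do: :erlang.phash2({source, bindings})

  # Rebuild a callable function from persisted source and bindings.
  def rebuild(source, bindings) do
    {fun, _bindings} = Code.eval_string(source, bindings)
    fun
  end
end

source = "fn x -> x * multiplier end"

# Source and bindings are plain data, so they survive a round trip
# through binary serialization.
persisted = :erlang.term_to_binary({source, [multiplier: 3]})
{restored_source, restored_bindings} = :erlang.binary_to_term(persisted)

step_fun = PinSketch.rebuild(restored_source, restored_bindings)
IO.inspect(step_fun.(10))
# => 30
```

Because the bound value is part of the hashed term, the hash is stable for the same source and bindings and changes when the bindings change, barring a hash collision.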

Captured Functions

Steps can wrap module functions using capture syntax:

iex> require Runic
iex> step = Runic.step(&String.upcase/1)
iex> step.work.("hello")
"HELLO"

Options

  • :name - An atom or string identifier for referencing this step in workflows
  • :work - The function to execute (alternative to passing as first argument)
  • :inputs - Reserved for future schema-based type compatibility
  • :outputs - Reserved for future schema-based type compatibility

iex> require Runic
iex> step = Runic.step(fn x -> x * 2 end, name: :doubler)
iex> step.name
:doubler

iex> require Runic
iex> step = Runic.step(name: :tripler, work: fn x -> x * 3 end)
iex> step.name
:tripler

In Workflows

Steps can be connected in pipelines using the workflow DSL:

iex> require Runic
iex> alias Runic.Workflow
iex> workflow = Runic.workflow(
...>   name: "pipeline",
...>   steps: [
...>     {Runic.step(fn x -> x + 1 end, name: :add_one),
...>      [Runic.step(fn x -> x * 2 end, name: :double)]}
...>   ]
...> )
iex> results = workflow |> Workflow.react_until_satisfied(5) |> Workflow.raw_productions()
iex> Enum.sort(results)
[6, 12]

step(work, opts)

(macro)

step_ran?(component_name_or_hash)

Evaluates to true in a condition if the specified step has ever been run.

Note that this evaluates to true globally for any prior execution of the workflow, not just within the current invocation.

Examples

require Runic
alias Runic.Workflow

rule =
  Runic.rule name: :after_validation do
    given(x: x)
    where(step_ran?(:validator))
    then(fn %{x: x} -> {:validated, x} end)
  end

step_ran?(component_name_or_hash, fact_or_hash)

Evaluates to true if the specified step has been executed for the given input fact.

Considers only the input facts for a generation of invocations fed into the root of the workflow.

Examples

require Runic
alias Runic.Workflow

rule =
  Runic.rule name: :scoped_check do
    given(x: x)
    where(step_ran?(:validator, x))
    then(fn %{x: x} -> {:validated, x} end)
  end

transmute(component)

Converts a Runic component into a %Workflow{} via the Transmutable protocol.

Components like steps, rules, state machines, etc. can be transmuted into standalone workflows for evaluation or composition.

Examples

iex> require Runic
iex> step = Runic.step(fn x -> x * 2 end)
iex> workflow = Runic.transmute(step)
iex> workflow.__struct__
Runic.Workflow

iex> require Runic
iex> rule = Runic.rule(fn x when x > 0 -> :positive end)
iex> workflow = Runic.transmute(rule)
iex> workflow.__struct__
Runic.Workflow

Use Cases

  • Preparing components for standalone evaluation
  • Converting natural representations (e.g., a single rule) into evaluable workflows
  • Composing heterogeneous components by first converting to workflows, then merging

workflow(opts \\ [])

Creates a %Workflow{} from component options.

Workflows are directed acyclic graphs (DAGs) of steps, rules, and other components connected through dataflow semantics. They enable lazy or eager evaluation and can be composed, persisted, and distributed.

Basic Usage

iex> require Runic
iex> alias Runic.Workflow
iex> workflow = Runic.workflow(
...>   name: :simple,
...>   steps: [Runic.step(fn x -> x * 2 end)]
...> )
iex> workflow |> Workflow.react_until_satisfied(5) |> Workflow.raw_productions()
[10]

Options

  • :name - Identifier for the workflow (atom or string)
  • :steps - List of steps, with optional pipeline syntax for parent-child relationships
  • :rules - List of conditional rules to add
  • :before_hooks - Debug hooks called before step execution
  • :after_hooks - Debug hooks called after step execution
  • :input_ports - Port contract for workflow boundary inputs (keyword list of port schemas)
  • :output_ports - Port contract for workflow boundary outputs (keyword list of port schemas)

Pipeline Syntax

Use tuples {parent, [children]} to define step dependencies:

iex> require Runic
iex> alias Runic.Workflow
iex> pipeline = Runic.workflow(
...>   name: :pipeline,
...>   steps: [
...>     {Runic.step(fn x -> x + 1 end, name: :add_one),
...>      [Runic.step(fn x -> x * 2 end, name: :double),
...>       Runic.step(fn x -> x * 3 end, name: :triple)]}
...>   ]
...> )
iex> results = pipeline |> Workflow.react_until_satisfied(5) |> Workflow.raw_productions()
iex> Enum.sort(results)
[6, 12, 18]
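
The dataflow implied by the `{parent, [children]}` tuples can be reproduced with a small recursive function over plain anonymous functions. `PipelineSketch` is a hypothetical helper that only mirrors the shape of the syntax; it does not build a Runic workflow or reflect Runic's evaluation order.

```elixir
defmodule PipelineSketch do
  # Hypothetical helper: walks the same nested shape as the :steps option,
  # using plain functions in place of Runic steps.
  def run(steps, input), do: Enum.flat_map(steps, &run_step(&1, input))

  # A {parent, children} tuple feeds the parent's output to every child.
  defp run_step({parent, children}, input) do
    output = parent.(input)
    [output | run(children, output)]
  end

  # A bare function is a leaf that produces a single output.
  defp run_step(fun, input) when is_function(fun, 1), do: [fun.(input)]
end

steps = [
  {fn x -> x + 1 end,
   [fn x -> x * 2 end,
    fn x -> x * 3 end]}
]

IO.inspect(PipelineSketch.run(steps, 5))
# => [6, 12, 18]
```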

With Rules

iex> require Runic
iex> alias Runic.Workflow
iex> workflow = Runic.workflow(
...>   name: :rule_example,
...>   rules: [
...>     Runic.rule(fn x when is_integer(x) and x > 10 -> :large end),
...>     Runic.rule(fn x when is_integer(x) and x <= 10 -> :small end)
...>   ]
...> )
iex> workflow |> Workflow.plan_eagerly(15) |> Workflow.react_until_satisfied() |> Workflow.raw_productions()
[:large]

Hooks

Hooks receive (step, workflow, fact) and must return the workflow. Use them for debugging, logging, or dynamic workflow modification:

require Runic
alias Runic.Workflow

Runic.workflow(
  name: :with_hooks,
  steps: [Runic.step(fn x -> x * 2 end, name: :double)],
  after_hooks: [
    double: [
      fn _step, workflow, fact ->
        IO.puts("Produced: #{inspect(fact.value)}")
        workflow
      end
    ]
  ]
)
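
A plausible reason hooks must return the workflow is that each hook's return value becomes the workflow seen by the next one. The sketch below assumes that threading model and uses plain maps as hypothetical stand-ins for the workflow and the fact; it does not call Runic.

```elixir
# Hypothetical stand-ins: a map plays the role of the workflow, and
# a map with a :value key plays the role of the fact.
workflow = %{name: :with_hooks, log: []}
fact = %{value: 10}

hooks = [
  fn _step, wf, f -> %{wf | log: [{:produced, f.value} | wf.log]} end,
  fn step, wf, _f -> %{wf | log: [{:ran, step} | wf.log]} end
]

# Each hook receives the workflow returned by the previous hook.
updated = Enum.reduce(hooks, workflow, fn hook, wf -> hook.(:double, wf, fact) end)
IO.inspect(updated.log)
# => [ran: :double, produced: 10]
```

Under this model, a hook that returned something other than the workflow, such as the `:ok` from `IO.puts/1`, would hand the wrong value to the next hook.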

Boundary Ports

Declare input and output ports to make a workflow composable as a typed component. Workflows without ports return empty contracts and are connectable to anything.

require Runic
alias Runic.Workflow

workflow = Runic.workflow(
  name: :price_calculator,
  steps: [
    {Runic.step(fn order -> order.items end, name: :parse_order),
     [Runic.step(fn items -> Enum.sum(Enum.map(items, & &1.price)) end, name: :calculate_total)]}
  ],
  input_ports: [
    order: [type: :map, doc: "Order to price", to: :parse_order]
  ],
  output_ports: [
    total: [type: :float, doc: "Calculated total", from: :calculate_total]
  ]
)

# Ports are surfaced via the Component protocol
Runic.Component.inputs(workflow)
# => [order: [type: :map, doc: "Order to price", to: :parse_order]]

Port options: :type, :doc, :cardinality, :required, :to (input binding), :from (output binding). The :to and :from options reference internal component names and are validated at build time.