Runic.Workflow (Runic v0.1.0-alpha.7)


Runtime evaluation engine for Runic workflows.

Runic Workflows are used to compose many branching steps, rules, and accumulations/reductions at runtime for lazy or eager evaluation. You can think of a Runic Workflow as a recipe of rules that may react when fed a stream of facts.

Quick Start

require Runic
alias Runic.Workflow

workflow = Runic.workflow(
  steps: [
    {Runic.step(fn x -> x + 1 end, name: :add),
     [Runic.step(fn x -> x * 2 end, name: :double)]}
  ]
)

workflow
|> Workflow.react_until_satisfied(5)
|> Workflow.raw_productions()
# => [12]

Three-Phase Execution Model

All workflow evaluation uses a three-phase execution model that enables parallel execution and external scheduler integration:

  1. Prepare - Extract minimal context from the workflow into %Runnable{} structs
  2. Execute - Run node work functions in isolation (can be parallelized)
  3. Apply - Reduce results back into the workflow
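The three phases above can be sketched in plain Elixir. This is a toy model only, not Runic's implementation: the `ThreePhaseSketch` module, its map-based "workflow", and the `:work`/`:input`/`:result` keys are all assumptions made up for illustration. It shows why isolating the execute phase makes parallel dispatch straightforward.

```elixir
defmodule ThreePhaseSketch do
  # Toy stand-in for a workflow: named one-argument functions plus results.
  # This is NOT Runic's internal representation.
  def new(nodes), do: %{nodes: nodes, results: %{}}

  # Phase 1: prepare. Copy only what each node needs into a standalone map.
  def prepare(workflow, input) do
    for {name, fun} <- workflow.nodes, do: %{name: name, work: fun, input: input}
  end

  # Phase 2: execute. Each runnable runs without access to the workflow,
  # so the work can be spread across processes.
  def execute(runnables) do
    runnables
    |> Task.async_stream(fn r -> Map.put(r, :result, r.work.(r.input)) end)
    |> Enum.map(fn {:ok, executed} -> executed end)
  end

  # Phase 3: apply. Fold the executed runnables back into the workflow.
  def apply_results(workflow, executed) do
    Enum.reduce(executed, workflow, fn r, wrk ->
      put_in(wrk.results[r.name], r.result)
    end)
  end
end

workflow = ThreePhaseSketch.new(add: &(&1 + 1), double: &(&1 * 2))
executed = workflow |> ThreePhaseSketch.prepare(5) |> ThreePhaseSketch.execute()
workflow = ThreePhaseSketch.apply_results(workflow, executed)
workflow.results
# => %{add: 6, double: 10}
```

Because the execute phase only sees the prepared maps, it could just as well run on a worker pool or a remote node; only prepare and apply touch the workflow itself.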

Basic Execution

For simple use cases, use react/2 for a single cycle or react_until_satisfied/3 to run to completion:

# Single cycle
workflow = Workflow.react(workflow, input)

# Run to completion (recommended for simple use)
workflow = Workflow.react_until_satisfied(workflow, input)

Parallel Execution

Enable parallel execution for I/O-bound or CPU-intensive workflows:

workflow = Workflow.react_until_satisfied(workflow, input,
  async: true,
  max_concurrency: 8,
  timeout: :infinity
)

External Scheduler Integration

For custom schedulers, worker pools, or distributed execution, use the low-level three-phase APIs directly:

# Phase 1: Prepare runnables for dispatch
workflow = Workflow.plan_eagerly(workflow, input)
{workflow, runnables} = Workflow.prepare_for_dispatch(workflow)

# Phase 2: Execute (dispatch to worker pool, external service, etc.)
executed = Task.async_stream(runnables, fn runnable ->
  Runic.Workflow.Invokable.execute(runnable.node, runnable)
end, timeout: :infinity)

# Phase 3: Apply results back to workflow
workflow = Enum.reduce(executed, workflow, fn {:ok, runnable}, wrk ->
  Workflow.apply_runnable(wrk, runnable)
end)

# Continue if more work is available
if Workflow.is_runnable?(workflow) do
  # repeat the prepare, execute, and apply phases with the updated workflow
end

Key APIs for external scheduling:

  • prepare_for_dispatch/1 - Returns {workflow, [%Runnable{}]} for dispatch
  • apply_runnable/2 - Applies a completed runnable back to the workflow
  • Invokable.execute/2 - Executes a runnable in isolation (no workflow access)

Runtime Context

Runtime context provides a way to inject external, runtime-scoped values (API keys, database connections, tenant IDs, feature flags) into workflow components without baking them into closures or the workflow graph.

Components declare their context dependencies using context/1 expressions in the Runic DSL:

step = Runic.step(fn _x -> context(:api_key) end, name: :call_llm)

rule = Runic.rule name: :gated do
  given(val: v)
  where(v > context(:threshold))
  then(fn %{val: v} -> {:ok, v} end)
end

Context is provided at runtime via put_run_context/2 or the :run_context option:

# Set context directly
workflow = Workflow.put_run_context(workflow, %{
  call_llm: %{api_key: "sk-..."},
  _global: %{workspace_id: "ws1"}
})

# Or pass via options
Workflow.react_until_satisfied(workflow, input,
  run_context: %{call_llm: %{api_key: "sk-..."}}
)

Context values are:

  • Scoped by component name with an optional _global key for shared values
  • Not part of the workflow hash — two workflows with different contexts are structurally identical
  • Not serialized in the event log or fact graph
  • Resolved during the prepare phase of the three-phase execution model

Use required_context_keys/1 and validate_run_context/2 to introspect and validate context requirements before execution.
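As a rough illustration of what such a pre-flight check involves, the plain-Elixir sketch below compares declared key requirements against a scoped context map. The `ContextCheckSketch` module, the `missing_keys/2` function, and the list-of-keys shape of `required` are assumptions for this example; the actual return shapes of required_context_keys/1 and validate_run_context/2 may differ.

```elixir
defmodule ContextCheckSketch do
  # `required` maps component name => keys it reads via context/1
  # (shape assumed for illustration). `run_context` is scoped by component
  # name, with an optional :_global entry shared by every component.
  def missing_keys(required, run_context) do
    global = Map.get(run_context, :_global, %{})

    for {component, keys} <- required,
        scoped = Map.merge(global, Map.get(run_context, component, %{})),
        key <- keys,
        not Map.has_key?(scoped, key),
        do: {component, key}
  end
end

required = %{call_llm: [:api_key, :workspace_id], gated: [:threshold]}
run_context = %{call_llm: %{api_key: "sk-..."}, _global: %{workspace_id: "ws1"}}

missing = ContextCheckSketch.missing_keys(required, run_context)
# => [gated: :threshold]
```

Here `:call_llm` is satisfied because `:workspace_id` is supplied through `_global`, while `:gated` is reported because nothing provides `:threshold`.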

Workflow Composition

Workflows can be composed together using merge/2 or by adding components with add/3:

# Merge two workflows
combined = Workflow.merge(workflow1, workflow2)

# Add components dynamically
workflow = Workflow.new()
  |> Workflow.add(step1)
  |> Workflow.add(step2, to: :step1)
  |> Workflow.add(join_step, to: [:branch_a, :branch_b])

Any component implementing the Runic.Transmutable protocol can be merged into a workflow.

Introspection APIs

Query workflow structure and state:

# List all components by type
Workflow.steps(workflow)       # All Step structs
Workflow.conditions(workflow)  # All Condition structs

# Get components by name
Workflow.get_component(workflow, :my_step)

# Query execution state
Workflow.is_runnable?(workflow)     # Any work pending?
Workflow.next_runnables(workflow)   # List of {node, fact} pairs

# Traverse workflow structure
Workflow.next_steps(workflow, parent_step)  # Children of a step

Result Extraction

Extract results after workflow execution:

# Structured results using output port contract (recommended)
Workflow.results(workflow)                       # => %{total: 42.50}

# Explicit component selection
Workflow.results(workflow, [:add, :mult])         # => %{add: 6, mult: 10}

# With options
Workflow.results(workflow, [:price], facts: true) # => %{price: %Fact{}}
Workflow.results(workflow, nil, all: true)         # => %{total: [v1, v2]}

# Raw values (low-level)
Workflow.raw_productions(workflow)           # All leaf outputs
Workflow.raw_productions(workflow, :step_name)  # From specific component

# Full Fact structs with ancestry
Workflow.productions(workflow)

# All facts including inputs
Workflow.facts(workflow)

Serialization

Serialize workflows for persistence and visualization:

# Build log for persistence
log = Workflow.build_log(workflow)
serialized = :erlang.term_to_binary(log)

# Rebuild from log
workflow = Workflow.from_log(:erlang.binary_to_term(serialized))

# Visualization formats
Workflow.to_mermaid(workflow)      # Mermaid flowchart
Workflow.to_dot(workflow)          # Graphviz DOT format
Workflow.to_cytoscape(workflow)    # Cytoscape.js JSON
Workflow.to_edgelist(workflow)     # Edge list tuples

When to Use Runic Workflows

Runic Workflows are intended for use cases where your program is built or modified at runtime. They are useful for:

  • Complex data-dependent pipelines
  • Expert systems and rule engines
  • User-defined logical systems (low-code tools, DSLs)
  • Dynamic workflow composition at runtime

If your model can be expressed in advance with compiled code using the usual control flow and concurrency tools available in Elixir/Erlang, Runic Workflows may not be necessary. Doing more compilation and evaluation at runtime comes with performance trade-offs.

See the Cheatsheet and Usage Rules guides for more.

Summary

Functions

Adds a component to the workflow, connecting it to the parent step or root if no parent is specified.

Adds a list of rules to the workflow root.

Prepends a {matcher, policy} rule to the workflow's scheduler policies (higher priority).

Adds a step to the root of the workflow that is always evaluated with a new fact.

Adds a dependent step to some other step in a workflow by name.

Adds a batch of steps to the workflow, supporting pipelines and joins.

Adds a component to the workflow and returns the updated workflow along with the %ComponentAdded{} events produced.

Computes the causal depth of a fact by walking its ancestry chain.

Appends runnable lifecycle events to the workflow's event accumulator.

Appends a {matcher, policy} rule to the workflow's scheduler policies (lower priority).

Applies a single %ComponentAdded{} event to the workflow, adding the component described by the event.

Applies a list of %ComponentAdded{} events to the workflow.

Applies a list of hook apply functions to the workflow.

Applies a completed runnable back to the workflow.

Attaches a hook function to be run after a given component step.

Attaches a hook function to be run before a given component step.

Builds a getter function for a meta reference based on its kind.

Returns a list of %ComponentAdded{} events for serialization and recovery.

Returns the causal depth of a fact by walking its ancestry chain.

Returns a map of all registered components in the workflow by the registered component name.

Lists all %Condition{} structs in the workflow.

Checks whether a component can be connected at a given point in the workflow.

Returns a list of components in the workflow graph that are compatible for connection with the given component.

Returns a graph containing only the registered components as vertices and :connects_to edges showing how components are connected to each other.

Disables event emission on the workflow.

Creates a :meta_ref edge from a node to its meta expression target.

Enables event emission on the workflow.

Returns a complete event snapshot for the workflow.

Returns all %ReactionOccurred{} events caused since the given fact.

Executes the Invokable protocol for runnable.

Executes a list of runnables with the given scheduler policies.

Returns all facts in the workflow, including inputs and productions.

Retrieves a component from the workflow by name, returning an ok/error tuple.

Rebuilds a workflow from a mixed stream of build and runtime events.

Like from_events/2 but accepts options for replay control.

Rebuilds a workflow from a list of %ComponentAdded{} and/or %ReactionOccurred{} events.

Retrieves a component from the workflow by name.

Retrieves a component from the workflow by name, raising if not found.

Gets the hooks for a given node hash.

Returns the full run context map.

Returns the resolved run context for a specific component.

Executes the Invokable protocol for a runnable step and fact using the three-phase model.

Executes the Invokable protocol for a runnable step and fact and returns all newly caused events produced by the invocation.

Returns true if the workflow has pending work (runnable or matchable nodes).

log(wrk) deprecated

Returns the complete event log combining build_log/1 and reaction events.

Merges the second workflow into the first, maintaining the name of the first.

Returns the list of components that a node depends on via :meta_ref edges.

Returns the list of nodes that depend on a component via :meta_ref edges.

Creates an empty workflow with no components.

Constructs a new Runic Workflow with the given name or parameters.

Returns a list of {node, fact} pairs ready for activation in the next cycle.

Returns the child steps connected via dataflow edges from a parent step.

Identifies dispatched-but-not-completed runnables from the workflow's runnable events.

For all next left-hand-side (match-phase) runnables, plan/1 activates them and prepares the next match runnables.

For a new set of inputs, plan/2 prepares the workflow agenda for the next set of reactions by matching through left-hand-side conditions in the workflow network.

Eagerly plans through all produced facts in the workflow that haven't yet activated subsequent runnables.

Invokes all left hand side / match-phase runnables in the workflow for a given input fact until all are satisfied.

Prepares all available runnables for external dispatch.

Prepares meta context for a node by traversing its :meta_ref edges.

Returns a list of prepared %Runnable{} structs ready for execution.

Returns all %Fact{} structs produced by the workflow.

Returns all productions of a component or sub component by name.

Returns all facts produced in the workflow so far by component name and sub component.

Removes all %Fact{} vertices and generation integers from the workflow graph.

Merges the given context map into the workflow's run context.

Returns the raw values from all produced facts.

Returns a map of component name to raw production values for all components.

Returns raw (output value) side effects of the workflow - i.e. facts resulting from the execution of a Runic.Step

Executes a single reaction cycle using the three-phase model.

Executes a single reaction cycle with the given input value.

Executes the workflow until no more runnables remain.

Returns raw (output value) side effects of the workflow - i.e. facts resulting from the execution of a Runic.Step

Removes a component and its owned invokable nodes from the workflow.

Returns a map of component names to their context key requirements.

Extracts structured results from a workflow.

Finds the root ancestor fact hash for a given fact.

Replaces the workflow's scheduler policies list entirely.

Marks all nodes transitively downstream of failed_node as unreachable.

Lists all %Step{} structs in the workflow.

Returns a keyword list of sub-components of the given component by kind.

Serializes the workflow to Cytoscape.js element JSON format.

Serializes the workflow to DOT (Graphviz) format.

Serializes the workflow to an edgelist format.

Serializes the workflow to Mermaid flowchart format.

Serializes causal reactions as a Mermaid sequence diagram.

Validates that the given run_context satisfies all context/1 references in the workflow.

Types

runnable()

@type runnable() :: {fun(), term()}

t()

@type t() :: %Runic.Workflow{
  after_hooks: map(),
  before_hooks: map(),
  build_log: term(),
  components: map(),
  emit_events: boolean(),
  graph: Multigraph.t(),
  hash: binary(),
  input_ports: keyword() | nil,
  inputs: map(),
  mapped: map(),
  name: String.t(),
  output_ports: keyword() | nil,
  run_context: map(),
  runnable_events: list(),
  scheduler_policies: list(),
  uncommitted_events: term()
}

Functions

add(workflow, component, opts \\ [])

Adds a component to the workflow, connecting it to the parent step or root if no parent is specified.

Examples

iex> require Runic
iex> alias Runic.Workflow
iex> step = Runic.step(fn x -> x + 1 end, name: :add_one)
iex> workflow = Workflow.new() |> Workflow.add(step)
iex> Workflow.get_component(workflow, :add_one) |> Map.get(:name)
:add_one

iex> require Runic
iex> alias Runic.Workflow
iex> s1 = Runic.step(fn x -> x + 1 end, name: :first)
iex> s2 = Runic.step(fn x -> x * 2 end, name: :second)
iex> workflow = Workflow.new() |> Workflow.add(s1) |> Workflow.add(s2, to: :first)
iex> workflow |> Workflow.react_until_satisfied(5) |> Workflow.raw_productions() |> Enum.sort()
[6, 12]

When :to is a list of parent names, a Join is created so the dependent step waits for all parents to produce facts before running:

require Runic
alias Runic.Workflow

a_step = Runic.step(fn x -> x + 1 end, name: :a)
b_step = Runic.step(fn x -> x * 2 end, name: :b)
sum_step = Runic.step(fn a, b -> a + b end, name: :sum)

workflow =
  Workflow.new()
  |> Workflow.add(a_step)
  |> Workflow.add(b_step)
  |> Workflow.add(sum_step, to: [:a, :b])

result =
  workflow
  |> Workflow.react_until_satisfied(5)
  |> Workflow.raw_reactions()

6 in result  # :a produced 5 + 1
10 in result # :b produced 5 * 2
16 in result # :sum produced 6 + 10

Port Validation

By default, add/3 validates that the producer's output ports are type-compatible with the consumer's input ports. Control this with the :validate option:

  • :error (default) — raises Runic.IncompatiblePortError on type mismatch

  • :warn — logs a warning but allows the connection

  • :off — skips validation entirely (useful for prototyping)

    # Bypass validation during prototyping
    Workflow.add(workflow, step, to: :parent, validate: :off)

Untyped components (all ports default to type: :any) always pass validation, preserving Runic's gradual typing philosophy.
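The three validation modes can be pictured with a small plain-Elixir sketch. Everything here is hypothetical: `PortCheckSketch.check/3` is not a Runic function, types are compared by simple equality (Runic's real compatibility rules may be richer), and `ArgumentError` stands in for Runic.IncompatiblePortError.

```elixir
defmodule PortCheckSketch do
  require Logger

  # Hypothetical helper, not part of Runic.
  def check(producer_type, consumer_type, mode \\ :error)

  # :off skips validation entirely
  def check(_producer, _consumer, :off), do: :ok

  # :any on either side always passes (gradual typing), as do equal types
  def check(producer, consumer, _mode)
      when producer == :any or consumer == :any or producer == consumer,
      do: :ok

  # :warn logs the mismatch but still allows the connection
  def check(producer, consumer, :warn) do
    Logger.warning("port mismatch: #{inspect(producer)} -> #{inspect(consumer)}")
    :ok
  end

  # :error (the default) raises on mismatch
  def check(producer, consumer, :error) do
    raise ArgumentError, "port mismatch: #{inspect(producer)} -> #{inspect(consumer)}"
  end
end

untyped = PortCheckSketch.check(:any, :integer)
skipped = PortCheckSketch.check(:string, :integer, :off)
warned = PortCheckSketch.check(:string, :integer, :warn)
```

All three calls return `:ok`; only a mismatch under the default `:error` mode raises.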

add_rules(workflow, rules)

Adds a list of rules to the workflow root.

Each rule is added via add/2. Passing nil is a no-op.

Example

require Runic
alias Runic.Workflow

rules = [
  Runic.rule(fn x when x > 0 -> :positive end, name: :pos),
  Runic.rule(fn x when x < 0 -> :negative end, name: :neg)
]

workflow = Workflow.new() |> Workflow.add_rules(rules)

add_scheduler_policy(workflow, matcher, policy)

@spec add_scheduler_policy(t(), term(), map()) :: t()

Prepends a {matcher, policy} rule to the workflow's scheduler policies (higher priority).

add_step(workflow, child_step)

Adds a step to the root of the workflow that is always evaluated with a new fact.

add_step(workflow, parent_step, child_step)

Adds a dependent step to some other step in a workflow by name.

The dependent step is fed signed facts produced by the parent step during a reaction.

Adding dependent steps is the lowest-level way of building a dataflow execution graph, as it assumes no conditional or branching logic.

If you're just building a pipeline, dependent steps can be sufficient; however, you might want Rules for conditional branching logic.

add_steps(workflow, steps)

Adds a batch of steps to the workflow, supporting pipelines and joins.

Accepts a list where each element is one of:

  • %Step{} — added directly to the workflow root
  • {%Step{}, dependent_steps} — a pipeline: the parent step is added to root, then dependent steps are connected downstream
  • {[%Step{}, ...], dependent_steps} — multiple parent steps joined: all parents are added to root, a Join node is created, and dependents follow the join

Passing nil is a no-op.

Example

require Runic
alias Runic.Workflow

workflow = Workflow.new() |> Workflow.add_steps([
  {Runic.step(fn x -> x + 1 end, name: :add),
   [Runic.step(fn x -> x * 2 end, name: :double)]}
])

add_with_events(workflow, component, opts \\ [])

Adds a component to the workflow and returns the updated workflow along with the %ComponentAdded{} events produced.

This is useful for event-sourced workflow construction where you need to capture the events for later replay via apply_events/2.

Example

require Runic
alias Runic.Workflow

step = Runic.step(fn x -> x + 1 end, name: :add_one)
{workflow, events} = Workflow.add_with_events(Workflow.new(), step)

# Events can rebuild the same workflow
rebuilt = Workflow.apply_events(Workflow.new(), events)

ancestry_depth(workflow, arg2)

@spec ancestry_depth(t(), Runic.Workflow.Fact.t() | Runic.Workflow.FactRef.t()) ::
  non_neg_integer()

Computes the causal depth of a fact by walking its ancestry chain.

Replaces the generation counter for causal ordering. A fact with no ancestry (root input) has depth 0. Each causal step adds 1 to the depth.

Examples

iex> ancestry_depth(workflow, root_fact)
0

iex> ancestry_depth(workflow, fact_after_two_steps)
2
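The depth calculation can be sketched in plain Elixir as a recursive walk. This is a minimal model rather than Runic's implementation: the `AncestrySketch` module and the `facts` map (fact hash to ancestry) are assumptions for illustration, using the ancestry shape documented for facts, where inputs carry `nil` and produced facts carry `{producer_hash, parent_fact_hash}`.

```elixir
defmodule AncestrySketch do
  # `facts` maps fact hash => ancestry, where ancestry is nil for inputs
  # or {producer_hash, parent_fact_hash} for produced facts.
  def depth(facts, hash) do
    case Map.fetch!(facts, hash) do
      nil -> 0
      {_producer_hash, parent_hash} -> 1 + depth(facts, parent_hash)
    end
  end
end

facts = %{
  f0: nil,             # root input
  f1: {:add, :f0},     # produced by :add from f0
  f2: {:double, :f1}   # produced by :double from f1
}

root_depth = AncestrySketch.depth(facts, :f0)
# => 0
leaf_depth = AncestrySketch.depth(facts, :f2)
# => 2
```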

append_runnable_events(workflow, events)

@spec append_runnable_events(t(), list()) :: t()

Appends runnable lifecycle events to the workflow's event accumulator.

Used by schedulers to record %RunnableDispatched{}, %RunnableCompleted{}, and %RunnableFailed{} events produced by PolicyDriver.execute/3 with emit_events: true.

append_scheduler_policy(workflow, matcher, policy)

@spec append_scheduler_policy(t(), term(), map()) :: t()

Appends a {matcher, policy} rule to the workflow's scheduler policies (lower priority).
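The difference between prepending and appending a policy can be illustrated with a plain list. This sketch rests on several assumptions that the documentation does not confirm: that policies are resolved by first match in list order, that a matcher can be a one-argument function, and that a policy map might carry a key such as `:max_retries`. The `PolicySketch` module is hypothetical.

```elixir
defmodule PolicySketch do
  # Prepending puts the rule at the head of the list (checked first)
  def prepend(policies, matcher, policy), do: [{matcher, policy} | policies]

  # Appending puts the rule at the tail of the list (checked last)
  def append(policies, matcher, policy), do: policies ++ [{matcher, policy}]

  # Assumption: the first rule whose matcher accepts the node wins.
  def resolve(policies, node_name) do
    Enum.find_value(policies, %{}, fn {matcher, policy} ->
      if matcher.(node_name), do: policy
    end)
  end
end

policies =
  []
  |> PolicySketch.append(fn _name -> true end, %{max_retries: 0})
  |> PolicySketch.prepend(&(&1 == :call_llm), %{max_retries: 3})

specific = PolicySketch.resolve(policies, :call_llm)
# => %{max_retries: 3}
fallback = PolicySketch.resolve(policies, :other_step)
# => %{max_retries: 0}
```

Under these assumptions, a catch-all rule is best appended so that more specific rules prepended later still take precedence.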

apply_event(wrk, event)

Applies a single %ComponentAdded{} event to the workflow, adding the component described by the event.

Part of the event sourcing system — use with events captured from add_with_events/2 or build_log/1 to reconstruct a workflow incrementally.

apply_events(wrk, events)

Applies a list of %ComponentAdded{} events to the workflow.

Batch version of apply_event/2. Events are applied in order via Enum.reduce/3.

Example

require Runic
alias Runic.Workflow

step1 = Runic.step(fn x -> x + 1 end, name: :add_one)
step2 = Runic.step(fn x -> x * 2 end, name: :double)

{workflow, events1} = Workflow.add_with_events(Workflow.new(), step1)
{_workflow, events2} = Workflow.add_with_events(workflow, step2, to: :add_one)

rebuilt = Workflow.apply_events(Workflow.new(), events1 ++ events2)

apply_hook_fns(workflow, apply_fns)

@spec apply_hook_fns(t(), [function()]) :: t()

Applies a list of hook apply functions to the workflow.

This is used during the apply phase to execute deferred workflow modifications returned by hooks during the execute phase.

Example

workflow
|> Workflow.apply_hook_fns(before_apply_fns)
|> do_main_apply_logic()
|> Workflow.apply_hook_fns(after_apply_fns)

apply_runnable(workflow, runnable)

@spec apply_runnable(t(), Runic.Workflow.Runnable.t()) :: t()

Applies a completed runnable back to the workflow.

Called by schedulers after receiving execution results.

Events from the runnable are folded via apply_event/2, hook apply_fns are run, and downstream nodes are activated.

Parameters

  • workflow - The current workflow state
  • runnable - A runnable with status :completed, :failed, :skipped, or :pending

Returns

Updated workflow with the runnable's effects applied.

Example

executed = Invokable.execute(runnable.node, runnable)
workflow = Workflow.apply_runnable(workflow, executed)

attach_after_hook(workflow, component_name, hook)

Attaches a hook function to be run after a given component step.

Examples

workflow
|> Workflow.attach_after_hook("my_component", fn step, workflow, output_fact ->
  IO.inspect(output_fact, label: "Output fact")
  IO.inspect(step, label: "Step")
  workflow
end)

attach_before_hook(workflow, component_name, hook)

Attaches a hook function to be run before a given component step.

The hook is a 3-arity function receiving (step, workflow, input_fact) and must return the (possibly modified) workflow.

Example

workflow
|> Workflow.attach_before_hook(:my_step, fn step, workflow, input_fact ->
  IO.inspect(input_fact, label: "Input fact")
  workflow
end)

build_getter_fn(meta_ref)

@spec build_getter_fn(map()) :: (t(), term() -> term())

Builds a getter function for a meta reference based on its kind.

The getter function has signature (workflow, target) -> value and is stored in the :meta_ref edge properties for use during the prepare phase.

Supported Kinds

  • :state_of - Returns the last known state of an Accumulator/StateMachine
  • :step_ran? - Returns boolean indicating if step has run
  • :fact_count - Returns count of facts produced by a component
  • :latest_value - Returns the most recent value produced
  • :latest_fact - Returns the most recent fact produced
  • :all_values - Returns all values produced as a list
  • :all_facts - Returns all facts produced as a list

build_log(wrk)

Returns a list of %ComponentAdded{} events for serialization and recovery.

Use with from_log/1 to persist and rebuild workflows.

Example

require Runic
alias Runic.Workflow

step = Runic.step(fn x -> x * 2 end, name: :double)
workflow = Workflow.new() |> Workflow.add(step)

# Get the build log for serialization
log = Workflow.build_log(workflow)
serialized = :erlang.term_to_binary(log)

# Later, rebuild from log
restored_log = :erlang.binary_to_term(serialized)
restored = Workflow.from_log(restored_log)

Returns

A list of %ComponentAdded{} events in order of addition, each containing the closure needed to rebuild the component.
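A minimal sketch of persisting such a log to disk follows. The `log` value here is a placeholder term standing in for the result of build_log/1, and the file name is arbitrary; only standard library calls are used.

```elixir
# Placeholder term standing in for Workflow.build_log(workflow)
log = [%{event: :component_added, name: :double}]

# Write the binary-encoded log to a temporary file
path = Path.join(System.tmp_dir!(), "workflow_build_log.bin")
File.write!(path, :erlang.term_to_binary(log))

# Later: read the file back and decode it into the original term
restored_log = path |> File.read!() |> :erlang.binary_to_term()
round_trip_ok = restored_log == log
```

Since the real events carry closures, the restoring node most likely needs the same compiled modules loaded for those closures to be callable, so it is worth testing a restore against the deployed code version.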

causal_depth(workflow, fact)

Returns the causal depth of a fact by walking its ancestry chain.

Alias for ancestry_depth/2. For facts without ancestry (root inputs), returns 0.

Examples

iex> causal_depth(workflow, root_fact)
0

iex> causal_depth(workflow, produced_fact)
3  # produced after 3 causal steps

components(workflow)

Returns a map of all registered components in the workflow by the registered component name.

Examples

iex> require Runic
iex> alias Runic.Workflow
iex> workflow = Runic.workflow(steps: [
...>   Runic.step(fn x -> x + 1 end, name: :add),
...>   Runic.step(fn x -> x * 2 end, name: :mult)
...> ])
iex> components = Workflow.components(workflow)
iex> is_map(components)
true
iex> Map.keys(components) |> Enum.sort()
[:add, :mult]

conditions(workflow)

Lists all %Condition{} structs in the workflow.

Conditions are the "left-hand side" predicates of rules.

Example

require Runic
alias Runic.Workflow

workflow = Runic.workflow(rules: [
  Runic.rule(fn x when x > 0 -> :positive end)
])

[condition] = Workflow.conditions(workflow)

connectable?(wrk, component)

connectable?(wrk, component, list)

Checks whether a component can be connected at a given point in the workflow.

When called with to: component_name, validates that the target component exists, arities match, and the components are connectable per the Component protocol. Returns :ok on success or {:error, reason} on failure.

The 2-arity version (no :to option) always returns :ok, since any component can be added to the workflow root.

connectables(wrk, name)

Returns a list of components in the workflow graph that are compatible for connection with the given component.

Compatibility is determined by matching arity and requiring the vertex to implement the Component protocol. Accepts a component name or struct.

connected_components(workflow)

Returns a graph containing only the registered components as vertices and :connects_to edges showing how components are connected to each other.

This provides a high-level projected view of the workflow suitable for visualization in no-code builders or canvas UIs, without exposing the internal invokable nodes (Steps, Conditions, Joins, etc.).

Example

require Runic
alias Runic.Workflow

step1 = Runic.step(fn x -> x + 1 end, name: :add)
step2 = Runic.step(fn x -> x * 2 end, name: :double)

workflow = Workflow.new()
  |> Workflow.add(step1)
  |> Workflow.add(step2, to: :add)

component_graph = Workflow.connected_components(workflow)
# => Multigraph with :add and :double vertices, edge :add -> :double

disable_event_emission(wf)

@spec disable_event_emission(t()) :: t()

Disables event emission on the workflow.

When emit_events is false (the default), apply_runnable/2 skips the uncommitted_events buffer entirely, avoiding allocation overhead for in-memory scripting use cases.

draw_meta_ref_edge(workflow, from, to, meta_ref)

@spec draw_meta_ref_edge(t(), term(), term(), map()) :: t()

Creates a :meta_ref edge from a node to its meta expression target.

This is called during Component.connect/3 when a node has meta references. The edge stores the getter function and context key for use during prepare.

Example

workflow = draw_meta_ref_edge(
  workflow,
  condition.hash,
  accumulator.hash,
  %{kind: :state_of, field_path: [:total], context_key: :cart_total}
)

enable_event_emission(wf)

@spec enable_event_emission(t()) :: t()

Enables event emission on the workflow.

When emit_events is true, apply_runnable/2 buffers events into uncommitted_events for consumption by durable execution stores.

event_log(wrk)

@spec event_log(t()) :: list()

Returns a complete event snapshot for the workflow.

The returned list contains %ComponentAdded{} events followed by %ReactionOccurred{} events, providing a full serializable snapshot of the workflow structure and execution state. Use with from_events/2 to persist and restore both the workflow definition and its runtime state.

Example

require Runic
alias Runic.Workflow

workflow = Runic.workflow(steps: [Runic.step(fn x -> x + 1 end, name: :add)])
ran = Workflow.react_until_satisfied(workflow, 5)

events = Workflow.event_log(ran)
restored = Workflow.from_events(events)

events_produced_since(wrk, fact)

Returns all %ReactionOccurred{} events caused since the given fact.

Uses ancestry-based causal ordering. Returns events with depth greater than the reference fact's depth, scoped to the same causal root.

Examples

require Runic
alias Runic.Workflow
alias Runic.Workflow.Fact

step = Runic.step(fn x -> x * 2 end, name: :double)
workflow = Workflow.new() |> Workflow.add(step)
fact = Fact.new(value: 5)
workflow = Workflow.react(workflow, fact)
events = Workflow.events_produced_since(workflow, fact)
length(events) > 0
# => true

execute_runnable(runnable)

Executes the Invokable protocol for runnable.

Examples

require Runic
alias Runic.Workflow
alias Runic.Workflow.Invokable

step = Runic.step(fn x -> x * 2 end, name: :double)
workflow = Workflow.new() |> Workflow.add(step)
workflow = Workflow.plan_eagerly(workflow, 5)

[runnable | _] = Workflow.prepared_runnables(workflow)
executed = Workflow.execute_runnable(runnable)
executed.status
# => :completed

execute_with_policies(runnables, policies)

@spec execute_with_policies([Runic.Workflow.Runnable.t()], list()) :: [
  Runic.Workflow.Runnable.t()
]

Executes a list of runnables with the given scheduler policies.

Resolves each runnable's policy and executes through the PolicyDriver. For use by external schedulers calling prepare_for_dispatch/1 directly.

facts(workflow)

@spec facts(t()) :: [Runic.Workflow.Fact.t()]

Returns all facts in the workflow, including inputs and productions.

Unlike productions/1, this includes input facts which have ancestry: nil. Useful for tracing the full causal chain of workflow execution.

Example

iex> require Runic
iex> alias Runic.Workflow
iex> workflow = Runic.workflow(steps: [Runic.step(fn x -> x * 2 end)])
iex> facts = workflow |> Workflow.react(5) |> Workflow.facts()
iex> length(facts)
2
iex> Enum.map(facts, & &1.value) |> Enum.sort()
[5, 10]

Ancestry

  • Input facts have ancestry: nil
  • Produced facts have ancestry: {producer_hash, parent_fact_hash}

fetch_component(wrk, name)

Retrieves a component from the workflow by name, returning an ok/error tuple.

Returns {:ok, component} if found, or {:error, :no_component_by_name} if no component is registered with the given name.

Example

iex> require Runic
iex> alias Runic.Workflow
iex> workflow = Workflow.new() |> Workflow.add(Runic.step(fn x -> x end, name: :identity))
iex> {:ok, step} = Workflow.fetch_component(workflow, :identity)
iex> step.name
:identity
iex> Workflow.fetch_component(workflow, :nonexistent)
{:error, :no_component_by_name}

from_events(events, base_workflow \\ nil)

@spec from_events(Enumerable.t(), t() | nil) :: t()

Rebuilds a workflow from a mixed stream of build and runtime events.

Separates %ComponentAdded{} events (workflow structure) from runtime events (e.g. %FactProduced{}, %ActivationConsumed{}), rebuilds the workflow structure via from_log/1, then replays runtime events via apply_event/2.

When a base_workflow is provided, skips structure reconstruction and replays only runtime events on top of the base.

This is the primary recovery path for event-sourced stores using append/3 and stream/2.

Examples

# Full rebuild from event stream
{:ok, event_stream} = store.stream(workflow_id, store_state)
workflow = Workflow.from_events(Enum.to_list(event_stream))

# Replay on an existing workflow structure
workflow = Workflow.from_events(runtime_events, base_workflow)

from_events(events, base_workflow, opts)

@spec from_events(Enumerable.t(), t() | nil, keyword()) :: t()

Like from_events/2 but accepts options for replay control.

Options

  • :fact_mode - :full (default) creates Fact vertices from FactProduced events. :ref creates lightweight FactRef vertices instead, enabling lean replay that avoids loading cold fact values into memory.

Example

# Lean replay: creates FactRef vertices, resolve hot ones later
workflow = Workflow.from_events(events, nil, fact_mode: :ref)

from_log(events)

Rebuilds a workflow from a list of %ComponentAdded{} and/or %ReactionOccurred{} events.

Examples

require Runic
alias Runic.Workflow

step = Runic.step(fn x -> x * 2 end, name: :double)
workflow = Workflow.new() |> Workflow.add(step)
log = Workflow.build_log(workflow)

restored = Workflow.from_log(log)
restored |> Workflow.react_until_satisfied(5) |> Workflow.raw_productions()
# => [10]

get_component(wrk, names)

Retrieves a component from the workflow by name.

Returns the component struct or nil if not found.

Examples

iex> require Runic
iex> alias Runic.Workflow
iex> workflow = Runic.workflow(steps: [Runic.step(fn x -> x * 2 end, name: :double)])
iex> step = Workflow.get_component(workflow, :double)
iex> step.name
:double

Subcomponent Access

For composite components like rules, access subcomponents with a tuple:

# Get the condition of a rule
Workflow.get_component(workflow, {:my_rule, :condition})

# Get the reaction of a rule
Workflow.get_component(workflow, {:my_rule, :reaction})

get_component!(wrk, name)

Retrieves a component from the workflow by name, raising if not found.

Same as get_component/2 but raises KeyError if no component matches.

Example

step = Workflow.get_component!(workflow, :my_step)

get_hooks(workflow, node_hash)

@spec get_hooks(t(), integer()) :: {list(), list()}

Gets the hooks for a given node hash.

Returns a tuple of {before_hooks, after_hooks} for use in CausalContext.

get_named_vertex(workflow, name)

get_run_context(workflow)

@spec get_run_context(t()) :: map()

Returns the full run context map.

Example

Workflow.get_run_context(workflow)
# => %{call_llm: %{api_key: "sk-..."}, _global: %{workspace_id: "ws1"}}

get_run_context(workflow, component_name)

@spec get_run_context(t(), atom() | String.t()) :: map()

Returns the resolved run context for a specific component.

Merges _global context (if any) with the component-specific context. Component-specific keys take precedence over global keys.

Example

Workflow.get_run_context(workflow, :call_llm)
# => %{workspace_id: "ws1", api_key: "sk-..."}
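
The merge rule described above can be sketched in plain Elixir. This is only an illustration, assuming the run context is an ordinary map with an optional :_global key. The RunContextSketch module and its resolve/2 function are hypothetical and are not part of Runic.

```elixir
defmodule RunContextSketch do
  # Hypothetical helper, not part of Runic: resolves the context for one
  # component by layering its own entry over the :_global entry.
  def resolve(run_context, component_name) do
    global = Map.get(run_context, :_global, %{})
    scoped = Map.get(run_context, component_name, %{})

    # Map.merge/2 keeps the right-hand value on key conflicts, so
    # component-specific keys take precedence over global ones.
    Map.merge(global, scoped)
  end
end

run_context = %{
  call_llm: %{api_key: "sk-...", workspace_id: "override"},
  _global: %{workspace_id: "ws1", region: "eu"}
}

RunContextSketch.resolve(run_context, :call_llm)
# => %{api_key: "sk-...", region: "eu", workspace_id: "override"}
```

A component with no entry of its own would simply see the global values under this sketch.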

invoke(wrk, step, fact)

Executes the Invokable protocol for a runnable step and fact using the three-phase model.

This is a lower-level API than the react or plan functions, intended for process-based scheduling and execution of workflows.

The three-phase execution model:

  1. Prepare - Extract minimal context from workflow, build a %Runnable{}
  2. Execute - Run the node's work function in isolation
  3. Apply - Reduce results back into the workflow

See invoke_with_events/3 for a version that also returns the events produced by the invocation. Those events can be persisted incrementally as the workflow executes, which supports durable execution of long-running workflows.

Examples

require Runic
alias Runic.Workflow
alias Runic.Workflow.Fact

step = Runic.step(fn x -> x * 2 end, name: :double)
workflow = Workflow.new() |> Workflow.add(step)
fact = Fact.new(value: 5)

workflow = Workflow.invoke(workflow, Workflow.root(), fact)
Workflow.is_runnable?(workflow)
# => true

invoke_with_events(wrk, step, fact)

Executes the Invokable protocol for a runnable step and fact and returns all newly caused events produced by the invocation.

This API is intended to enable durable execution of long running workflows by returning events that can be persisted elsewhere so the workflow state can be rebuilt with from_log/1.

Examples

require Runic
alias Runic.Workflow
alias Runic.Workflow.Fact

step = Runic.step(fn x -> x * 2 end, name: :double)
workflow = Workflow.new() |> Workflow.add(step)
fact = Fact.new(value: 5)

{workflow, events} = Workflow.invoke_with_events(workflow, Workflow.root(), fact)
is_list(events)
# => true

is_runnable?(workflow)

@spec is_runnable?(t()) :: boolean()

Returns true if the workflow has pending work (runnable or matchable nodes).

Use this in scheduler loops to determine when to stop processing.

Example

iex> require Runic
iex> alias Runic.Workflow
iex> workflow = Runic.workflow(steps: [Runic.step(fn x -> x * 2 end)])
iex> Workflow.is_runnable?(workflow)
false
iex> workflow = Workflow.plan_eagerly(workflow, 5)
iex> Workflow.is_runnable?(workflow)
true
iex> workflow = Workflow.react(workflow)
iex> Workflow.is_runnable?(workflow)
false

log(wrk)

This function is deprecated. Use build_log/1 + workflow.uncommitted_events with from_events/2 instead.

Returns the complete event log combining build_log/1 and reaction events.

Deprecated in favor of event_log/1 for full snapshots, or build_log/1 + workflow.uncommitted_events for incremental event-sourced persistence.

maybe_put_component(workflow, step)

merge(workflow, workflow2)

Merges the second workflow into the first, maintaining the name of the first.

All root-level components from workflow2 are connected to the root of workflow, making them siblings to the existing root components.

Examples

iex> require Runic
iex> alias Runic.Workflow
iex> w1 = Runic.workflow(steps: [Runic.step(fn x -> x + 1 end, name: :add)])
iex> w2 = Runic.workflow(steps: [Runic.step(fn x -> x * 2 end, name: :mult)])
iex> merged = Workflow.merge(w1, w2)
iex> merged |> Workflow.react_until_satisfied(5) |> Workflow.raw_productions() |> Enum.sort()
[6, 10]

Merging Other Types

Any value implementing the Runic.Transmutable protocol can be merged:

workflow = Workflow.merge(workflow, rule)
workflow = Workflow.merge(workflow, step)

Use Cases

  • Combining modular workflow fragments at runtime
  • Building workflows dynamically from configuration
  • Composing reusable workflow templates

meta_dependencies(workflow, node)

@spec meta_dependencies(
  t(),
  struct()
) :: [struct()]

Returns the list of components that a node depends on via :meta_ref edges.

This is useful for understanding what state a rule or step will read during execution, and for validation/visualization.

Example

deps = meta_dependencies(workflow, my_rule_condition)
# => [%Accumulator{name: :cart_state, ...}]

meta_dependents(workflow, node)

@spec meta_dependents(
  t(),
  struct()
) :: [struct()]

Returns the list of nodes that depend on a component via :meta_ref edges.

This is the inverse of meta_dependencies/2 - it shows what nodes will read this component's state.

Example

dependents = meta_dependents(workflow, cart_accumulator)
# => [%Condition{...}, %Step{...}]

new()

Creates an empty workflow with no components.

Example

iex> alias Runic.Workflow
iex> workflow = Workflow.new()
iex> workflow.__struct__
Runic.Workflow

new(name)

Constructs a new Runic Workflow with the given name or parameters.

Examples

iex> alias Runic.Workflow
iex> workflow = Workflow.new(:my_workflow)
iex> workflow.name
:my_workflow

next_runnables(workflow)

Returns a list of {node, fact} pairs ready for activation in the next cycle.

All runnables returned are independent and can be executed in parallel. This is a low-level API for custom schedulers. For most use cases, prefer prepare_for_dispatch/1 which returns fully prepared %Runnable{} structs.

Example

runnables = Workflow.next_runnables(workflow)
# => [{%Step{name: :add}, %Fact{value: 5}}, ...]

next_runnables(workflow, fact_or_raw)

next_steps(g, parent_step)

Returns the child steps connected via dataflow edges from a parent step.

Useful for traversing the workflow graph structure.

Example

require Runic
alias Runic.Workflow

workflow = Runic.workflow(steps: [
  {Runic.step(fn x -> x + 1 end, name: :add),
   [Runic.step(fn x -> x * 2 end, name: :double)]}
])

add_step = Workflow.get_component(workflow, :add)
[double_step] = Workflow.next_steps(workflow, add_step)
double_step.name  # => :double

pending_runnables(workflow)

@spec pending_runnables(t()) :: [Runic.Workflow.RunnableDispatched.t()]

Identifies dispatched-but-not-completed runnables from the workflow's runnable events.

Returns a list of %RunnableDispatched{} events that have no corresponding %RunnableCompleted{} or %RunnableFailed{} event. Useful for crash recovery to find in-flight work that needs to be re-dispatched.
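
The matching rule can be illustrated with a small sketch. It assumes a simplified, hypothetical event shape of {kind, runnable_id} tuples. Runic's real events are structs whose fields are not shown in this section, so PendingSketch is a stand-in for the idea and not the library's implementation.

```elixir
defmodule PendingSketch do
  # Hypothetical event shape: {:dispatched | :completed | :failed, runnable_id}.
  # Runic's real events are structs; their fields are not assumed here.
  def pending(events) do
    # Collect the ids of every runnable that reached a terminal event.
    finished =
      for {kind, id} <- events, kind in [:completed, :failed], into: MapSet.new(), do: id

    # Keep only dispatched events with no terminal counterpart.
    for {:dispatched, id} = event <- events, not MapSet.member?(finished, id), do: event
  end
end

events = [
  {:dispatched, 1},
  {:dispatched, 2},
  {:completed, 1},
  {:dispatched, 3},
  {:failed, 3}
]

PendingSketch.pending(events)
# => [{:dispatched, 2}]
```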

plan(wrk)

plan/1 activates all pending left-hand-side (match-phase) runnables and prepares the next set of match runnables.

Examples

iex> require Runic
iex> alias Runic.Workflow
iex> rule = Runic.rule(fn x when x > 0 -> :positive end, name: :pos)
iex> workflow = Runic.workflow(rules: [rule])
iex> workflow = Workflow.plan(workflow, 5)
iex> workflow = Workflow.plan(workflow)
iex> Workflow.is_runnable?(workflow)
true

plan(wrk, fact)

For a new set of inputs, plan/2 prepares the workflow agenda for the next set of reactions by matching through left-hand-side conditions in the workflow network.

In an inference engine's match -> select -> execute cycle, this is the match phase.

Runic Workflow evaluation is forward chaining: starting from the root of the graph, it first evaluates the direct children of the root node. If the workflow has any conditions (from rules, for example), these conditions are prioritized in the agenda for the next cycle.

Plan will always match through a single level of nodes and identify the next runnable activations available.

Examples

iex> require Runic
iex> alias Runic.Workflow
iex> workflow = Runic.workflow(steps: [Runic.step(fn x -> x * 2 end, name: :double)])
iex> workflow = Workflow.plan(workflow, 5)
iex> Workflow.is_runnable?(workflow)
true
iex> workflow |> Workflow.react() |> Workflow.raw_productions()
[10]

plan_eagerly(workflow)

Eagerly plans through all produced facts in the workflow that haven't yet activated subsequent runnables.

This is useful after a workflow has already been run to satisfaction (no runnables remain) and you want to continue preparing reactions from its output facts.

Finds facts via :produced edges that don't have pending :runnable or :matchable edges.

Examples

iex> require Runic
iex> alias Runic.Workflow
iex> workflow = Runic.workflow(steps: [
...>   {Runic.step(fn x -> x + 1 end, name: :add),
...>    [Runic.step(fn x -> x * 2 end, name: :double)]}
...> ])
iex> workflow = Workflow.react_until_satisfied(workflow, 5)
iex> Workflow.is_runnable?(workflow)
false
iex> workflow = Workflow.plan_eagerly(workflow)
iex> Workflow.is_runnable?(workflow)
true

plan_eagerly(workflow, input_fact)

Invokes all left-hand-side (match-phase) runnables in the workflow for a given input fact until all are satisfied.

After calling plan_eagerly/2, the workflow has only right-hand-side runnables left, which react/2 or react_until_satisfied/3 can then execute.

Examples

iex> require Runic
iex> alias Runic.Workflow
iex> rule = Runic.rule(fn x when x > 0 -> :positive end, name: :pos)
iex> workflow = Runic.workflow(rules: [rule])
iex> workflow = Workflow.plan_eagerly(workflow, 5)
iex> Workflow.is_runnable?(workflow)
true
iex> workflow |> Workflow.react() |> Workflow.raw_productions()
[:positive]

prepare_for_dispatch(workflow)

@spec prepare_for_dispatch(t()) :: {t(), [Runic.Workflow.Runnable.t()]}

Prepares all available runnables for external dispatch.

Returns {workflow, [%Runnable{}]} where the workflow may have been updated by skip/defer reducers, and the runnables list contains nodes ready for execution.

This is designed for external schedulers that want to dispatch execution to worker pools or distributed systems.

Example

{workflow, runnables} = Workflow.prepare_for_dispatch(workflow)

# Dispatch to worker pool (can be parallel)
executed = Task.async_stream(runnables, fn r ->
  Runic.Workflow.Invokable.execute(r.node, r)
end, timeout: :infinity)

# Apply results back
workflow = Enum.reduce(executed, workflow, fn {:ok, r}, w ->
  Workflow.apply_runnable(w, r)
end)

prepare_meta_context(workflow, node)

@spec prepare_meta_context(
  t(),
  struct()
) :: map()

Prepares meta context for a node by traversing its :meta_ref edges.

Each :meta_ref edge has a getter_fn in its properties that extracts the needed value from the workflow. This function executes all getter functions and builds a map of context_key => value pairs.

Example

# For a Condition with state_of(:cart_accumulator) in its where clause
meta_context = prepare_meta_context(workflow, condition)
# => %{cart_accumulator_state: %{total: 150, items: [...]}}
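
How the getter functions fold into a context map can be sketched as follows. The edge shape (a map with :context_key and :getter_fn) and the one-argument getter are assumptions made for illustration, and a plain map stands in for the workflow. MetaContextSketch is hypothetical and not part of Runic.

```elixir
defmodule MetaContextSketch do
  # Hypothetical edge shape: %{context_key: atom, getter_fn: (workflow -> term)}.
  def prepare(workflow, meta_ref_edges) do
    # Run every getter against the workflow and collect key => value pairs.
    Map.new(meta_ref_edges, fn %{context_key: key, getter_fn: getter} ->
      {key, getter.(workflow)}
    end)
  end
end

# A plain map stands in for the workflow in this sketch.
workflow = %{cart: %{total: 150}}
edges = [%{context_key: :cart_accumulator_state, getter_fn: fn wf -> wf.cart end}]

MetaContextSketch.prepare(workflow, edges)
# => %{cart_accumulator_state: %{total: 150}}
```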

prepared_runnables(workflow)

@spec prepared_runnables(t()) :: [Runic.Workflow.Runnable.t()]

Returns a list of prepared %Runnable{} structs ready for execution.

This is the three-phase version of next_runnables/1. Each runnable contains everything needed to execute independently of the workflow.

Three-Phase Execution Model

  1. Prepare - This function calls Invokable.prepare/3 for each pending node
  2. Execute - Call Invokable.execute/2 on each runnable (can be parallelized)
  3. Apply - Call Workflow.apply_runnable(workflow, runnable) to fold events back

Returns

A list of %Runnable{} structs with status :pending, ready for execute/2. Nodes that return {:skip, _} or {:defer, _} from prepare are handled immediately and not included in the returned list.

Example

runnables = Workflow.prepared_runnables(workflow)
executed = Enum.map(runnables, &Runic.Workflow.Invokable.execute(&1.node, &1))
workflow = Enum.reduce(executed, workflow, &Workflow.apply_runnable(&2, &1))

productions(workflow)

Returns all %Fact{} structs produced by the workflow.

Unlike raw_productions/1, this returns the full Fact structs including ancestry information for causal tracing. Does not include input facts.

Example

iex> require Runic
iex> alias Runic.Workflow
iex> workflow = Runic.workflow(steps: [Runic.step(fn x -> x * 2 end)])
iex> [fact] = workflow |> Workflow.react(5) |> Workflow.productions()
iex> fact.value
10

productions(wrk, component_name)

Returns all productions of a component or sub-component by name.

Many components are made up of sub-components, so this may return multiple facts, one or more for each part.

Examples

iex> require Runic
iex> alias Runic.Workflow
iex> workflow = Runic.workflow(steps: [Runic.step(fn x -> x * 2 end, name: :double)])
iex> [fact] = workflow |> Workflow.react(5) |> Workflow.productions(:double)
iex> fact.value
10

productions_by_component(wrk)

Returns all facts produced in the workflow so far by component name and sub component.

Returns a map where each key is the name of the component and the value is a list of facts produced by that component.

Examples

iex> require Runic
iex> alias Runic.Workflow
iex> workflow = Runic.workflow(steps: [
...>   Runic.step(fn x -> x + 1 end, name: :add),
...>   Runic.step(fn x -> x * 2 end, name: :mult)
...> ])
iex> pbc = workflow |> Workflow.react(5) |> Workflow.productions_by_component()
iex> is_map(pbc)
true
iex> Map.keys(pbc) |> Enum.sort()
[:add, :mult]

purge_memory(wrk)

Removes all %Fact{} vertices and generation integers from the workflow graph.

This clears the workflow's accumulated memory while preserving its structure (steps, rules, conditions, and flow edges). Useful for long-running workflows to free memory between processing batches.

Example

require Runic
alias Runic.Workflow

workflow = Runic.workflow(steps: [Runic.step(fn x -> x + 1 end)])
workflow = Workflow.react(workflow, 5)

# Facts exist after reaction
refute Enum.empty?(Workflow.facts(workflow))

# Purge clears them
workflow = Workflow.purge_memory(workflow)
assert Enum.empty?(Workflow.facts(workflow))

put_run_context(workflow, context)

@spec put_run_context(t(), map()) :: t()

Merges the given context map into the workflow's run context.

Run context provides external, runtime-scoped values (secrets, tenant IDs, database connections) to components during execution. Values are keyed by component name for scoped access, with an optional :_global key for values available to all components.

Run context is NOT part of the workflow's content hash, NOT serialized in the event log, and NOT visible in the fact graph.

Example

workflow = Workflow.put_run_context(workflow, %{
  call_llm: %{api_key: "sk-..."},
  _global: %{workspace_id: "ws1"}
})

raw_productions(workflow)

Returns the raw values from all produced facts.

This is the most common way to extract results from a workflow. Returns unwrapped values without the %Fact{} struct metadata.

Example

iex> require Runic
iex> alias Runic.Workflow
iex> workflow = Runic.workflow(steps: [Runic.step(fn x -> x * 2 end)])
iex> workflow |> Workflow.react(5) |> Workflow.raw_productions()
[10]

By Component Name

iex> require Runic
iex> alias Runic.Workflow
iex> workflow = Runic.workflow(
...>   steps: [Runic.step(fn x -> x * 2 end, name: :double)]
...> )
iex> workflow |> Workflow.react(5) |> Workflow.raw_productions(:double)
[10]

raw_productions(wrk, component_name)

raw_productions_by_component(wrk)

Returns a map of component name to raw production values for all components.

Like raw_productions/1 but grouped by component name.

Example

require Runic
alias Runic.Workflow

workflow = Runic.workflow(
  steps: [
    {Runic.step(fn x -> x + 1 end, name: "step 1"),
     [Runic.step(fn x -> x + 2 end, name: "step 2")]}
  ]
)

%{"step 1" => [2], "step 2" => [4]} =
  workflow
  |> Workflow.react_until_satisfied(1)
  |> Workflow.raw_productions_by_component()

raw_reactions(wrk)

@spec raw_reactions(t()) :: [any()]

Returns the raw output values of the workflow's side effects, i.e. the values of facts resulting from the execution of a Runic.Step.

Examples

iex> require Runic
iex> alias Runic.Workflow
iex> workflow = Runic.workflow(steps: [Runic.step(fn x -> x * 2 end)])
iex> workflow |> Workflow.react(5) |> Workflow.raw_reactions() |> Enum.sort()
[5, 10]

react(workflow, opts \\ [])

@spec react(
  t(),
  keyword()
) :: t()

Executes a single reaction cycle using the three-phase model.

This function advances the workflow by one "generation" - executing all currently runnable steps/rules. Use react_until_satisfied/3 to run to completion.

Basic Usage

iex> require Runic
iex> alias Runic.Workflow
iex> workflow = Runic.workflow(steps: [Runic.step(fn x -> x * 2 end)])
iex> workflow = Workflow.react(workflow, 5)
iex> Workflow.raw_productions(workflow)
[10]

Options

  • :async - When true, executes runnables in parallel using Task.async_stream. Useful for I/O-bound workflows. Default: false (serial execution)
  • :max_concurrency - Maximum parallel tasks when async: true. Default: System.schedulers_online()
  • :timeout - Timeout for each task when async: true. Default: :infinity

Parallel Execution

workflow = Workflow.react(workflow, 5, async: true, max_concurrency: 4)

react(wrk, fact, opts)

@spec react(t(), Runic.Workflow.Fact.t() | term(), keyword()) :: t()

Executes a single reaction cycle with the given input value.

Plans through the match phase and executes one cycle of runnables. Commonly used with a raw value to start workflow processing.

Options

  • :async - When true, executes runnables in parallel. Default: false
  • :max_concurrency - Maximum parallel tasks when async: true
  • :timeout - Timeout for each task when async: true
  • :run_context - A map of external values for context/1 expressions. See put_run_context/2 and react_until_satisfied/3.

react_until_satisfied(workflow, fact_or_value \\ nil, opts \\ [])

@spec react_until_satisfied(t(), Runic.Workflow.Fact.t() | term(), keyword()) :: t()

Executes the workflow until no more runnables remain.

Iteratively calls react/2 until all reachable nodes have been executed. This is the recommended way to fully evaluate a workflow pipeline.

Basic Usage

iex> require Runic
iex> alias Runic.Workflow
iex> workflow = Runic.workflow(
...>   steps: [
...>     {Runic.step(fn x -> x + 1 end, name: :add_one),
...>      [Runic.step(fn x -> x * 2 end, name: :double)]}
...>   ]
...> )
iex> results = workflow |> Workflow.react_until_satisfied(5) |> Workflow.raw_productions()
iex> Enum.sort(results)
[6, 12]

Options

  • :async - When true, executes runnables in parallel. Default: false
  • :max_concurrency - Maximum parallel tasks when async: true
  • :timeout - Timeout for each task when async: true
  • :deadline_ms - Wall-clock deadline for the entire workflow execution in milliseconds. The policy driver will fail runnables with {:deadline_exceeded, remaining_ms} if the deadline is reached. Converted to an absolute deadline_at monotonic time internally.
  • :checkpoint - A 1-arity function called after each react cycle with the updated workflow. Useful for persisting intermediate state in long-running durable workflows.
  • :run_context - A map of external values keyed by component name, made available to components that use context/1 expressions. Supports a :_global key for values available to all components. See put_run_context/2.
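
The :deadline_ms conversion can be pictured with a short sketch. It assumes only what the option text states: a relative budget becomes an absolute monotonic timestamp, and reaching it yields {:deadline_exceeded, remaining_ms}. The DeadlineSketch module is hypothetical, and the `now` arguments exist only to make the example deterministic.

```elixir
defmodule DeadlineSketch do
  # Hypothetical helpers, not part of Runic.

  # Convert a relative budget into an absolute point on the monotonic clock.
  def deadline_at(deadline_ms, now \\ System.monotonic_time(:millisecond)) do
    now + deadline_ms
  end

  # Check the deadline before running a unit of work.
  def check(deadline_at, now \\ System.monotonic_time(:millisecond)) do
    remaining_ms = deadline_at - now

    if remaining_ms > 0 do
      {:ok, remaining_ms}
    else
      {:error, {:deadline_exceeded, remaining_ms}}
    end
  end
end

deadline_at = DeadlineSketch.deadline_at(5_000, 1_000)

DeadlineSketch.check(deadline_at, 2_500)
# => {:ok, 3500}

DeadlineSketch.check(deadline_at, 6_000)
# => {:error, {:deadline_exceeded, 0}}
```

Monotonic time is a natural fit for this kind of check because it is not affected by wall-clock adjustments.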

Warning

Workflows that don't terminate (e.g., hooks that continuously add steps) will cause infinite loops. For non-terminating workflows, use react/2 in a controlled loop with exit conditions.
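
A controlled loop with an exit condition might look like the sketch below. It is written generically over any state so that it runs without Runic. BoundedLoopSketch and its cycle budget are hypothetical, not a Runic API.

```elixir
defmodule BoundedLoopSketch do
  # Hypothetical helper, not part of Runic. `runnable_fun` and `step_fun`
  # are passed in so the loop logic can be exercised on any state.
  def run(state, runnable_fun, step_fun, max_cycles) when max_cycles >= 0 do
    do_run(state, runnable_fun, step_fun, max_cycles, 0)
  end

  defp do_run(state, runnable_fun, step_fun, max_cycles, cycles) do
    cond do
      # Normal exit: no pending work remains.
      not runnable_fun.(state) -> {:ok, state, cycles}
      # Exit condition: the cycle budget is spent but work remains.
      cycles >= max_cycles -> {:halted, state, cycles}
      true -> do_run(step_fun.(state), runnable_fun, step_fun, max_cycles, cycles + 1)
    end
  end
end

# Stand-in state: a counter that is "runnable" while positive.
BoundedLoopSketch.run(3, &(&1 > 0), &(&1 - 1), 10)
# => {:ok, 0, 3}

BoundedLoopSketch.run(3, &(&1 > 0), &(&1 - 1), 2)
# => {:halted, 1, 2}
```

With a real workflow, one would presumably plan the input first with plan_eagerly/2 and then pass &Workflow.is_runnable?/1 and &Workflow.react/1 as the two functions.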

Best For

  • IEx/REPL experimentation
  • Scripts and notebooks
  • Testing workflows
  • Simple batch processing

For production use with complex scheduling needs, consider prepare_for_dispatch/1 with a custom scheduler process.

reactions(workflow)

@spec reactions(t()) :: [Runic.Workflow.Fact.t()]

Returns the side effects of the workflow as %Fact{} structs, i.e. the facts resulting from the execution of a Runic.Step. Use raw_reactions/1 for the unwrapped values.

Examples

iex> require Runic
iex> alias Runic.Workflow
iex> workflow = Runic.workflow(steps: [Runic.step(fn x -> x * 2 end)])
iex> facts = workflow |> Workflow.react(5) |> Workflow.reactions()
iex> Enum.map(facts, & &1.value) |> Enum.sort()
[5, 10]

remove_component(workflow, component_name)

Removes a component and its owned invokable nodes from the workflow.

Invokable nodes that are shared with other components (due to content-addressable hashing) are preserved. Downstream nodes of the removed component are rewired to the removed component's upstream parents so the rest of the workflow stays connected.

Also removes any :connects_to edges involving the component and appends a %ComponentRemoved{} event to the build log.

Example

require Runic
alias Runic.Workflow

step1 = Runic.step(fn x -> x + 1 end, name: :add)
step2 = Runic.step(fn x -> x * 2 end, name: :double)
step3 = Runic.step(fn x -> x - 1 end, name: :subtract)

workflow = Workflow.new()
  |> Workflow.add(step1)
  |> Workflow.add(step2, to: :add)
  |> Workflow.add(step3, to: :double)

workflow = Workflow.remove_component(workflow, :double)
# :add now flows directly to :subtract

required_context_keys(workflow)

@spec required_context_keys(t()) :: %{required(atom()) => keyword()}

Returns a map of component names to their context key requirements.

Only includes components that use context/1 or context/2 meta expressions. Components without context requirements are omitted.

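Each key is annotated with whether it has a default, as in the example below: :required for keys without a default, and {:optional, default} for keys with one.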

Example

Workflow.required_context_keys(workflow)
# => %{call_llm: [api_key: :required, model: {:optional, "gpt-4"}]}

results(workflow, component_names \\ nil, opts \\ [])

@spec results(t(), [atom() | binary()] | nil, keyword()) :: map()

Extracts structured results from a workflow.

When component_names is a list, extracts the last produced value for each named component, independent of output port declarations.

When component_names is nil (default) and the workflow declares output_ports, returns a map keyed by port name with values extracted according to each port's :from binding and :cardinality.

When component_names is nil and no output_ports are declared, falls back to raw_productions_by_component/1.

Options

  • :facts - when true, returns %Fact{} structs instead of raw values. Default false.
  • :all - when true, returns all produced values as a list instead of just the last one, regardless of port cardinality. Default false.

Examples

# With output ports
workflow = Runic.workflow(
  name: :pipeline,
  steps: [{Runic.step(&parse/1, name: :parse),
           [Runic.step(&price/1, name: :price)]}],
  output_ports: [total: [type: :float, from: :price]]
)

%{total: 42.50} =
  workflow
  |> Workflow.react_until_satisfied(order)
  |> Workflow.results()

# Explicit component selection
%{add: 6, mult: 10} =
  workflow
  |> Workflow.react_until_satisfied(5)
  |> Workflow.results([:add, :mult])

# With options
%{price: [%Fact{}, %Fact{}]} =
  Workflow.results(workflow, [:price], facts: true, all: true)

# Use output ports with options
%{total: [42.50, 43.00]} =
  Workflow.results(workflow, nil, all: true)

root_ancestor_hash(workflow, fact)

@spec root_ancestor_hash(t(), Runic.Workflow.Fact.t()) :: integer() | nil

Finds the root ancestor fact hash for a given fact.

Walks the ancestry chain until it finds a fact with ancestry: nil (root input). Returns the hash of that root fact, or the fact's own hash if it is a root.

Examples

iex> root_ancestor_hash(workflow, root_fact)
123456  # root_fact.hash

iex> root_ancestor_hash(workflow, deeply_nested_fact)
123456  # hash of the original root input
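
The ancestry walk can be sketched over plain maps. The fact shape used here (an :ancestry of nil or a {producer_hash, parent_fact_hash} tuple) and the nil result for an unresolvable parent are assumptions for illustration. AncestrySketch is hypothetical, and a map of facts keyed by hash stands in for the workflow graph.

```elixir
defmodule AncestrySketch do
  # Hypothetical fact shape:
  #   %{hash: integer, ancestry: nil | {producer_hash, parent_fact_hash}}
  # `facts_by_hash` stands in for looking facts up in the workflow graph.

  # A fact with no ancestry is a root input: return its own hash.
  def root_ancestor_hash(_facts_by_hash, %{ancestry: nil, hash: hash}), do: hash

  def root_ancestor_hash(facts_by_hash, %{ancestry: {_producer_hash, parent_hash}}) do
    case Map.fetch(facts_by_hash, parent_hash) do
      {:ok, parent} -> root_ancestor_hash(facts_by_hash, parent)
      # Parent not available: the chain cannot be followed further.
      :error -> nil
    end
  end
end

facts = %{
  1 => %{hash: 1, ancestry: nil},
  2 => %{hash: 2, ancestry: {:add, 1}},
  3 => %{hash: 3, ancestry: {:double, 2}}
}

AncestrySketch.root_ancestor_hash(facts, facts[3])
# => 1

AncestrySketch.root_ancestor_hash(facts, facts[1])
# => 1
```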

set_scheduler_policies(workflow, policies)

@spec set_scheduler_policies(t(), list()) :: t()

Replaces the workflow's scheduler policies list entirely.

skip_downstream_subgraph(workflow, failed_node)

@spec skip_downstream_subgraph(
  t(),
  struct()
) :: t()

Marks all nodes transitively downstream of failed_node as unreachable.

Walks the structural :flow edges from failed_node to find all transitive dependents, then relabels any pending :runnable or :joined edges pointing to those nodes as :upstream_failed. This prevents the workflow from getting stuck waiting for work that can never complete due to a missing upstream fact.
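
The two steps described above, a transitive walk followed by relabeling, can be sketched on simple data. The adjacency map for :flow edges and the {fact, node, label} edge tuples are hypothetical shapes chosen for illustration. DownstreamSketch is not Runic's implementation.

```elixir
defmodule DownstreamSketch do
  # Hypothetical graph shape: %{node => [child, ...]} for :flow edges.
  # Returns the set of nodes transitively downstream of `failed_node`.
  def downstream(flow, failed_node) do
    walk(Map.get(flow, failed_node, []), flow, MapSet.new())
  end

  defp walk([], _flow, seen), do: seen

  defp walk([node | rest], flow, seen) do
    if MapSet.member?(seen, node) do
      # Already visited: skip, which also guards against cycles.
      walk(rest, flow, seen)
    else
      walk(Map.get(flow, node, []) ++ rest, flow, MapSet.put(seen, node))
    end
  end

  # Hypothetical edge shape: {fact, node, label}.
  # Pending edges into unreachable nodes become :upstream_failed.
  def relabel(edges, unreachable) do
    Enum.map(edges, fn
      {fact, node, label} = edge when label in [:runnable, :joined] ->
        if MapSet.member?(unreachable, node), do: {fact, node, :upstream_failed}, else: edge

      edge ->
        edge
    end)
  end
end

flow = %{a: [:b, :c], b: [:d], c: [:d], d: []}
unreachable = DownstreamSketch.downstream(flow, :b)
# unreachable contains only :d

DownstreamSketch.relabel([{:f1, :d, :runnable}, {:f1, :c, :runnable}], unreachable)
# => [{:f1, :d, :upstream_failed}, {:f1, :c, :runnable}]
```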

steps(workflow)

Lists all %Step{} structs in the workflow.

Useful for introspecting workflow structure.

Example

iex> require Runic
iex> alias Runic.Workflow
iex> workflow = Runic.workflow(steps: [
...>   Runic.step(fn x -> x + 1 end, name: :add),
...>   Runic.step(fn x -> x * 2 end, name: :mult)
...> ])
iex> steps = Workflow.steps(workflow)
iex> length(steps)
2
iex> Enum.map(steps, & &1.name) |> Enum.sort()
[:add, :mult]

sub_components(workflow, component_name)

Returns a keyword list of sub-components of the given component by kind.

Examples

iex> require Runic
iex> alias Runic.Workflow
iex> rule = Runic.rule(fn x when x > 0 -> :positive end, name: :pos_check)
iex> workflow = Workflow.new() |> Workflow.add(rule)
iex> subs = Workflow.sub_components(workflow, :pos_check)
iex> Keyword.keys(subs) |> Enum.sort()
[:condition, :reaction]

to_cytoscape(workflow, opts \\ [])

@spec to_cytoscape(t(), Keyword.t()) :: [map()]

Serializes the workflow to Cytoscape.js element JSON format.

Returns a list of node and edge elements compatible with Cytoscape.js and Kino.Cytoscape in Livebook.

Example

iex> elements = Workflow.to_cytoscape(workflow)
iex> Kino.Cytoscape.new(elements)

to_dot(workflow, opts \\ [])

@spec to_dot(t(), Keyword.t()) :: String.t()

Serializes the workflow to DOT (Graphviz) format.

Returns a string that can be rendered with Graphviz tools.

Example

iex> dot = Workflow.to_dot(workflow)
iex> File.write!("workflow.dot", dot)

to_edgelist(workflow, opts \\ [])

@spec to_edgelist(t(), Keyword.t()) :: [tuple()] | String.t()

Serializes the workflow to an edgelist format.

Returns a list of {from, to, label} tuples by default.

Options

  • :format - :tuples (default) or :string
  • :include_memory - Include causal edges (default: false)

Examples

iex> Workflow.to_edgelist(workflow)
[{:root, :step1, :flow}, {:step1, :step2, :flow}]

iex> Workflow.to_edgelist(workflow, format: :string)
"root -> step1 [flow]\nstep1 -> step2 [flow]"

to_mermaid(workflow, opts \\ [])

@spec to_mermaid(t(), Keyword.t()) :: String.t()

Serializes the workflow to Mermaid flowchart format.

Returns a string that can be rendered by Mermaid.js.

Options

  • :direction - Flow direction: :TB (default), :LR, :BT, :RL
  • :include_memory - Include causal reaction edges (default: false)
  • :title - Optional title comment

Examples

iex> workflow |> Workflow.to_mermaid()
"flowchart TB\n    ..."

iex> workflow |> Workflow.to_mermaid(direction: :LR, include_memory: true)
"flowchart LR\n    ..."

to_mermaid_sequence(workflow, opts \\ [])

@spec to_mermaid_sequence(t(), Keyword.t()) :: String.t()

Serializes causal reactions as a Mermaid sequence diagram.

Shows how facts flow through steps and produce new facts over time. Best used after workflow execution to visualize the causal chain.

Example

iex> workflow |> Workflow.plan_eagerly(input) |> Workflow.react() |> Workflow.to_mermaid_sequence()
"sequenceDiagram\n    ..."

validate_run_context(workflow, context)

@spec validate_run_context(t(), map()) ::
  :ok | {:error, %{required(atom()) => [atom()]}}

Validates that the given run_context satisfies all context/1 references in the workflow.

Returns :ok if all required keys are present, or {:error, missing} with a map of component names to their missing context keys. Keys with defaults (from context/2) are not reported as missing.

Example

Workflow.validate_run_context(workflow, %{call_llm: %{api_key: "sk-..."}})
# => :ok

Workflow.validate_run_context(workflow, %{})
# => {:error, %{call_llm: [:api_key], db_query: [:repo, :tenant_id]}}
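
The check can be sketched against the requirement shape shown for required_context_keys/1. ValidateSketch is hypothetical and not Runic's implementation. It also assumes that :_global values count toward satisfying a component's keys, which this section does not confirm.

```elixir
defmodule ValidateSketch do
  # Hypothetical helper. `requirements` uses the shape shown for
  # required_context_keys/1: %{component => [key: :required | {:optional, default}]}.
  def validate(requirements, run_context) do
    # Assumption: :_global values are visible to every component.
    global = Map.get(run_context, :_global, %{})

    missing =
      requirements
      |> Enum.map(fn {component, keys} ->
        resolved = Map.merge(global, Map.get(run_context, component, %{}))
        # Only :required keys can be missing; optional keys have defaults.
        absent = for {key, :required} <- keys, not Map.has_key?(resolved, key), do: key
        {component, absent}
      end)
      |> Enum.reject(fn {_component, absent} -> absent == [] end)
      |> Map.new()

    if map_size(missing) == 0, do: :ok, else: {:error, missing}
  end
end

requirements = %{call_llm: [api_key: :required, model: {:optional, "gpt-4"}]}

ValidateSketch.validate(requirements, %{call_llm: %{api_key: "sk-..."}})
# => :ok

ValidateSketch.validate(requirements, %{})
# => {:error, %{call_llm: [:api_key]}}
```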