Runtime evaluation engine for Runic workflows.
Runic Workflows are used to compose many branching steps, rules, and accumulations/reductions at runtime for lazy or eager evaluation. You can think of a Runic Workflow as a recipe of rules that may react when fed a stream of facts.
Quick Start
require Runic
alias Runic.Workflow
workflow = Runic.workflow(
  steps: [
    {Runic.step(fn x -> x + 1 end, name: :add),
     [Runic.step(fn x -> x * 2 end, name: :double)]}
  ]
)
workflow
|> Workflow.react_until_satisfied(5)
|> Workflow.raw_productions()
# => [6, 12] (values produced by :add and :double; order is not guaranteed)
Three-Phase Execution Model
All workflow evaluation uses a three-phase execution model that enables parallel execution and external scheduler integration:
- Prepare - Extract minimal context from the workflow into %Runnable{} structs
- Execute - Run node work functions in isolation (can be parallelized)
- Apply - Reduce results back into the workflow
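The three phases can be illustrated with a toy model in plain Elixir. This is only a sketch of the idea, not Runic's implementation: the `ThreePhaseSketch` module, its state map, and its function names are all hypothetical.

```elixir
defmodule ThreePhaseSketch do
  # Toy stand-in for a workflow: pending {name, fun, input} entries plus results.
  # This models the idea only; it is not Runic's internal representation.
  def new(pending), do: %{pending: pending, results: %{}}

  # Phase 1 (prepare): extract self-contained runnables that carry everything
  # a worker needs, so execution never has to touch the workflow itself.
  def prepare(%{pending: pending} = state) do
    runnables =
      Enum.map(pending, fn {name, fun, input} -> %{name: name, fun: fun, input: input} end)

    {%{state | pending: []}, runnables}
  end

  # Phase 2 (execute): run one runnable in isolation; safe to parallelize.
  def execute(runnable), do: Map.put(runnable, :result, runnable.fun.(runnable.input))

  # Phase 3 (apply): fold a completed runnable back into the state.
  def apply_result(state, %{name: name, result: result}) do
    %{state | results: Map.put(state.results, name, result)}
  end
end

state = ThreePhaseSketch.new([{:add, &(&1 + 1), 5}, {:double, &(&1 * 2), 5}])
{state, runnables} = ThreePhaseSketch.prepare(state)

final =
  runnables
  |> Task.async_stream(&ThreePhaseSketch.execute/1)
  |> Enum.reduce(state, fn {:ok, done}, acc -> ThreePhaseSketch.apply_result(acc, done) end)

IO.inspect(final.results)
# => %{add: 6, double: 10}
```

Because the execute phase only sees the runnable, it can be moved to another process or node without sharing the workflow state.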
Basic Execution
For simple use cases, use react/2 for a single cycle or react_until_satisfied/3
to run to completion:
# Single cycle
workflow = Workflow.react(workflow, input)
# Run to completion (recommended for simple use)
workflow = Workflow.react_until_satisfied(workflow, input)
Parallel Execution
Enable parallel execution for I/O-bound or CPU-intensive workflows:
workflow = Workflow.react_until_satisfied(workflow, input,
  async: true,
  max_concurrency: 8,
  timeout: :infinity
)
External Scheduler Integration
For custom schedulers, worker pools, or distributed execution, use the low-level three-phase APIs directly:
# Phase 1: Prepare runnables for dispatch
workflow = Workflow.plan_eagerly(workflow, input)
{workflow, runnables} = Workflow.prepare_for_dispatch(workflow)
# Phase 2: Execute (dispatch to worker pool, external service, etc.)
executed = Task.async_stream(runnables, fn runnable ->
  Runic.Workflow.Invokable.execute(runnable.node, runnable)
end, timeout: :infinity)
# Phase 3: Apply results back to workflow
workflow = Enum.reduce(executed, workflow, fn {:ok, runnable}, wrk ->
  Workflow.apply_runnable(wrk, runnable)
end)
# Continue if more work is available
if Workflow.is_runnable?(workflow) do
  # repeat...
end
Key APIs for external scheduling:
- prepare_for_dispatch/1 - Returns {workflow, [%Runnable{}]} for dispatch
- apply_runnable/2 - Applies a completed runnable back to the workflow
- Invokable.execute/2 - Executes a runnable in isolation (no workflow access)
Runtime Context
Runtime context provides a way to inject external, runtime-scoped values (API keys, database connections, tenant IDs, feature flags) into workflow components without baking them into closures or the workflow graph.
Components declare their context dependencies using context/1 expressions in the
Runic DSL:
step = Runic.step(fn _x -> context(:api_key) end, name: :call_llm)
rule = Runic.rule name: :gated do
  given(val: v)
  where(v > context(:threshold))
  then(fn %{val: v} -> {:ok, v} end)
end
Context is provided at runtime via put_run_context/2 or the :run_context option:
# Set context directly
workflow = Workflow.put_run_context(workflow, %{
  call_llm: %{api_key: "sk-..."},
  _global: %{workspace_id: "ws1"}
})
# Or pass via options
Workflow.react_until_satisfied(workflow, input,
  run_context: %{call_llm: %{api_key: "sk-..."}}
)
Context values are:
- Scoped by component name with an optional _global key for shared values
- Not part of the workflow hash: two workflows with different contexts are structurally identical
- Not serialized in the event log or fact graph
- Resolved during the prepare phase of the three-phase execution model
Use required_context_keys/1 and validate_run_context/2 to introspect and validate
context requirements before execution.
Workflow Composition
Workflows can be composed together using merge/2 or by adding components with add/3:
# Merge two workflows
combined = Workflow.merge(workflow1, workflow2)
# Add components dynamically
workflow = Workflow.new()
|> Workflow.add(step1)
|> Workflow.add(step2, to: :step1)
|> Workflow.add(join_step, to: [:branch_a, :branch_b])
Any component implementing the Runic.Transmutable protocol can be merged into a workflow.
Introspection APIs
Query workflow structure and state:
# List all components by type
Workflow.steps(workflow) # All Step structs
Workflow.conditions(workflow) # All Condition structs
# Get components by name
Workflow.get_component(workflow, :my_step)
# Query execution state
Workflow.is_runnable?(workflow) # Any work pending?
Workflow.next_runnables(workflow) # List of {node, fact} pairs
# Traverse workflow structure
Workflow.next_steps(workflow, parent_step) # Children of a step
Result Extraction
Extract results after workflow execution:
# Structured results using output port contract (recommended)
Workflow.results(workflow) # => %{total: 42.50}
# Explicit component selection
Workflow.results(workflow, [:add, :mult]) # => %{add: 6, mult: 10}
# With options
Workflow.results(workflow, [:price], facts: true) # => %{price: %Fact{}}
Workflow.results(workflow, nil, all: true) # => %{total: [v1, v2]}
# Raw values (low-level)
Workflow.raw_productions(workflow) # Raw values of all produced facts
Workflow.raw_productions(workflow, :step_name) # From specific component
# Full Fact structs with ancestry
Workflow.productions(workflow)
# All facts including inputs
Workflow.facts(workflow)
Serialization
Serialize workflows for persistence and visualization:
# Build log for persistence
log = Workflow.build_log(workflow)
serialized = :erlang.term_to_binary(log)
# Rebuild from log
workflow = Workflow.from_log(:erlang.binary_to_term(serialized))
# Visualization formats
Workflow.to_mermaid(workflow) # Mermaid flowchart
Workflow.to_dot(workflow) # Graphviz DOT format
Workflow.to_cytoscape(workflow) # Cytoscape.js JSON
Workflow.to_edgelist(workflow) # Edge list tuples
When to Use Runic Workflows
Runic Workflows are intended for use cases where your program is built or modified at runtime. They are useful for:
- Complex data-dependent pipelines
- Expert systems and rule engines
- User-defined logical systems (low-code tools, DSLs)
- Dynamic workflow composition at runtime
If your model can be expressed in advance with compiled code using the usual control flow and concurrency tools available in Elixir/Erlang, Runic Workflows may not be necessary. Doing more compilation and evaluation at runtime carries performance trade-offs.
See the Cheatsheet and Usage Rules guides for more.
Summary
Functions
Adds a component to the workflow, connecting it to the parent step or root if no parent is specified.
Adds a list of rules to the workflow root.
Prepends a {matcher, policy} rule to the workflow's scheduler policies (higher priority).
Adds a step to the root of the workflow that is always evaluated with a new fact.
Adds a dependent step to some other step in a workflow by name.
Adds a batch of steps to the workflow, supporting pipelines and joins.
Adds a component to the workflow and returns the updated workflow along with
the %ComponentAdded{} events produced.
Computes the causal depth of a fact by walking its ancestry chain.
Appends runnable lifecycle events to the workflow's event accumulator.
Appends a {matcher, policy} rule to the workflow's scheduler policies (lower priority).
Applies a single %ComponentAdded{} event to the workflow, adding the
component described by the event.
Applies a list of %ComponentAdded{} events to the workflow.
Applies a list of hook apply functions to the workflow.
Applies a completed runnable back to the workflow.
Attaches a hook function to be run after a given component step.
Attaches a hook function to be run before a given component step.
Builds a getter function for a meta reference based on its kind.
Returns a list of %ComponentAdded{} events for serialization and recovery.
Returns the causal depth of a fact by walking its ancestry chain.
Returns a map of all registered components in the workflow by the registered component name.
Lists all %Condition{} structs in the workflow.
Checks whether a component can be connected at a given point in the workflow.
Returns a list of components in the workflow graph that are compatible for connection with the given component.
Returns a graph containing only the registered components as vertices
and :connects_to edges showing how components are connected to each other.
Disables event emission on the workflow.
Creates a :meta_ref edge from a node to its meta expression target.
Enables event emission on the workflow.
Returns a complete event snapshot for the workflow.
Returns all %ReactionOccurred{} events caused since the given fact.
Executes the Invokable protocol for runnable.
Executes a list of runnables with the given scheduler policies.
Returns all facts in the workflow, including inputs and productions.
Retrieves a component from the workflow by name, returning an ok/error tuple.
Rebuilds a workflow from a mixed stream of build and runtime events.
Like from_events/2 but accepts options for replay control.
Rebuilds a workflow from a list of %ComponentAdded{} and/or %ReactionOccurred{} events.
Retrieves a component from the workflow by name.
Retrieves a component from the workflow by name, raising if not found.
Gets the hooks for a given node hash.
Returns the full run context map.
Returns the resolved run context for a specific component.
Executes the Invokable protocol for a runnable step and fact using the three-phase model.
Executes the Invokable protocol for a runnable step and fact and returns all newly caused events produced by the invocation.
Returns true if the workflow has pending work (runnable or matchable nodes).
Returns the complete event log combining build_log/1 and reaction events.
Merges the second workflow into the first, maintaining the name of the first.
Returns the list of components that a node depends on via :meta_ref edges.
Returns the list of nodes that depend on a component via :meta_ref edges.
Creates an empty workflow with no components.
Constructs a new Runic Workflow with the given name or parameters.
Returns a list of {node, fact} pairs ready for activation in the next cycle.
Returns the child steps connected via dataflow edges from a parent step.
Identifies dispatched-but-not-completed runnables from the workflow's runnable events.
For all pending left-hand-side (match-phase) runnables, plan/1 activates them and prepares the next match runnables.
For a new set of inputs, plan/2 prepares the workflow agenda for the next set of reactions by
matching through left-hand-side conditions in the workflow network.
Eagerly plans through all produced facts in the workflow that haven't yet activated subsequent runnables.
Invokes all left hand side / match-phase runnables in the workflow for a given input fact until all are satisfied.
Prepares all available runnables for external dispatch.
Prepares meta context for a node by traversing its :meta_ref edges.
Returns a list of prepared %Runnable{} structs ready for execution.
Returns all %Fact{} structs produced by the workflow.
Returns all productions of a component or sub component by name.
Returns all facts produced in the workflow so far by component name and sub component.
Removes all %Fact{} vertices and generation integers from the workflow graph.
Merges the given context map into the workflow's run context.
Returns the raw values from all produced facts.
Returns a map of component name to raw production values for all components.
Returns raw (output value) side effects of the workflow - i.e. facts resulting from the execution of a Runic.Step
Executes a single reaction cycle using the three-phase model.
Executes a single reaction cycle with the given input value.
Executes the workflow until no more runnables remain.
Returns raw (output value) side effects of the workflow - i.e. facts resulting from the execution of a Runic.Step
Removes a component and its owned invokable nodes from the workflow.
Returns a map of component names to their context key requirements.
Extracts structured results from a workflow.
Finds the root ancestor fact hash for a given fact.
Replaces the workflow's scheduler policies list entirely.
Marks all nodes transitively downstream of failed_node as unreachable.
Lists all %Step{} structs in the workflow.
Returns a keyword list of sub-components of the given component by kind.
Serializes the workflow to Cytoscape.js element JSON format.
Serializes the workflow to DOT (Graphviz) format.
Serializes the workflow to an edgelist format.
Serializes the workflow to Mermaid flowchart format.
Serializes causal reactions as a Mermaid sequence diagram.
Validates that the given run_context satisfies all context/1 references
in the workflow.
Types
@type t() :: %Runic.Workflow{ after_hooks: map(), before_hooks: map(), build_log: term(), components: map(), emit_events: boolean(), graph: Multigraph.t(), hash: binary(), input_ports: keyword() | nil, inputs: map(), mapped: map(), name: String.t(), output_ports: keyword() | nil, run_context: map(), runnable_events: list(), scheduler_policies: list(), uncommitted_events: term() }
Functions
Adds a component to the workflow, connecting it to the parent step or root if no parent is specified.
Examples
iex> require Runic
iex> alias Runic.Workflow
iex> step = Runic.step(fn x -> x + 1 end, name: :add_one)
iex> workflow = Workflow.new() |> Workflow.add(step)
iex> Workflow.get_component(workflow, :add_one) |> Map.get(:name)
:add_one
iex> require Runic
iex> alias Runic.Workflow
iex> s1 = Runic.step(fn x -> x + 1 end, name: :first)
iex> s2 = Runic.step(fn x -> x * 2 end, name: :second)
iex> workflow = Workflow.new() |> Workflow.add(s1) |> Workflow.add(s2, to: :first)
iex> workflow |> Workflow.react_until_satisfied(5) |> Workflow.raw_productions() |> Enum.sort()
[6, 12]
When :to is a list of parent names, a Join is created so the dependent step
waits for all parents to produce facts before running:
require Runic
alias Runic.Workflow
a_step = Runic.step(fn x -> x + 1 end, name: :a)
b_step = Runic.step(fn x -> x * 2 end, name: :b)
sum_step = Runic.step(fn a, b -> a + b end, name: :sum)
workflow =
  Workflow.new()
  |> Workflow.add(a_step)
  |> Workflow.add(b_step)
  |> Workflow.add(sum_step, to: [:a, :b])
result =
  workflow
  |> Workflow.react_until_satisfied(5)
  |> Workflow.raw_reactions()
6 in result # :a produced 5 + 1
10 in result # :b produced 5 * 2
16 in result # :sum produced 6 + 10
Port Validation
By default, add/3 validates that the producer's output ports are type-compatible
with the consumer's input ports. Control this with the :validate option:
- :error (default) - raises Runic.IncompatiblePortError on type mismatch
- :warn - logs a warning but allows the connection
- :off - skips validation entirely (useful for prototyping)
# Bypass validation during prototyping
Workflow.add(workflow, step, to: :parent, validate: :off)
Untyped components (all ports default to type: :any) always pass validation,
preserving Runic's gradual typing philosophy.
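The three validation modes can be sketched in plain Elixir as follows. This is a toy model under stated assumptions, not Runic's port checker: the `PortCheckSketch` module is hypothetical, port types are reduced to single atoms, `:any` on either side is assumed to be compatible, and `ArgumentError` stands in for `Runic.IncompatiblePortError`.

```elixir
defmodule PortCheckSketch do
  require Logger

  # Assumption: :any on either side is always compatible (gradual typing),
  # and otherwise the two types must be equal.
  def compatible?(:any, _in_type), do: true
  def compatible?(_out_type, :any), do: true
  def compatible?(type, type), do: true
  def compatible?(_out_type, _in_type), do: false

  def check(out_type, in_type, mode \\ :error)

  # :off skips validation entirely.
  def check(_out_type, _in_type, :off), do: :ok

  def check(out_type, in_type, mode) do
    cond do
      compatible?(out_type, in_type) ->
        :ok

      # :warn logs the mismatch but still allows the connection.
      mode == :warn ->
        Logger.warning("port mismatch: #{inspect(out_type)} -> #{inspect(in_type)}")
        :ok

      # :error (and, in this sketch, any other mode) raises on mismatch.
      true ->
        raise ArgumentError, "port mismatch: #{inspect(out_type)} -> #{inspect(in_type)}"
    end
  end
end

PortCheckSketch.check(:integer, :integer)       # => :ok
PortCheckSketch.check(:any, :string)            # => :ok
PortCheckSketch.check(:integer, :string, :warn) # => :ok, after logging a warning
PortCheckSketch.check(:integer, :string, :off)  # => :ok
```

With the default mode, `PortCheckSketch.check(:integer, :string)` raises instead of returning.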
Adds a list of rules to the workflow root.
Each rule is added via add/2. Passing nil is a no-op.
Example
require Runic
alias Runic.Workflow
rules = [
  Runic.rule(fn x when x > 0 -> :positive end, name: :pos),
  Runic.rule(fn x when x < 0 -> :negative end, name: :neg)
]
workflow = Workflow.new() |> Workflow.add_rules(rules)
Prepends a {matcher, policy} rule to the workflow's scheduler policies (higher priority).
Adds a step to the root of the workflow that is always evaluated with a new fact.
Adds a dependent step to some other step in a workflow by name.
The dependent step is fed signed facts produced by the parent step during a reaction.
Adding dependent steps is the lowest-level way of building a dataflow execution graph, as it assumes no conditional or branching logic.
If you're just building a pipeline, dependent steps can be sufficient; for conditional branching logic, you probably want Rules.
Adds a batch of steps to the workflow, supporting pipelines and joins.
Accepts a list where each element is one of:
- %Step{} - added directly to the workflow root
- {%Step{}, dependent_steps} - a pipeline: the parent step is added to root, then dependent steps are connected downstream
- {[%Step{}, ...], dependent_steps} - multiple parent steps joined: all parents are added to root, a Join node is created, and dependents follow the join
Passing nil is a no-op.
Example
require Runic
alias Runic.Workflow
workflow = Workflow.new() |> Workflow.add_steps([
  {Runic.step(fn x -> x + 1 end, name: :add),
   [Runic.step(fn x -> x * 2 end, name: :double)]}
])
Adds a component to the workflow and returns the updated workflow along with
the %ComponentAdded{} events produced.
This is useful for event-sourced workflow construction where you need to
capture the events for later replay via apply_events/2.
Example
require Runic
alias Runic.Workflow
step = Runic.step(fn x -> x + 1 end, name: :add_one)
{workflow, events} = Workflow.add_with_events(Workflow.new(), step)
# Events can rebuild the same workflow
rebuilt = Workflow.apply_events(Workflow.new(), events)
@spec ancestry_depth(t(), Runic.Workflow.Fact.t() | Runic.Workflow.FactRef.t()) :: non_neg_integer()
Computes the causal depth of a fact by walking its ancestry chain.
Replaces the generation counter for causal ordering. A fact with no ancestry (a root input) has depth 0. Each causal step adds 1 to the depth.
Examples
iex> ancestry_depth(workflow, root_fact)
0
iex> ancestry_depth(workflow, fact_after_two_steps)
2
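The walk itself can be sketched with a toy fact store in plain Elixir. The `AncestrySketch` module, the atom "hashes", and the map layout are illustrative assumptions; only the ancestry shape (nil for inputs, a `{producer_hash, parent_fact_hash}` tuple for produced facts) follows the description of facts in this module.

```elixir
defmodule AncestrySketch do
  # Toy fact store: fact hash => ancestry, where ancestry is nil for root inputs
  # or {producer_hash, parent_fact_hash} for produced facts.
  def depth(facts, fact_hash) do
    case Map.fetch!(facts, fact_hash) do
      # A root input has no ancestry, so its depth is 0.
      nil -> 0
      # Each causal step adds 1 to the depth of the parent fact.
      {_producer_hash, parent_fact_hash} -> 1 + depth(facts, parent_fact_hash)
    end
  end
end

facts = %{
  input: nil,
  after_add: {:add_step, :input},
  after_double: {:double_step, :after_add}
}

AncestrySketch.depth(facts, :input)        # => 0
AncestrySketch.depth(facts, :after_double) # => 2
```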
Appends runnable lifecycle events to the workflow's event accumulator.
Used by schedulers to record %RunnableDispatched{}, %RunnableCompleted{},
and %RunnableFailed{} events produced by PolicyDriver.execute/3 with
emit_events: true.
Appends a {matcher, policy} rule to the workflow's scheduler policies (lower priority).
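The difference between prepending and appending a rule can be sketched with a plain list. This assumes that rules are checked in list order and that the first match wins, which is what "higher priority" and "lower priority" suggest but is not confirmed here. The `PolicyListSketch` module, the predicate-function matchers, and the `max_retries` policy key are all hypothetical.

```elixir
defmodule PolicyListSketch do
  # Assumption for illustration: rules are checked in list order and the first
  # matching rule wins. Matchers are modeled as plain predicate functions;
  # Runic's actual matcher forms may differ.
  def prepend(policies, rule), do: [rule | policies]
  def append(policies, rule), do: policies ++ [rule]

  # Returns the policy of the first rule whose matcher accepts the node name.
  def resolve(policies, node_name, default \\ %{}) do
    Enum.find_value(policies, default, fn {matcher, policy} ->
      if matcher.(node_name), do: policy
    end)
  end
end

policies =
  []
  |> PolicyListSketch.append({fn _name -> true end, %{max_retries: 0}})
  |> PolicyListSketch.prepend({fn name -> name == :call_llm end, %{max_retries: 3}})

PolicyListSketch.resolve(policies, :call_llm) # => %{max_retries: 3}
PolicyListSketch.resolve(policies, :double)   # => %{max_retries: 0}
```

Under this model, a catch-all rule belongs at the end of the list, and more specific overrides are prepended in front of it.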
Applies a single %ComponentAdded{} event to the workflow, adding the
component described by the event.
Part of the event sourcing system — use with events captured from
add_with_events/2 or build_log/1 to reconstruct a workflow incrementally.
Applies a list of %ComponentAdded{} events to the workflow.
Batch version of apply_event/2. Events are applied in order via Enum.reduce/3.
Example
require Runic
alias Runic.Workflow
step1 = Runic.step(fn x -> x + 1 end, name: :add_one)
step2 = Runic.step(fn x -> x * 2 end, name: :double)
{workflow, events1} = Workflow.add_with_events(Workflow.new(), step1)
{_workflow, events2} = Workflow.add_with_events(workflow, step2, to: :add_one)
rebuilt = Workflow.apply_events(Workflow.new(), events1 ++ events2)
Applies a list of hook apply functions to the workflow.
This is used during the apply phase to execute deferred workflow modifications returned by hooks during the execute phase.
Example
workflow
|> Workflow.apply_hook_fns(before_apply_fns)
|> do_main_apply_logic()
|> Workflow.apply_hook_fns(after_apply_fns)
@spec apply_runnable(t(), Runic.Workflow.Runnable.t()) :: t()
Applies a completed runnable back to the workflow.
Called by schedulers after receiving execution results.
Events from the runnable are folded via apply_event/2, hook apply_fns
are run, and downstream nodes are activated.
Parameters
- workflow - The current workflow state
- runnable - A runnable with status :completed, :failed, :skipped, or :pending
Returns
Updated workflow with the runnable's effects applied.
Example
executed = Invokable.execute(runnable.node, runnable)
workflow = Workflow.apply_runnable(workflow, executed)
Attaches a hook function to be run after a given component step.
Examples
workflow
|> Workflow.attach_after_hook("my_component", fn step, workflow, output_fact ->
  IO.inspect(output_fact, label: "Output fact")
  IO.inspect(step, label: "Step")
  workflow
end)
Attaches a hook function to be run before a given component step.
The hook is a 3-arity function receiving (step, workflow, input_fact) and
must return the (possibly modified) workflow.
Example
workflow
|> Workflow.attach_before_hook(:my_step, fn step, workflow, input_fact ->
  IO.inspect(input_fact, label: "Input fact")
  workflow
end)
Builds a getter function for a meta reference based on its kind.
The getter function has signature (workflow, target) -> value and is
stored in the :meta_ref edge properties for use during the prepare phase.
Supported Kinds
- :state_of - Returns the last known state of an Accumulator/StateMachine
- :step_ran? - Returns boolean indicating if step has run
- :fact_count - Returns count of facts produced by a component
- :latest_value - Returns the most recent value produced
- :latest_fact - Returns the most recent fact produced
- :all_values - Returns all values produced as a list
- :all_facts - Returns all facts produced as a list
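A getter of the shape (workflow, target) -> value can be sketched for a few of these kinds with closures over a toy workflow map. This is an illustration only: the `MetaGetterSketch` module and the `%{produced: ...}` layout are assumptions and do not reflect how Runic stores facts.

```elixir
defmodule MetaGetterSketch do
  # Toy workflow: %{produced: %{component_name => [values, oldest first]}}.
  # Only a few kinds are modeled here.
  def build_getter(:fact_count), do: fn wf, target -> length(values(wf, target)) end
  def build_getter(:latest_value), do: fn wf, target -> List.last(values(wf, target)) end
  def build_getter(:all_values), do: fn wf, target -> values(wf, target) end
  def build_getter(:step_ran?), do: fn wf, target -> values(wf, target) != [] end

  # A component with no recorded values is treated as not having run.
  defp values(wf, target), do: Map.get(wf.produced, target, [])
end

wf = %{produced: %{cart: [10, 25]}}

latest = MetaGetterSketch.build_getter(:latest_value)
latest.(wf, :cart)                                        # => 25
MetaGetterSketch.build_getter(:fact_count).(wf, :cart)    # => 2
MetaGetterSketch.build_getter(:step_ran?).(wf, :checkout) # => false
```

Building the getter once and storing it means the prepare phase only has to call a function, without dispatching on the kind again.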
Returns a list of %ComponentAdded{} events for serialization and recovery.
Use with from_log/1 to persist and rebuild workflows.
Example
require Runic
alias Runic.Workflow
step = Runic.step(fn x -> x * 2 end, name: :double)
workflow = Workflow.new() |> Workflow.add(step)
# Get the build log for serialization
log = Workflow.build_log(workflow)
serialized = :erlang.term_to_binary(log)
# Later, rebuild from log
restored_log = :erlang.binary_to_term(serialized)
restored = Workflow.from_log(restored_log)
Returns
A list of %ComponentAdded{} events in order of addition, each containing
the closure needed to rebuild the component.
@spec causal_depth(t(), Runic.Workflow.Fact.t() | Runic.Workflow.FactRef.t()) :: non_neg_integer()
Returns the causal depth of a fact by walking its ancestry chain.
Alias for ancestry_depth/2. For facts without ancestry (root inputs), returns 0.
Examples
iex> causal_depth(workflow, root_fact)
0
iex> causal_depth(workflow, produced_fact)
3 # produced after 3 causal steps
Returns a map of all registered components in the workflow by the registered component name.
Examples
iex> require Runic
iex> alias Runic.Workflow
iex> workflow = Runic.workflow(steps: [
...> Runic.step(fn x -> x + 1 end, name: :add),
...> Runic.step(fn x -> x * 2 end, name: :mult)
...> ])
iex> components = Workflow.components(workflow)
iex> is_map(components)
true
iex> Map.keys(components) |> Enum.sort()
[:add, :mult]
Lists all %Condition{} structs in the workflow.
Conditions are the "left-hand side" predicates of rules.
Example
require Runic
alias Runic.Workflow
workflow = Runic.workflow(rules: [
Runic.rule(fn x when x > 0 -> :positive end)
])
[condition] = Workflow.conditions(workflow)
Checks whether a component can be connected at a given point in the workflow.
When called with to: component_name, validates that the target component
exists, arities match, and the components are connectable per the Component
protocol. Returns :ok on success or {:error, reason} on failure.
The 2-arity version (no :to option) always returns :ok, since any
component can be added to the workflow root.
Returns a list of components in the workflow graph that are compatible for connection with the given component.
Compatibility is determined by matching arity and requiring the vertex to
implement the Component protocol. Accepts a component name or struct.
Returns a graph containing only the registered components as vertices
and :connects_to edges showing how components are connected to each other.
This provides a high-level projected view of the workflow suitable for visualization in no-code builders or canvas UIs, without exposing the internal invokable nodes (Steps, Conditions, Joins, etc.).
Example
require Runic
alias Runic.Workflow
step1 = Runic.step(fn x -> x + 1 end, name: :add)
step2 = Runic.step(fn x -> x * 2 end, name: :double)
workflow =
  Workflow.new()
  |> Workflow.add(step1)
  |> Workflow.add(step2, to: :add)
component_graph = Workflow.connected_components(workflow)
# => Multigraph with :add and :double vertices, edge :add -> :double
Disables event emission on the workflow.
When emit_events is false (the default), apply_runnable/2 skips
the uncommitted_events buffer entirely, avoiding allocation overhead
for in-memory scripting use cases.
Creates a :meta_ref edge from a node to its meta expression target.
This is called during Component.connect/3 when a node has meta references.
The edge stores the getter function and context key for use during prepare.
Example
workflow = draw_meta_ref_edge(
  workflow,
  condition.hash,
  accumulator.hash,
  %{kind: :state_of, field_path: [:total], context_key: :cart_total}
)
Enables event emission on the workflow.
When emit_events is true, apply_runnable/2 buffers events into
uncommitted_events for consumption by durable execution stores.
Returns a complete event snapshot for the workflow.
The returned list contains %ComponentAdded{} events followed by
%ReactionOccurred{} events, providing a full serializable snapshot of
the workflow structure and execution state. Use with from_events/2 to
persist and restore both the workflow definition and its runtime state.
Example
require Runic
alias Runic.Workflow
workflow = Runic.workflow(steps: [Runic.step(fn x -> x + 1 end, name: :add)])
ran = Workflow.react_until_satisfied(workflow, 5)
events = Workflow.event_log(ran)
restored = Workflow.from_events(events)
Returns all %ReactionOccurred{} events caused since the given fact.
Uses ancestry-based causal ordering. Returns events with depth greater than the reference fact's depth, scoped to the same causal root.
Examples
require Runic
alias Runic.Workflow
alias Runic.Workflow.Fact
step = Runic.step(fn x -> x * 2 end, name: :double)
workflow = Workflow.new() |> Workflow.add(step)
fact = Fact.new(value: 5)
workflow = Workflow.react(workflow, fact)
events = Workflow.events_produced_since(workflow, fact)
length(events) > 0
# => true
Executes the Invokable protocol for runnable.
Examples
require Runic
alias Runic.Workflow
alias Runic.Workflow.Invokable
step = Runic.step(fn x -> x * 2 end, name: :double)
workflow = Workflow.new() |> Workflow.add(step)
workflow = Workflow.plan_eagerly(workflow, 5)
[runnable | _] = Workflow.prepared_runnables(workflow)
executed = Workflow.execute_runnable(runnable)
executed.status
# => :completed
@spec execute_with_policies([Runic.Workflow.Runnable.t()], list()) :: [ Runic.Workflow.Runnable.t() ]
Executes a list of runnables with the given scheduler policies.
Resolves each runnable's policy and executes through the PolicyDriver.
For use by external schedulers calling prepare_for_dispatch/1 directly.
@spec facts(t()) :: [Runic.Workflow.Fact.t()]
Returns all facts in the workflow, including inputs and productions.
Unlike productions/1, this includes input facts, which have ancestry: nil.
Useful for tracing the full causal chain of workflow execution.
Example
iex> require Runic
iex> alias Runic.Workflow
iex> workflow = Runic.workflow(steps: [Runic.step(fn x -> x * 2 end)])
iex> facts = workflow |> Workflow.react(5) |> Workflow.facts()
iex> length(facts)
2
iex> Enum.map(facts, & &1.value) |> Enum.sort()
[5, 10]
Ancestry
- Input facts have ancestry: nil
- Produced facts have ancestry: {producer_hash, parent_fact_hash}
Retrieves a component from the workflow by name, returning an ok/error tuple.
Returns {:ok, component} if found, or {:error, :no_component_by_name} if
no component is registered with the given name.
Example
iex> require Runic
iex> alias Runic.Workflow
iex> workflow = Workflow.new() |> Workflow.add(Runic.step(fn x -> x end, name: :identity))
iex> {:ok, step} = Workflow.fetch_component(workflow, :identity)
iex> step.name
:identity
iex> Workflow.fetch_component(workflow, :nonexistent)
{:error, :no_component_by_name}
@spec from_events(Enumerable.t(), t() | nil) :: t()
Rebuilds a workflow from a mixed stream of build and runtime events.
Separates %ComponentAdded{} events (workflow structure) from runtime events
(e.g. %FactProduced{}, %ActivationConsumed{}), rebuilds the workflow
structure via from_log/1, then replays runtime events via apply_event/2.
When a base_workflow is provided, skips structure reconstruction and
replays only runtime events on top of the base.
This is the primary recovery path for event-sourced stores using
append/3 and stream/2.
Examples
# Full rebuild from event stream
{:ok, event_stream} = store.stream(workflow_id, store_state)
workflow = Workflow.from_events(Enum.to_list(event_stream))
# Replay on an existing workflow structure
workflow = Workflow.from_events(runtime_events, base_workflow)
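The split-then-replay flow described above can be sketched in plain Elixir. This is a toy model, not Runic's implementation: the `ReplaySketch` module is hypothetical, and events are modeled as tagged tuples rather than structs such as %ComponentAdded{} and %FactProduced{}.

```elixir
defmodule ReplaySketch do
  # Events are modeled as tagged tuples for illustration only.
  def from_events(events, base \\ nil) do
    # Separate structure (build) events from runtime events.
    {build_events, runtime_events} =
      Enum.split_with(events, &match?({:component_added, _}, &1))

    # With a base workflow, structure reconstruction is skipped entirely.
    structure = base || Enum.reduce(build_events, new(), &apply_build/2)

    # Runtime events are replayed in order on top of the structure.
    Enum.reduce(runtime_events, structure, &apply_runtime/2)
  end

  defp new, do: %{components: [], facts: []}

  defp apply_build({:component_added, name}, wf),
    do: %{wf | components: wf.components ++ [name]}

  defp apply_runtime({:fact_produced, value}, wf),
    do: %{wf | facts: wf.facts ++ [value]}
end

events = [{:component_added, :double}, {:fact_produced, 5}, {:fact_produced, 10}]
ReplaySketch.from_events(events)
# => %{components: [:double], facts: [5, 10]}

base = %{components: [:double], facts: []}
ReplaySketch.from_events([{:fact_produced, 5}], base)
# => %{components: [:double], facts: [5]}
```

Passing a base is useful when the structure is already in memory and only the runtime events need to be loaded from the store.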
@spec from_events(Enumerable.t(), t() | nil, keyword()) :: t()
Like from_events/2 but accepts options for replay control.
Options
- :fact_mode - :full (default) creates Fact vertices from FactProduced events. :ref creates lightweight FactRef vertices instead, enabling lean replay that avoids loading cold fact values into memory.
Example
# Lean replay: creates FactRef vertices, resolve hot ones later
workflow = Workflow.from_events(events, nil, fact_mode: :ref)
Rebuilds a workflow from a list of %ComponentAdded{} and/or %ReactionOccurred{} events.
Examples
require Runic
alias Runic.Workflow
step = Runic.step(fn x -> x * 2 end, name: :double)
workflow = Workflow.new() |> Workflow.add(step)
log = Workflow.build_log(workflow)
restored = Workflow.from_log(log)
restored |> Workflow.react_until_satisfied(5) |> Workflow.raw_productions()
# => [10]
Retrieves a component from the workflow by name.
Returns the component struct or nil if not found.
Examples
iex> require Runic
iex> alias Runic.Workflow
iex> workflow = Runic.workflow(steps: [Runic.step(fn x -> x * 2 end, name: :double)])
iex> step = Workflow.get_component(workflow, :double)
iex> step.name
:double
Subcomponent Access
For composite components like rules, access subcomponents with a tuple:
# Get the condition of a rule
Workflow.get_component(workflow, {:my_rule, :condition})
# Get the reaction of a rule
Workflow.get_component(workflow, {:my_rule, :reaction})
Retrieves a component from the workflow by name, raising if not found.
Same as get_component/2 but raises KeyError if no component matches.
Example
step = Workflow.get_component!(workflow, :my_step)
Gets the hooks for a given node hash.
Returns a tuple of {before_hooks, after_hooks} for use in CausalContext.
Returns the full run context map.
Example
Workflow.get_run_context(workflow)
# => %{call_llm: %{api_key: "sk-..."}, _global: %{workspace_id: "ws1"}}
Returns the resolved run context for a specific component.
Merges _global context (if any) with the component-specific context.
Component-specific keys take precedence over global keys.
Example
Workflow.get_run_context(workflow, :call_llm)
# => %{workspace_id: "ws1", api_key: "sk-..."}
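The precedence rule can be illustrated with `Map.merge/2` in plain Elixir. This is a sketch of the described behavior, not Runic's code: the `RunContextSketch` module is hypothetical, and the `region` key and the "override" value are made up for the example.

```elixir
defmodule RunContextSketch do
  # Hypothetical helper: resolve the context a component sees by layering its
  # own entry over the shared :_global entry.
  def resolve(run_context, component_name) do
    global = Map.get(run_context, :_global, %{})
    scoped = Map.get(run_context, component_name, %{})
    # Map.merge/2 keeps the second map's value on key conflicts,
    # so component-specific keys win over global ones.
    Map.merge(global, scoped)
  end
end

run_context = %{
  call_llm: %{api_key: "sk-...", workspace_id: "override"},
  _global: %{workspace_id: "ws1", region: "eu"}
}

RunContextSketch.resolve(run_context, :call_llm)
# => %{api_key: "sk-...", region: "eu", workspace_id: "override"}

RunContextSketch.resolve(run_context, :other_step)
# => %{region: "eu", workspace_id: "ws1"}
```

A component with no entry of its own still sees the global values, which is what makes `_global` suitable for shared settings.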
Executes the Invokable protocol for a runnable step and fact using the three-phase model.
This is a lower-level API than the react or plan functions, intended for process-based scheduling and execution of workflows.
The three-phase execution model:
- Prepare - Extract minimal context from the workflow and build a %Runnable{}
- Execute - Run the node's work function in isolation
- Apply - Reduce results back into the workflow
See invoke_with_events/2 for a version that returns the events produced by the invocation, which can be
persisted incrementally as the workflow is executed for durable execution of long-running workflows.
Examples
require Runic
alias Runic.Workflow
alias Runic.Workflow.Fact
step = Runic.step(fn x -> x * 2 end, name: :double)
workflow = Workflow.new() |> Workflow.add(step)
fact = Fact.new(value: 5)
workflow = Workflow.invoke(workflow, Workflow.root(), fact)
Workflow.is_runnable?(workflow)
# => true
Executes the Invokable protocol for a runnable step and fact and returns all newly caused events produced by the invocation.
This API is intended to enable durable execution of long-running workflows by returning events that can be persisted elsewhere
so the workflow state can be rebuilt with from_log/1.
Examples
require Runic
alias Runic.Workflow
alias Runic.Workflow.Fact
step = Runic.step(fn x -> x * 2 end, name: :double)
workflow = Workflow.new() |> Workflow.add(step)
fact = Fact.new(value: 5)
{workflow, events} = Workflow.invoke_with_events(workflow, Workflow.root(), fact)
is_list(events)
# => true
Returns true if the workflow has pending work (runnable or matchable nodes).
Use this in scheduler loops to determine when to stop processing.
Example
iex> require Runic
iex> alias Runic.Workflow
iex> workflow = Runic.workflow(steps: [Runic.step(fn x -> x * 2 end)])
iex> Workflow.is_runnable?(workflow)
false
iex> workflow = Workflow.plan_eagerly(workflow, 5)
iex> Workflow.is_runnable?(workflow)
true
iex> workflow = Workflow.react(workflow)
iex> Workflow.is_runnable?(workflow)
false
Returns the complete event log combining build_log/1 and reaction events.
Deprecated in favor of event_log/1 for full snapshots, or
build_log/1 + workflow.uncommitted_events for incremental event-sourced
persistence.
Merges the second workflow into the first, maintaining the name of the first.
All root-level components from workflow2 are connected to the root of workflow,
making them siblings to the existing root components.
Examples
iex> require Runic
iex> alias Runic.Workflow
iex> w1 = Runic.workflow(steps: [Runic.step(fn x -> x + 1 end, name: :add)])
iex> w2 = Runic.workflow(steps: [Runic.step(fn x -> x * 2 end, name: :mult)])
iex> merged = Workflow.merge(w1, w2)
iex> merged |> Workflow.react_until_satisfied(5) |> Workflow.raw_productions() |> Enum.sort()
[6, 10]
Merging Other Types
Any value implementing the Runic.Transmutable protocol can be merged:
workflow = Workflow.merge(workflow, rule)
workflow = Workflow.merge(workflow, step)
Use Cases
- Combining modular workflow fragments at runtime
- Building workflows dynamically from configuration
- Composing reusable workflow templates
Returns the list of components that a node depends on via :meta_ref edges.
This is useful for understanding what state a rule or step will read during execution, and for validation/visualization.
Example
deps = meta_dependencies(workflow, my_rule_condition)
# => [%Accumulator{name: :cart_state, ...}]
Returns the list of nodes that depend on a component via :meta_ref edges.
This is the inverse of meta_dependencies/2 - it shows what nodes will
read this component's state.
Example
dependents = meta_dependents(workflow, cart_accumulator)
# => [%Condition{...}, %Step{...}]
Creates an empty workflow with no components.
Example
iex> alias Runic.Workflow
iex> workflow = Workflow.new()
iex> workflow.__struct__
Runic.Workflow
Constructs a new Runic Workflow with the given name or parameters.
Examples
iex> alias Runic.Workflow
iex> workflow = Workflow.new(:my_workflow)
iex> workflow.name
:my_workflow
Returns a list of {node, fact} pairs ready for activation in the next cycle.
All runnables returned are independent and can be executed in parallel.
This is a low-level API for custom schedulers. For most use cases, prefer
prepare_for_dispatch/1 which returns fully prepared %Runnable{} structs.
Example
runnables = Workflow.next_runnables(workflow)
# => [{%Step{name: :add}, %Fact{value: 5}}, ...]
Returns the child steps connected via dataflow edges from a parent step.
Useful for traversing the workflow graph structure.
Example
require Runic
alias Runic.Workflow
workflow = Runic.workflow(steps: [
{Runic.step(fn x -> x + 1 end, name: :add),
[Runic.step(fn x -> x * 2 end, name: :double)]}
])
add_step = Workflow.get_component(workflow, :add)
[double_step] = Workflow.next_steps(workflow, add_step)
double_step.name # => :double
@spec pending_runnables(t()) :: [Runic.Workflow.RunnableDispatched.t()]
Identifies dispatched-but-not-completed runnables from the workflow's runnable events.
Returns a list of %RunnableDispatched{} events that have no corresponding
%RunnableCompleted{} or %RunnableFailed{} event. Useful for crash recovery
to find in-flight work that needs to be re-dispatched.
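The matching of dispatched events against completion events can be sketched with plain data. This is an illustration under stated assumptions rather than Runic's code: events are modelled as hypothetical `{kind, runnable_id}` tuples instead of the real `%RunnableDispatched{}`, `%RunnableCompleted{}` and `%RunnableFailed{}` structs, whose fields are not shown here.

```elixir
defmodule PendingSketch do
  # Hypothetical event shape: {:dispatched | :completed | :failed, runnable_id}
  def pending(events) do
    # Collect the ids of every runnable that reached a terminal event.
    finished =
      for {kind, id} <- events, kind in [:completed, :failed], into: MapSet.new() do
        id
      end

    # Keep dispatched ids that never reached a terminal event.
    for {:dispatched, id} <- events, not MapSet.member?(finished, id) do
      id
    end
  end
end

events = [
  {:dispatched, 1},
  {:dispatched, 2},
  {:completed, 1},
  {:dispatched, 3},
  {:failed, 3}
]

PendingSketch.pending(events)
# => [2]
```

After a crash, the ids returned this way are the candidates for re-dispatch.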
plan/1 activates all pending left-hand-side (match-phase) runnables and prepares the next match runnables.
Examples
iex> require Runic
iex> alias Runic.Workflow
iex> rule = Runic.rule(fn x when x > 0 -> :positive end, name: :pos)
iex> workflow = Runic.workflow(rules: [rule])
iex> workflow = Workflow.plan(workflow, 5)
iex> workflow = Workflow.plan(workflow)
iex> Workflow.is_runnable?(workflow)
true
For a new set of inputs, plan/2 prepares the workflow agenda for the next set of reactions by
matching through left-hand-side conditions in the workflow network.
In an inference engine's match -> select -> execute cycle, this is the match phase.
Runic Workflow evaluation is forward chaining: starting from the root of the graph, it first evaluates the direct children of the root node. If the workflow has conditions (from rules, etc.), they are prioritized in the agenda for the next cycle.
Plan will always match through a single level of nodes and identify the next runnable activations available.
Examples
iex> require Runic
iex> alias Runic.Workflow
iex> workflow = Runic.workflow(steps: [Runic.step(fn x -> x * 2 end, name: :double)])
iex> workflow = Workflow.plan(workflow, 5)
iex> Workflow.is_runnable?(workflow)
true
iex> workflow |> Workflow.react() |> Workflow.raw_productions()
[10]
Eagerly plans through all produced facts in the workflow that haven't yet activated subsequent runnables.
This is useful when a workflow has already run to satisfaction (no runnables remain) and you want to continue preparing reactions from its output facts.
Finds facts via :produced edges that don't have pending :runnable or :matchable edges.
Examples
iex> require Runic
iex> alias Runic.Workflow
iex> workflow = Runic.workflow(steps: [
...> {Runic.step(fn x -> x + 1 end, name: :add),
...> [Runic.step(fn x -> x * 2 end, name: :double)]}
...> ])
iex> workflow = Workflow.react_until_satisfied(workflow, 5)
iex> Workflow.is_runnable?(workflow)
false
iex> workflow = Workflow.plan_eagerly(workflow)
iex> Workflow.is_runnable?(workflow)
true
Invokes all left hand side / match-phase runnables in the workflow for a given input fact until all are satisfied.
After calling plan_eagerly/2, the workflow has only right-hand-side runnables left, which react or react_until_satisfied can execute.
Examples
iex> require Runic
iex> alias Runic.Workflow
iex> rule = Runic.rule(fn x when x > 0 -> :positive end, name: :pos)
iex> workflow = Runic.workflow(rules: [rule])
iex> workflow = Workflow.plan_eagerly(workflow, 5)
iex> Workflow.is_runnable?(workflow)
true
iex> workflow |> Workflow.react() |> Workflow.raw_productions()
[:positive]
@spec prepare_for_dispatch(t()) :: {t(), [Runic.Workflow.Runnable.t()]}
Prepares all available runnables for external dispatch.
Returns {workflow, [%Runnable{}]} where the workflow may have been updated
by skip/defer reducers, and the runnables list contains nodes ready for execution.
This is designed for external schedulers that want to dispatch execution to worker pools or distributed systems.
Example
{workflow, runnables} = Workflow.prepare_for_dispatch(workflow)
# Dispatch to worker pool (can be parallel)
executed = Task.async_stream(runnables, fn r ->
Invokable.execute(r.node, r)
end, timeout: :infinity)
# Apply results back
workflow = Enum.reduce(executed, workflow, fn {:ok, r}, w ->
Workflow.apply_runnable(w, r)
end)
Prepares meta context for a node by traversing its :meta_ref edges.
Each :meta_ref edge has a getter_fn in its properties that extracts
the needed value from the workflow. This function executes all getter functions
and builds a map of context_key => value pairs.
Example
# For a Condition with state_of(:cart_accumulator) in its where clause
meta_context = prepare_meta_context(workflow, condition)
# => %{cart_accumulator_state: %{total: 150, items: [...]}}
@spec prepared_runnables(t()) :: [Runic.Workflow.Runnable.t()]
Returns a list of prepared %Runnable{} structs ready for execution.
This is the three-phase version of next_runnables/1. Each runnable contains
everything needed to execute independently of the workflow.
Three-Phase Execution Model
- Prepare - This function calls Invokable.prepare/3 for each pending node
- Execute - Call Invokable.execute/2 on each runnable (can be parallelized)
- Apply - Call Workflow.apply_runnable(workflow, runnable) to fold events back
Returns
A list of %Runnable{} structs with status :pending, ready for execute/2.
Nodes that return {:skip, _} or {:defer, _} from prepare are handled immediately
and not included in the returned list.
Example
runnables = Workflow.prepared_runnables(workflow)
executed = Enum.map(runnables, &Invokable.execute(&1.node, &1))
workflow = Enum.reduce(executed, workflow, &Workflow.apply_runnable(&2, &1))
Returns all %Fact{} structs produced by the workflow.
Unlike raw_productions/1, this returns the full Fact structs including
ancestry information for causal tracing. Does not include input facts.
Example
iex> require Runic
iex> alias Runic.Workflow
iex> workflow = Runic.workflow(steps: [Runic.step(fn x -> x * 2 end)])
iex> [fact] = workflow |> Workflow.react(5) |> Workflow.productions()
iex> fact.value
10
Returns all productions of a component or sub component by name.
Many components are made up of sub components so this may return multiple facts for each part.
Examples
iex> require Runic
iex> alias Runic.Workflow
iex> workflow = Runic.workflow(steps: [Runic.step(fn x -> x * 2 end, name: :double)])
iex> [fact] = workflow |> Workflow.react(5) |> Workflow.productions(:double)
iex> fact.value
10
Returns all facts produced in the workflow so far by component name and sub component.
Returns a map where each key is the name of the component and the value is a list of facts produced by that component.
Examples
iex> require Runic
iex> alias Runic.Workflow
iex> workflow = Runic.workflow(steps: [
...> Runic.step(fn x -> x + 1 end, name: :add),
...> Runic.step(fn x -> x * 2 end, name: :mult)
...> ])
iex> pbc = workflow |> Workflow.react(5) |> Workflow.productions_by_component()
iex> is_map(pbc)
true
iex> Map.keys(pbc) |> Enum.sort()
[:add, :mult]
Removes all %Fact{} vertices and generation integers from the workflow graph.
This clears the workflow's accumulated memory while preserving its structure (steps, rules, conditions, and flow edges). Useful for long-running workflows to free memory between processing batches.
Example
require Runic
alias Runic.Workflow
workflow = Runic.workflow(steps: [Runic.step(fn x -> x + 1 end)])
workflow = Workflow.react(workflow, 5)
# Facts exist after reaction
refute Enum.empty?(Workflow.facts(workflow))
# Purge clears them
workflow = Workflow.purge_memory(workflow)
assert Enum.empty?(Workflow.facts(workflow))
Merges the given context map into the workflow's run context.
Run context provides external, runtime-scoped values (secrets, tenant IDs,
database connections) to components during execution. Values are keyed by
component name for scoped access, with an optional :_global key for
values available to all components.
Run context is NOT part of the workflow's content hash, NOT serialized in the event log, and NOT visible in the fact graph.
Example
workflow = Workflow.put_run_context(workflow, %{
call_llm: %{api_key: "sk-..."},
_global: %{workspace_id: "ws1"}
})
Returns the raw values from all produced facts.
This is the most common way to extract results from a workflow.
Returns unwrapped values without the %Fact{} struct metadata.
Example
iex> require Runic
iex> alias Runic.Workflow
iex> workflow = Runic.workflow(steps: [Runic.step(fn x -> x * 2 end)])
iex> workflow |> Workflow.react(5) |> Workflow.raw_productions()
[10]
By Component Name
iex> require Runic
iex> alias Runic.Workflow
iex> workflow = Runic.workflow(
...> steps: [Runic.step(fn x -> x * 2 end, name: :double)]
...> )
iex> workflow |> Workflow.react(5) |> Workflow.raw_productions(:double)
[10]
Returns a map of component name to raw production values for all components.
Like raw_productions/1 but grouped by component name.
Example
require Runic
alias Runic.Workflow
workflow = Runic.workflow(
steps: [
{Runic.step(fn x -> x + 1 end, name: "step 1"),
[Runic.step(fn x -> x + 2 end, name: "step 2")]}
]
)
%{"step 1" => [2], "step 2" => [4]} =
workflow
|> Workflow.react_until_satisfied(1)
|> Workflow.raw_productions_by_component()
Returns the raw (output value) side effects of the workflow, i.e. the values of facts resulting from the execution of a Runic.Step.
Examples
iex> require Runic
iex> alias Runic.Workflow
iex> workflow = Runic.workflow(steps: [Runic.step(fn x -> x * 2 end)])
iex> workflow |> Workflow.react(5) |> Workflow.raw_reactions() |> Enum.sort()
[5, 10]
Executes a single reaction cycle using the three-phase model.
This function advances the workflow by one "generation" - executing all currently
runnable steps/rules. Use react_until_satisfied/3 to run to completion.
Basic Usage
iex> require Runic
iex> alias Runic.Workflow
iex> workflow = Runic.workflow(steps: [Runic.step(fn x -> x * 2 end)])
iex> workflow = Workflow.react(workflow, 5)
iex> Workflow.raw_productions(workflow)
[10]
Options
- :async - When true, executes runnables in parallel using Task.async_stream. Useful for I/O-bound workflows. Default: false (serial execution)
- :max_concurrency - Maximum parallel tasks when async: true. Default: System.schedulers_online()
- :timeout - Timeout for each task when async: true. Default: :infinity
Parallel Execution
workflow = Workflow.react(workflow, 5, async: true, max_concurrency: 4)
@spec react(t(), Runic.Workflow.Fact.t() | term(), keyword()) :: t()
Executes a single reaction cycle with the given input value.
Plans through the match phase and executes one cycle of runnables. Commonly used with a raw value to start workflow processing.
Options
- :async - When true, executes runnables in parallel. Default: false
- :max_concurrency - Maximum parallel tasks when async: true
- :timeout - Timeout for each task when async: true
- :run_context - A map of external values for context/1 expressions. See put_run_context/2 and react_until_satisfied/3.
@spec react_until_satisfied(t(), Runic.Workflow.Fact.t() | term(), keyword()) :: t()
Executes the workflow until no more runnables remain.
Iteratively calls react/2 until all reachable nodes have been executed.
This is the recommended way to fully evaluate a workflow pipeline.
Basic Usage
iex> require Runic
iex> alias Runic.Workflow
iex> workflow = Runic.workflow(
...> steps: [
...> {Runic.step(fn x -> x + 1 end, name: :add_one),
...> [Runic.step(fn x -> x * 2 end, name: :double)]}
...> ]
...> )
iex> results = workflow |> Workflow.react_until_satisfied(5) |> Workflow.raw_productions()
iex> Enum.sort(results)
[6, 12]
Options
- :async - When true, executes runnables in parallel. Default: false
- :max_concurrency - Maximum parallel tasks when async: true
- :timeout - Timeout for each task when async: true
- :deadline_ms - Wall-clock deadline for the entire workflow execution in milliseconds. The policy driver will fail runnables with {:deadline_exceeded, remaining_ms} if the deadline is reached. Converted to an absolute deadline_at monotonic time internally.
- :checkpoint - A 1-arity function called after each react cycle with the updated workflow. Useful for persisting intermediate state in long-running durable workflows.
- :run_context - A map of external values keyed by component name, made available to components that use context/1 expressions. Supports a :_global key for values available to all components. See put_run_context/2.
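The :deadline_ms conversion described above can be sketched in a few lines. This is an assumption-laden illustration, not the policy driver's code: `DeadlineSketch` and its function names are hypothetical, and the only real API used is `System.monotonic_time/1`. The `now` argument is injectable so the arithmetic can be checked deterministically.

```elixir
defmodule DeadlineSketch do
  # Turn a relative budget (ms) into an absolute point on the monotonic clock.
  def deadline_at(deadline_ms, now \\ System.monotonic_time(:millisecond)) do
    now + deadline_ms
  end

  # Milliseconds left before the deadline; zero or negative once it has passed.
  def remaining_ms(deadline_at, now \\ System.monotonic_time(:millisecond)) do
    deadline_at - now
  end

  # Mirrors the documented error shape {:deadline_exceeded, remaining_ms}.
  def check(deadline_at, now \\ System.monotonic_time(:millisecond)) do
    case remaining_ms(deadline_at, now) do
      remaining when remaining > 0 -> :ok
      remaining -> {:error, {:deadline_exceeded, remaining}}
    end
  end
end

deadline = DeadlineSketch.deadline_at(1_000, 250)
# => 1250
DeadlineSketch.check(deadline, 1_000)
# => :ok
DeadlineSketch.check(deadline, 1_300)
# => {:error, {:deadline_exceeded, -50}}
```

Using monotonic rather than wall-clock time keeps the remaining budget stable if the system clock is adjusted mid-run.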
Warning
Workflows that don't terminate (e.g., hooks that continuously add steps) will
cause infinite loops. For non-terminating workflows, use react/2 in a
controlled loop with exit conditions.
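One way to build such a controlled loop is to cap the number of cycles. The sketch below is generic on purpose so it runs without Runic: `BoundedLoopSketch.run/4` is a hypothetical helper that takes the "is there pending work?" check and the "advance one cycle" step as functions. With Runic these would presumably be `&Workflow.is_runnable?/1` and `&Workflow.react/1`.

```elixir
defmodule BoundedLoopSketch do
  # Advances `state` one cycle at a time until no work remains ({:ok, state})
  # or the cycle budget runs out ({:halted, state}).
  def run(state, runnable_fun, step_fun, max_cycles) when max_cycles >= 0 do
    cond do
      not runnable_fun.(state) -> {:ok, state}
      max_cycles == 0 -> {:halted, state}
      true -> run(step_fun.(state), runnable_fun, step_fun, max_cycles - 1)
    end
  end
end

# With a workflow this might be called roughly as (assumption, untested here):
#   workflow
#   |> Workflow.plan_eagerly(input)
#   |> BoundedLoopSketch.run(&Workflow.is_runnable?/1, &Workflow.react/1, 100)

# Stand-in state: a counter that counts as "runnable" while positive.
BoundedLoopSketch.run(3, &(&1 > 0), &(&1 - 1), 10)
# => {:ok, 0}
BoundedLoopSketch.run(3, &(&1 > 0), &(&1 - 1), 2)
# => {:halted, 1}
```

Returning a tagged tuple lets the caller distinguish a satisfied workflow from one that was cut off and may need rescheduling.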
Best For
- IEx/REPL experimentation
- Scripts and notebooks
- Testing workflows
- Simple batch processing
For production use with complex scheduling needs, consider prepare_for_dispatch/1
with a custom scheduler process.
@spec reactions(t()) :: [Runic.Workflow.Fact.t()]
Returns the %Fact{} structs for the side effects of the workflow, i.e. facts resulting from the execution of a Runic.Step. Unlike raw_reactions/1, the facts are not unwrapped to raw values.
Examples
iex> require Runic
iex> alias Runic.Workflow
iex> workflow = Runic.workflow(steps: [Runic.step(fn x -> x * 2 end)])
iex> facts = workflow |> Workflow.react(5) |> Workflow.reactions()
iex> Enum.map(facts, & &1.value) |> Enum.sort()
[5, 10]
Removes a component and its owned invokable nodes from the workflow.
Invokable nodes that are shared with other components (due to content-addressable hashing) are preserved. Downstream nodes of the removed component are rewired to the removed component's upstream parents so the rest of the workflow stays connected.
Also removes any :connects_to edges involving the component and appends
a %ComponentRemoved{} event to the build log.
Example
require Runic
alias Runic.Workflow
step1 = Runic.step(fn x -> x + 1 end, name: :add)
step2 = Runic.step(fn x -> x * 2 end, name: :double)
step3 = Runic.step(fn x -> x - 1 end, name: :subtract)
workflow = Workflow.new()
|> Workflow.add(step1)
|> Workflow.add(step2, to: :add)
|> Workflow.add(step3, to: :double)
workflow = Workflow.remove_component(workflow, :double)
# :add now flows directly to :subtract
Returns a map of component names to their context key requirements.
Only includes components that use context/1 or context/2 meta expressions.
Components without context requirements are omitted.
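Keys are annotated with whether they have defaults: :required for keys without a default, or {:optional, default} for keys declared with a default via context/2.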
Example
Workflow.required_context_keys(workflow)
# => %{call_llm: [api_key: :required, model: {:optional, "gpt-4"}]}
Extracts structured results from a workflow.
When component_names is a list, extracts the last produced value for each
named component, independent of output port declarations.
When component_names is nil (default) and the workflow declares
output_ports, returns a map keyed by port name with values extracted
according to each port's :from binding and :cardinality.
When component_names is nil and no output_ports are declared, falls
back to raw_productions_by_component/1.
Options
- :facts - when true, returns %Fact{} structs instead of raw values. Default false.
- :all - when true, returns all produced values as a list instead of just the last one, regardless of port cardinality. Default false.
Examples
# With output ports
workflow = Runic.workflow(
name: :pipeline,
steps: [{Runic.step(&parse/1, name: :parse),
[Runic.step(&price/1, name: :price)]}],
output_ports: [total: [type: :float, from: :price]]
)
%{total: 42.50} =
workflow
|> Workflow.react_until_satisfied(order)
|> Workflow.results()
# Explicit component selection
%{add: 6, mult: 10} =
workflow
|> Workflow.react_until_satisfied(5)
|> Workflow.results([:add, :mult])
# With options
%{price: [%Fact{}, %Fact{}]} =
Workflow.results(workflow, [:price], facts: true, all: true)
# Use output ports with options
%{total: [42.50, 43.00]} =
Workflow.results(workflow, nil, all: true)
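The difference between the default "last value" behaviour and `all: true` can be pictured on a plain map of productions. This sketch is not how results is implemented: `ResultsSketch.select/3` is hypothetical, it assumes each component's values are listed in production order, and it returns nil for components that produced nothing (also an assumption).

```elixir
defmodule ResultsSketch do
  # by_component: %{component_name => [values in production order]}
  def select(by_component, names, opts \\ []) do
    all = Keyword.get(opts, :all, false)

    Map.new(names, fn name ->
      values = Map.get(by_component, name, [])
      # Either the whole list, or only the most recently produced value.
      {name, if(all, do: values, else: List.last(values))}
    end)
  end
end

productions = %{parse: [:order], price: [42.5, 43.0]}

ResultsSketch.select(productions, [:price])
# => %{price: 43.0}
ResultsSketch.select(productions, [:price], all: true)
# => %{price: [42.5, 43.0]}
```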
@spec root_ancestor_hash(t(), Runic.Workflow.Fact.t()) :: integer() | nil
Finds the root ancestor fact hash for a given fact.
Walks the ancestry chain until it finds a fact with ancestry: nil (root input).
Returns the hash of that root fact, or the fact's own hash if it is a root.
Examples
iex> root_ancestor_hash(workflow, root_fact)
123456 # root_fact.hash
iex> root_ancestor_hash(workflow, deeply_nested_fact)
123456 # hash of the original root input
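The ancestry walk can be sketched over a plain lookup table. This is an illustration with assumed data shapes, not Runic's implementation: facts are modelled as a hypothetical map of hash to `%{ancestry: nil | {producer_hash, parent_fact_hash}}`, and returning nil for an unknown hash is a guess at when the nil in the spec applies.

```elixir
defmodule AncestrySketch do
  # facts_by_hash: %{fact_hash => %{ancestry: nil | {producer_hash, parent_fact_hash}}}
  def root_hash(facts_by_hash, hash) do
    case Map.fetch(facts_by_hash, hash) do
      # A fact without ancestry is a root input: return its own hash.
      {:ok, %{ancestry: nil}} ->
        hash

      # Otherwise follow the parent fact one step up the chain.
      {:ok, %{ancestry: {_producer_hash, parent_hash}}} ->
        root_hash(facts_by_hash, parent_hash)

      # Unknown hash: nothing to walk (assumed behaviour).
      :error ->
        nil
    end
  end
end

facts = %{
  1 => %{ancestry: nil},
  2 => %{ancestry: {:add, 1}},
  3 => %{ancestry: {:double, 2}}
}

AncestrySketch.root_hash(facts, 3)
# => 1
AncestrySketch.root_hash(facts, 1)
# => 1
```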
Replaces the workflow's scheduler policies list entirely.
Marks all nodes transitively downstream of failed_node as unreachable.
Walks the structural :flow edges from failed_node to find all transitive
dependents, then relabels any pending :runnable or :joined edges pointing
to those nodes as :upstream_failed. This prevents the workflow from getting
stuck waiting for work that can never complete due to a missing upstream fact.
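The traversal and relabelling described above can be sketched on simple data structures. Everything here is hypothetical and only illustrates the idea: flow edges are an adjacency map, pending edges are `{fact, node, label}` tuples, and `UpstreamFailedSketch` is not a Runic module.

```elixir
defmodule UpstreamFailedSketch do
  # flow_edges: %{node => [child, ...]}; returns every transitive dependent.
  def downstream(flow_edges, failed_node) do
    walk(flow_edges, Map.get(flow_edges, failed_node, []), MapSet.new())
  end

  defp walk(_flow_edges, [], seen), do: seen

  defp walk(flow_edges, [current | rest], seen) do
    if MapSet.member?(seen, current) do
      walk(flow_edges, rest, seen)
    else
      children = Map.get(flow_edges, current, [])
      walk(flow_edges, children ++ rest, MapSet.put(seen, current))
    end
  end

  # Relabel pending :runnable / :joined edges that point at unreachable nodes.
  def relabel(pending_edges, unreachable) do
    Enum.map(pending_edges, fn {fact, target, label} = edge ->
      if label in [:runnable, :joined] and MapSet.member?(unreachable, target) do
        {fact, target, :upstream_failed}
      else
        edge
      end
    end)
  end
end

flow_edges = %{fetch: [:parse], parse: [:price, :audit], price: [:total], other: [:total]}
unreachable = UpstreamFailedSketch.downstream(flow_edges, :parse)
# unreachable holds :price, :audit and :total, but not :fetch or :other

pending = [{:f1, :total, :joined}, {:f2, :other, :runnable}]
UpstreamFailedSketch.relabel(pending, unreachable)
# => [{:f1, :total, :upstream_failed}, {:f2, :other, :runnable}]
```

Note how :total becomes unreachable even though :other still feeds it: a join waiting on :price can never be satisfied.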
Lists all %Step{} structs in the workflow.
Useful for introspecting workflow structure.
Example
iex> require Runic
iex> alias Runic.Workflow
iex> workflow = Runic.workflow(steps: [
...> Runic.step(fn x -> x + 1 end, name: :add),
...> Runic.step(fn x -> x * 2 end, name: :mult)
...> ])
iex> steps = Workflow.steps(workflow)
iex> length(steps)
2
iex> Enum.map(steps, & &1.name) |> Enum.sort()
[:add, :mult]
Returns a keyword list of sub-components of the given component by kind.
Examples
iex> require Runic
iex> alias Runic.Workflow
iex> rule = Runic.rule(fn x when x > 0 -> :positive end, name: :pos_check)
iex> workflow = Workflow.new() |> Workflow.add(rule)
iex> subs = Workflow.sub_components(workflow, :pos_check)
iex> Keyword.keys(subs) |> Enum.sort()
[:condition, :reaction]
Serializes the workflow to Cytoscape.js element JSON format.
Returns a list of node and edge elements compatible with Cytoscape.js and Kino.Cytoscape in Livebook.
Example
iex> elements = Workflow.to_cytoscape(workflow)
iex> Kino.Cytoscape.new(elements)
Serializes the workflow to DOT (Graphviz) format.
Returns a string that can be rendered with Graphviz tools.
Example
iex> dot = Workflow.to_dot(workflow)
iex> File.write!("workflow.dot", dot)
Serializes the workflow to an edgelist format.
Returns a list of {from, to, label} tuples by default.
Options
- :format - :tuples (default) or :string
- :include_memory - Include causal edges (default: false)
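The relationship between the two formats can be pictured as a line-per-edge rendering of the tuples. The sketch below is not the library's serializer: `EdgelistSketch` is hypothetical, and the `from -> to [label]` line shape is inferred from the string example in this section.

```elixir
defmodule EdgelistSketch do
  # Renders {from, to, label} tuples as one "from -> to [label]" line per edge.
  def to_string_format(edges) do
    Enum.map_join(edges, "\n", fn {from, to, label} ->
      "#{from} -> #{to} [#{label}]"
    end)
  end
end

edges = [{:root, :step1, :flow}, {:step1, :step2, :flow}]

EdgelistSketch.to_string_format(edges)
# => "root -> step1 [flow]\nstep1 -> step2 [flow]"
```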
Examples
iex> Workflow.to_edgelist(workflow)
[{:root, :step1, :flow}, {:step1, :step2, :flow}]
iex> Workflow.to_edgelist(workflow, format: :string)
"root -> step1 [flow]\nstep1 -> step2 [flow]"
Serializes the workflow to Mermaid flowchart format.
Returns a string that can be rendered by Mermaid.js.
Options
- :direction - Flow direction: :TB (default), :LR, :BT, :RL
- :include_memory - Include causal reaction edges (default: false)
- :title - Optional title comment
Examples
iex> workflow |> Workflow.to_mermaid()
"flowchart TB\n ..."
iex> workflow |> Workflow.to_mermaid(direction: :LR, include_memory: true)
"flowchart LR\n ..."
Serializes causal reactions as a Mermaid sequence diagram.
Shows how facts flow through steps and produce new facts over time. Best used after workflow execution to visualize the causal chain.
Example
iex> workflow |> Workflow.plan_eagerly(input) |> Workflow.react() |> Workflow.to_mermaid_sequence()
"sequenceDiagram\n ..."
Validates that the given run_context satisfies all context/1 references
in the workflow.
Returns :ok if all required keys are present, or {:error, missing} with
a map of component names to their missing context keys. Keys with defaults
(from context/2) are not reported as missing.
Example
Workflow.validate_run_context(workflow, %{call_llm: %{api_key: "sk-..."}})
# => :ok
Workflow.validate_run_context(workflow, %{})
# => {:error, %{call_llm: [:api_key], db_query: [:repo, :tenant_id]}}
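The validation rule can be sketched against the requirements shape returned by required_context_keys/1. This is a sketch under assumptions, not Runic's code: `ValidateSketch` is hypothetical, and it assumes that values under :_global count toward every component's required keys.

```elixir
defmodule ValidateSketch do
  # requirements: %{component => [key: :required | {:optional, default}]}
  def validate(requirements, run_context) do
    global = Map.get(run_context, :_global, %{})

    missing =
      Enum.reduce(requirements, %{}, fn {component, keys}, acc ->
        available = Map.merge(global, Map.get(run_context, component, %{}))

        # Only :required keys can be missing; {:optional, _} entries are skipped
        # because the pattern in the generator does not match them.
        absent = for {key, :required} <- keys, not Map.has_key?(available, key), do: key

        if absent == [], do: acc, else: Map.put(acc, component, absent)
      end)

    if missing == %{}, do: :ok, else: {:error, missing}
  end
end

requirements = %{
  call_llm: [api_key: :required, model: {:optional, "gpt-4"}],
  db_query: [repo: :required, tenant_id: :required]
}

ValidateSketch.validate(requirements, %{})
# => {:error, %{call_llm: [:api_key], db_query: [:repo, :tenant_id]}}
```

Running a check like this before the first react cycle surfaces missing secrets up front instead of partway through execution.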