Runic.Workflow.Invokable protocol (Runic v0.1.0-alpha.7)


Protocol defining how workflow nodes execute within a workflow context.

The Invokable protocol is the runtime heart of Runic. It defines how each node type (Step, Condition, Rule, Accumulator, etc.) executes within the context of a workflow, enabling extension of Runic's runtime with nodes that have different execution properties and evaluation semantics.

Three-Phase Execution Model

All workflow execution uses a three-phase model that enables parallel execution and external scheduler integration:

            
    PREPARE       -->      EXECUTE       -->       APPLY
   (Phase 1)              (Phase 2)              (Phase 3)

 Extract context         Run work fn          Reduce results
 from workflow           in isolation         into workflow
 into %Runnable{}        (parallelizable)     (sequential)

  1. Prepare (prepare/3) - Extract minimal context from workflow into a %Runnable{} struct
  2. Execute (execute/2) - Run the node's work function in isolation (can be parallelized)
  3. Apply - Events on the Runnable are folded back into the workflow via apply_event/2
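
The three phases can be illustrated without Runic at all. The following is a toy model under stated assumptions, not Runic's implementation: the `ThreePhaseSketch` module, its map-based "workflow" and "runnable", and the `{:fact_produced, node, value}` event tuple are all hypothetical stand-ins chosen for illustration.

```elixir
defmodule ThreePhaseSketch do
  # Toy model only: the map shapes and event tuples below are hypothetical,
  # not Runic's real workflow or %Runnable{} structures.

  # Phase 1: copy just what the work function needs out of the "workflow".
  def prepare(workflow, node_name, input) do
    %{
      node: node_name,
      work: Map.fetch!(workflow.nodes, node_name),
      input: input,
      status: :pending,
      events: []
    }
  end

  # Phase 2: run the work function using only the runnable (no workflow access).
  def execute(runnable) do
    result = runnable.work.(runnable.input)
    %{runnable | status: :completed, events: [{:fact_produced, runnable.node, result}]}
  end

  # Phase 3: fold the runnable's events back into the workflow, one at a time.
  def apply_runnable(workflow, runnable) do
    Enum.reduce(runnable.events, workflow, fn {:fact_produced, node, value}, wf ->
      %{wf | facts: [{node, value} | wf.facts]}
    end)
  end

  # Prepare every node, execute the runnables concurrently, apply sequentially.
  def run(workflow, input) do
    runnables = for name <- Map.keys(workflow.nodes), do: prepare(workflow, name, input)

    runnables
    |> Task.async_stream(&execute/1, timeout: :infinity)
    |> Enum.reduce(workflow, fn {:ok, runnable}, wf -> apply_runnable(wf, runnable) end)
  end
end

workflow = %{nodes: %{double: &(&1 * 2), inc: &(&1 + 1)}, facts: []}
result = ThreePhaseSketch.run(workflow, 10)
IO.inspect(Enum.sort(result.facts))
```

Only `execute/1` runs inside the spawned tasks; the workflow map is touched solely in `prepare/3` and `apply_runnable/2`, which is the property that makes Phase 2 safe to parallelize.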

This separation enables:

  • Parallel execution of independent nodes (Phase 2 has no workflow access)
  • External scheduler integration via prepare_for_dispatch/1 and apply_runnable/2
  • Distributed execution by dispatching Runnables to remote workers (events are serializable)
  • Separation of concerns between pure computation and stateful workflow updates
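
To make the serializability point concrete, the sketch below round-trips a list of plain-data events through Erlang's external term format. This is an assumption made for illustration: the source does not say which transport or encoding Runic uses, and `SerializableEventSketch` with its `hash`, `value`, and `ancestry` fields is a hypothetical event shape, not a Runic struct.

```elixir
defmodule SerializableEventSketch do
  # Hypothetical event struct holding only plain data (no pids, refs, or funs),
  # so it can be encoded and sent to another node or process.
  defstruct [:hash, :value, :ancestry]

  # Encode any term to a binary using the Erlang external term format.
  def encode(events), do: :erlang.term_to_binary(events)

  # Decode with :safe so that decoding never creates new atoms.
  def decode(binary), do: :erlang.binary_to_term(binary, [:safe])

  # Returns true when the events survive an encode/decode round trip unchanged.
  def demo do
    events = [%__MODULE__{hash: 123, value: "hello", ancestry: {1, 2}}]
    decode(encode(events)) == events
  end
end

IO.inspect(SerializableEventSketch.demo())
```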

Protocol Functions

| Function | Purpose |
| --- | --- |
| match_or_execute/1 | Declares whether node is a :match (predicate) or :execute (produces facts) |
| invoke/3 | High-level API that runs all three phases internally |
| prepare/3 | Phase 1: Extract context from workflow, build a %Runnable{} |
| execute/2 | Phase 2: Run the work function using only Runnable context |

Return Types

  • prepare/3 returns:

    • {:ok, %Runnable{}} - Ready for execution
    • {:skip, (Workflow.t() -> Workflow.t())} - Skip with reducer function
    • {:defer, (Workflow.t() -> Workflow.t())} - Defer with reducer function
  • execute/2 returns a %Runnable{} with:

    • status: :completed - With result and events populated
    • status: :failed - With error populated
    • status: :skipped - With events for skip handling
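
A caller typically branches on these shapes. The sketch below shows one way to do so with pattern matching; it is illustrative only. `ReturnShapesSketch` is a hypothetical module, and plain maps stand in for both the workflow and `%Runnable{}`, modelling only the `status`, `result`, `events`, and `error` fields named above.

```elixir
defmodule ReturnShapesSketch do
  # Illustration only: plain maps stand in for %Runnable{} and the workflow.

  # {:ok, runnable}: leave the workflow untouched and hand the runnable on.
  def handle_prepare({:ok, runnable}, workflow), do: {workflow, [runnable]}

  # {:skip, reducer} / {:defer, reducer}: nothing to execute, apply the reducer.
  def handle_prepare({tag, reducer}, workflow)
      when tag in [:skip, :defer] and is_function(reducer, 1) do
    {reducer.(workflow), []}
  end

  # Branch on the status of an executed runnable.
  def summarize(%{status: :completed, result: result, events: events}),
    do: {:completed, result, length(events)}

  def summarize(%{status: :failed, error: error}), do: {:failed, error}

  def summarize(%{status: :skipped, events: events}), do: {:skipped, length(events)}
end

workflow = %{skipped: 0}
bump = fn wf -> %{wf | skipped: wf.skipped + 1} end

IO.inspect(ReturnShapesSketch.handle_prepare({:ok, %{status: :pending}}, workflow))
IO.inspect(ReturnShapesSketch.handle_prepare({:skip, bump}, workflow))
IO.inspect(ReturnShapesSketch.summarize(%{status: :failed, error: :boom}))
```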

Built-in Implementations

Runic provides Invokable implementations for all core node types:

| Node Type | Match/Execute | Description |
| --- | --- | --- |
| Runic.Workflow.Root | :match | Entry point for facts into the workflow |
| Runic.Workflow.Condition | :match | Boolean predicate check |
| Runic.Workflow.Step | :execute | Transform input fact to output fact |
| Runic.Workflow.Conjunction | :match | Logical AND of multiple conditions |
| Runic.Workflow.Accumulator | :execute | Stateful reducer across invocations |
| Runic.Workflow.Join | :execute | Wait for multiple parent facts before firing |
| Runic.Workflow.FanOut | :execute | Spread enumerable into parallel branches |
| Runic.Workflow.FanIn | :execute | Collect parallel results back together |

External Scheduler Integration

The three-phase model enables integration with custom schedulers:

alias Runic.Workflow

# Phase 1: Prepare runnables for dispatch
workflow = Workflow.plan_eagerly(workflow, input)
{workflow, runnables} = Workflow.prepare_for_dispatch(workflow)

# Phase 2: Execute (dispatch to worker pool, external service, etc.)
executed = Task.async_stream(runnables, fn runnable ->
  Runic.Workflow.Invokable.execute(runnable.node, runnable)
end, timeout: :infinity)

# Phase 3: Apply results back to workflow
workflow = Enum.reduce(executed, workflow, fn {:ok, runnable}, wrk ->
  Workflow.apply_runnable(wrk, runnable)
end)

Implementing Custom Invokable

To create a custom node type, implement the protocol:

defmodule MyApp.CustomNode do
  defstruct [:hash, :name, :work]
end

defimpl Runic.Workflow.Invokable, for: MyApp.CustomNode do
  alias Runic.Workflow
  alias Runic.Workflow.{Fact, Runnable, CausalContext}
  alias Runic.Workflow.Events.{FactProduced, ActivationConsumed}

  def match_or_execute(_node), do: :execute

  def invoke(%MyApp.CustomNode{} = node, workflow, fact) do
    result = node.work.(fact.value)
    result_fact = Fact.new(value: result, ancestry: {node.hash, fact.hash})

    workflow
    |> Workflow.log_fact(result_fact)
    |> Workflow.draw_connection(node, result_fact, :produced)
    |> Workflow.mark_runnable_as_ran(node, fact)
    |> Workflow.prepare_next_runnables(node, result_fact)
  end

  def prepare(%MyApp.CustomNode{} = node, workflow, fact) do
    context = CausalContext.new(
      node_hash: node.hash,
      input_fact: fact,
      ancestry_depth: Workflow.ancestry_depth(workflow, fact)
    )

    {:ok, Runnable.new(node, fact, context)}
  end

  def execute(%MyApp.CustomNode{} = node, %Runnable{input_fact: fact, context: ctx} = runnable) do
    result = node.work.(fact.value)
    result_fact = Fact.new(value: result, ancestry: {node.hash, fact.hash})

    events = [
      %FactProduced{
        hash: result_fact.hash,
        value: result_fact.value,
        ancestry: result_fact.ancestry,
        producer_label: :produced,
        weight: ctx.ancestry_depth + 1
      },
      %ActivationConsumed{
        fact_hash: fact.hash,
        node_hash: node.hash,
        from_label: :runnable
      }
    ]

    Runnable.complete(runnable, result_fact, events)
  end
end

See the Protocols Guide for more details and examples.

Summary

Types

t()

All the types that implement this protocol.

Functions

execute(node, runnable)

Phase 2: Execute a prepared runnable.

invoke(node, workflow, fact)

Legacy invoke function - activates a node in the context of a workflow and an input fact. Returns a new workflow with the node's effects applied.

match_or_execute(node)

Returns whether this node is a match (predicate/gate) or execute (produces facts) node.

prepare(node, workflow, fact)

Phase 1: Prepare a runnable for execution.

Types

t()

@type t() :: term()

All the types that implement this protocol.

Functions

execute(node, runnable)

@spec execute(node :: struct(), runnable :: Runic.Workflow.Runnable.t()) ::
  Runic.Workflow.Runnable.t()

Phase 2: Execute a prepared runnable.

Runs the node's work function using only the context captured in the Runnable. No workflow access is allowed during this phase (enables parallelization).

Returns the Runnable with:

  • status updated to :completed, :failed, or :skipped
  • result populated with the execution result
  • events populated with event structs to fold into the workflow

invoke(node, workflow, fact)

@spec invoke(
  node :: struct(),
  workflow :: Runic.Workflow.t(),
  fact :: Runic.Workflow.Fact.t()
) ::
  Runic.Workflow.t()

Legacy invoke function - activates a node in the context of a workflow and an input fact. Returns a new workflow with the node's effects applied.

During migration, this may delegate to the three-phase prepare/execute/apply cycle.

match_or_execute(node)

@spec match_or_execute(node :: struct()) :: :match | :execute

Returns whether this node is a match (predicate/gate) or execute (produces facts) node.
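
As a rough illustration of how a `:match | :execute` declaration can drive dispatch, the sketch below defines a small stand-alone protocol and splits a list of nodes by kind. Everything here is hypothetical (`KindSketch`, `GateNode`, `WorkNode`, `KindDemo`); it mirrors the shape of `match_or_execute/1` but does not use or describe Runic's internals.

```elixir
defprotocol KindSketch do
  # Hypothetical protocol mirroring the shape of match_or_execute/1.
  @spec match_or_execute(t()) :: :match | :execute
  def match_or_execute(node)
end

defmodule GateNode do
  # Stand-in for a predicate-style node.
  defstruct [:predicate]
end

defmodule WorkNode do
  # Stand-in for a fact-producing node.
  defstruct [:work]
end

defimpl KindSketch, for: GateNode do
  def match_or_execute(_node), do: :match
end

defimpl KindSketch, for: WorkNode do
  def match_or_execute(_node), do: :execute
end

defmodule KindDemo do
  # Split nodes into {match_nodes, execute_nodes} using the protocol.
  def partition(nodes) do
    Enum.split_with(nodes, &(KindSketch.match_or_execute(&1) == :match))
  end

  def sample, do: [%GateNode{predicate: &is_integer/1}, %WorkNode{work: &(&1 * 2)}]
end

{gates, workers} = KindDemo.partition(KindDemo.sample())
IO.inspect({length(gates), length(workers)})
```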

prepare(node, workflow, fact)

@spec prepare(
  node :: struct(),
  workflow :: Runic.Workflow.t(),
  fact :: Runic.Workflow.Fact.t()
) ::
  {:ok, Runic.Workflow.Runnable.t()}
  | {:skip, (Runic.Workflow.t() -> Runic.Workflow.t())}
  | {:defer, (Runic.Workflow.t() -> Runic.Workflow.t())}

Phase 1: Prepare a runnable for execution.

Extracts minimal context from the workflow needed to execute this node. Returns a %Runnable{} struct that can be executed independently of the workflow.

Returns

  • {:ok, %Runnable{}} - Node is ready for execution
  • {:skip, reducer_fn} - Node should be skipped, apply reducer to workflow
  • {:defer, reducer_fn} - Node should be deferred, apply reducer to workflow