This guide covers persisting workflow state, recovering from crashes, capturing runnable lifecycle events, and checkpointing long-running workflows. It builds directly on the concepts from the Building a Workflow Scheduler guide.
Prerequisites: Familiarity with the three-phase execution model (prepare → execute → apply) and building schedulers covered in the scheduling guide. Basic knowledge of `Runic.Workflow.SchedulerPolicy` and `Runic.Workflow.PolicyDriver`.
## Why Durable Execution?
Runic workflows are in-memory data structures. When the process running a workflow crashes — or the VM restarts — all progress is lost. For short-lived computations this is fine. But many real workflows are long-running, involve unreliable external services, or must survive infrastructure failures:
- LLM pipelines where each API call takes seconds and costs money
- Order fulfillment workflows spanning minutes of I/O across multiple services
- ETL pipelines processing millions of records over hours
- Approval workflows that pause for days waiting on human input
Durable execution means: the workflow can be stopped, crashed, or restarted and resume exactly where it left off.
Runic achieves this through three mechanisms:
- Event sourcing — `Workflow.log/1` captures the full history of a workflow as a list of events that can rebuild it
- Scheduler policies — declarative per-node configuration for retries, timeouts, and failure handling
- The Runner — supervised infrastructure that combines dispatch, persistence, and crash recovery
## Event Sourcing: `log/1` and `from_log/1`
Every Runic workflow maintains a build log — a list of events that describe how the workflow was constructed and what it has produced. This log is a complete record, and you can reconstruct a workflow from it:
```elixir
require Runic
alias Runic.Workflow

# Build and run a workflow
workflow =
  Runic.workflow(name: :example, steps: [
    Runic.step(fn x -> x * 2 end, name: :double),
    Runic.step(fn x -> x + 1 end, name: :increment)
  ])
  |> Workflow.react_until_satisfied(5)

# Capture the full event log
log = Workflow.log(workflow)

# Rebuild the workflow from the log — same graph, same produced facts
restored = Workflow.from_log(log)

# Productions are preserved
Workflow.raw_productions(restored) == Workflow.raw_productions(workflow)
# => true
```

The log contains two kinds of events:
- Build events — `%ComponentAdded{}`, edges, and structural changes that define the workflow graph
- Reaction events — `%ReactionOccurred{}` records which nodes produced which facts
Together these are sufficient to reconstruct both the workflow's structure and its execution state.
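To sanity-check a restored log, you can partition it into build and reaction events. A minimal sketch, assuming the event structs live under `Runic.Workflow.Log` (adjust the alias to wherever `%ReactionOccurred{}` is actually defined):

```elixir
# Sketch: split a log into reaction events and build events.
# Assumption: %ReactionOccurred{} is defined under Runic.Workflow.Log.
alias Runic.Workflow.Log.ReactionOccurred

log = Workflow.log(workflow)

{reactions, build_events} =
  Enum.split_with(log, &match?(%ReactionOccurred{}, &1))
```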
## Runnable Lifecycle Events

When scheduler policies are used with `execution_mode: :durable`, three additional event types are captured:
- `%RunnableDispatched{}` — records that a runnable was sent for execution, including the resolved policy and attempt number
- `%RunnableCompleted{}` — records successful completion with duration and the produced fact
- `%RunnableFailed{}` — records permanent failure (retries exhausted) with the error and failure action
These events enable crash recovery: by comparing dispatched events against completed/failed events, you can identify in-flight work that was interrupted.
```elixir
alias Runic.Workflow.SchedulerPolicy
alias Runic.Workflow.PolicyDriver

# Build a policy that emits lifecycle events
policy = SchedulerPolicy.new(%{
  max_retries: 2,
  backoff: :exponential,
  timeout_ms: 5_000,
  execution_mode: :durable
})

# Execute with event emission (`runnable` comes from the workflow's prepare phase)
{executed_runnable, events} = PolicyDriver.execute(runnable, policy, emit_events: true)
# events is a list of %RunnableDispatched{} and %RunnableCompleted{} or %RunnableFailed{}
```

Append these events to the workflow for persistence:
```elixir
workflow = Workflow.append_runnable_events(workflow, events)

# They're included in the log
log = Workflow.log(workflow)

# And survive round-trip reconstruction
restored = Workflow.from_log(log)
```

### Identifying In-Flight Work
After a crash, use `Workflow.pending_runnables/1` to find dispatched-but-not-completed work:
```elixir
# After restoring from a log
restored = Workflow.from_log(log)

pending = Workflow.pending_runnables(restored)
# => [%RunnableDispatched{node_name: :call_external_api, attempt: 1, ...}]
```

This returns `%RunnableDispatched{}` events that have no corresponding `%RunnableCompleted{}` or `%RunnableFailed{}` event — work that was in progress when the process died.
## Scheduler Policies for Resilient Execution

Before diving into the Runner, let's see how scheduler policies make individual steps resilient. Policies wrap the execute phase with retries, timeouts, and fallbacks — all without changing the workflow graph or the `Invokable` protocol.

### Attaching Policies to a Workflow

Policies are a list of `{matcher, policy_map}` rules stored on the workflow. During execution, each runnable is matched top-to-bottom; the first match wins and its policy map is merged over the defaults:
```elixir
require Runic
alias Runic.Workflow

workflow =
  Runic.workflow(name: :resilient_pipeline, steps: [
    Runic.step(fn order -> call_inventory_api(order) end, name: :check_inventory),
    Runic.step(fn order -> call_fraud_api(order) end, name: :screen_fraud),
    Runic.step(fn order -> compute_shipping(order) end, name: :estimate_shipping)
  ])

# Add policies — higher priority rules first
workflow =
  workflow
  # External API calls get retries with exponential backoff
  |> Workflow.add_scheduler_policy(:check_inventory, %{
    max_retries: 3,
    backoff: :exponential,
    base_delay_ms: 1_000,
    timeout_ms: 10_000
  })
  |> Workflow.add_scheduler_policy(:screen_fraud, %{
    max_retries: 2,
    backoff: :linear,
    timeout_ms: 15_000,
    # fraud check failure shouldn't block the order
    on_failure: :skip
  })
  # Local computation — fast fail, no retries
  |> Workflow.append_scheduler_policy(:estimate_shipping, %{
    max_retries: 0,
    timeout_ms: 1_000
  })
```

### Using Policy Presets
`SchedulerPolicy` includes presets for common patterns:
```elixir
alias Runic.Workflow.SchedulerPolicy

# For LLM / external AI calls: 3 retries, exponential backoff, 30s timeout
llm = SchedulerPolicy.llm_policy()

# For I/O operations: 2 retries, linear backoff, 10s timeout, skip on failure
io = SchedulerPolicy.io_policy()

# No retries, 5s timeout, halt on failure
fast = SchedulerPolicy.fast_fail()

# Use preset values as policy maps in workflow rules
workflow =
  workflow
  |> Workflow.add_scheduler_policy(:call_llm, Map.from_struct(llm))
  |> Workflow.add_scheduler_policy({:type, Runic.Workflow.Step}, Map.from_struct(io))
```

### Matchers
Matchers determine which policy applies to which node:
```elixir
# Exact name match
{:check_inventory, %{max_retries: 3}}

# Regex on node name — matches :llm_classify, :llm_summarize, etc.
{{:name, ~r/^llm_/}, %{max_retries: 3, backoff: :exponential}}

# Type match — all Steps get this policy
{{:type, Runic.Workflow.Step}, %{timeout_ms: 10_000}}

# Type match — Steps and Rules
{{:type, [Runic.Workflow.Step, Runic.Workflow.Rule]}, %{timeout_ms: 5_000}}

# Custom predicate
{fn node -> Map.get(node, :name) in [:api_a, :api_b] end, %{max_retries: 2}}

# Catch-all default (should be last)
{:default, %{timeout_ms: 30_000}}
```
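Because the first matching rule wins, rule order matters. In this illustrative sketch, the regex rule shadows the catch-all for any `llm_*` node:

```elixir
workflow =
  workflow
  |> Workflow.add_scheduler_policy({:name, ~r/^llm_/}, %{max_retries: 3})
  |> Workflow.append_scheduler_policy(:default, %{max_retries: 0})

# An :llm_classify runnable matches the regex rule first and gets 3 retries;
# every other node falls through to :default and gets none.
```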
### Runtime Policy Overrides

Policies can be overridden at execution time without modifying the workflow:
```elixir
# Prepend runtime overrides (higher priority than workflow policies)
Workflow.react_until_satisfied(workflow, input,
  scheduler_policies: [
    {:check_inventory, %{max_retries: 5, timeout_ms: 30_000}}
  ]
)

# Replace workflow policies entirely
Workflow.react_until_satisfied(workflow, input,
  scheduler_policies: [{:default, %{max_retries: 0}}],
  scheduler_policies_mode: :replace
)
```

### Fallbacks
When all retries are exhausted, a fallback function provides a last-resort recovery:
```elixir
workflow =
  workflow
  |> Workflow.add_scheduler_policy(:call_llm, %{
    max_retries: 3,
    backoff: :exponential,
    timeout_ms: 30_000,
    fallback: fn _runnable, _error ->
      # Return a synthetic value — the workflow continues with this output
      {:value, %{response: "Service unavailable", fallback: true}}
    end
  })
```

Fallbacks receive `(runnable, error)` and can return:
- `{:value, term}` — synthetic output, workflow continues as if the step succeeded
- `{:retry_with, %{key: val}}` — merge overrides into `meta_context` and try once more (see the sketch below)
- `%Runnable{}` — a modified runnable to execute once (e.g., with different input)
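For example, a fallback could retry once with a cheaper model by merging an override into `meta_context`. A sketch, where the `:model` key is a hypothetical `meta_context` entry rather than a Runic built-in:

```elixir
workflow =
  Workflow.add_scheduler_policy(workflow, :call_llm, %{
    max_retries: 3,
    fallback: fn _runnable, _error ->
      # Hypothetical :model override merged into meta_context for one more attempt
      {:retry_with, %{model: "smaller-fallback-model"}}
    end
  })
```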
### Deadlines

For time-bounded workflow execution, use `:deadline_ms`:
```elixir
# Entire workflow must complete within 60 seconds
Workflow.react_until_satisfied(workflow, input,
  deadline_ms: 60_000,
  scheduler_policies: [
    {:default, %{max_retries: 2, backoff: :exponential}}
  ]
)
```

The deadline is converted to an absolute monotonic timestamp, checked before each retry attempt, and used to cap per-step timeouts. Steps fail with `{:deadline_exceeded, remaining_ms}` when the deadline is reached.
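The per-step cap amounts to simple arithmetic. An illustrative sketch of the idea, not the library internals:

```elixir
# Illustration only: how a workflow deadline caps a per-step timeout.
deadline = System.monotonic_time(:millisecond) + 60_000
policy_timeout_ms = 10_000

remaining_ms = deadline - System.monotonic_time(:millisecond)
effective_timeout_ms = min(policy_timeout_ms, max(remaining_ms, 0))
```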
## Checkpointing During Execution

For long-running workflows, persist intermediate state with the `:checkpoint` callback:
```elixir
Workflow.react_until_satisfied(workflow, input,
  checkpoint: fn workflow ->
    log = Workflow.log(workflow)
    MyApp.Repo.save_workflow_log(:my_workflow, log)
  end
)
```

The checkpoint fires after each react cycle — after all runnables in a generation execute and their results are applied. If the process crashes mid-execution, load the last checkpoint and resume:
```elixir
{:ok, log} = MyApp.Repo.load_workflow_log(:my_workflow)
workflow = Workflow.from_log(log)

# Continue from where we left off
workflow |> Workflow.react_until_satisfied()
```

## The Runner: Managed Durable Execution
While scheduler policies and manual checkpointing work for programmatic use, the `Runic.Runner` provides a complete managed execution environment:
- Supervision — each workflow runs in its own `GenServer` under a `DynamicSupervisor`
- Task isolation — runnables dispatch to `Task.Supervisor.async_nolink` for fault isolation
- Registry — workflows are addressable by ID via Elixir's `Registry`
- Persistence — pluggable `Store` behaviour with built-in ETS and Mnesia adapters
- Policy integration — automatic resolution and execution through `PolicyDriver`
- Crash recovery — `resume/3` rebuilds from persisted logs and re-dispatches in-flight work
- Telemetry — lifecycle events under `[:runic, :runner, ...]` for observability
### Starting a Runner
Add the Runner to your application's supervision tree:
```elixir
defmodule MyApp.Application do
  use Application

  @impl true
  def start(_type, _args) do
    children = [
      {Runic.Runner, name: MyApp.Runner}
    ]

    Supervisor.start_link(children, strategy: :one_for_one)
  end
end
```

The Runner starts a supervision tree internally:
```
MyApp.Runner (Supervisor, :rest_for_one)
├── MyApp.Runner.Store (ETS table owner GenServer)
├── MyApp.Runner.Registry (Elixir Registry, :unique)
├── MyApp.Runner.TaskSupervisor (Task.Supervisor)
└── MyApp.Runner.WorkerSupervisor (DynamicSupervisor)
    ├── Worker<order-123>
    ├── Worker<order-456>
    └── ...
```

### Running Workflows
```elixir
require Runic
alias Runic.Workflow

# Build the workflow with policies
workflow =
  Runic.workflow(name: :order_fulfillment, steps: [
    Runic.step(&MyApp.Orders.validate/1, name: :validate),
    {Runic.step(&MyApp.Inventory.check/1, name: :check_inventory), [
      Runic.step(&MyApp.Fraud.screen/1, name: :screen_fraud),
      Runic.step(&MyApp.Shipping.estimate/1, name: :estimate_shipping)
    ]}
  ])
  |> Workflow.add_scheduler_policy(:check_inventory, %{
    max_retries: 3,
    backoff: :exponential,
    timeout_ms: 10_000,
    execution_mode: :durable
  })
  |> Workflow.add_scheduler_policy(:screen_fraud, %{
    max_retries: 2,
    timeout_ms: 15_000,
    on_failure: :skip,
    execution_mode: :durable
  })
  |> Workflow.append_scheduler_policy(:default, %{timeout_ms: 5_000})

# Start a managed workflow
{:ok, _pid} = Runic.Runner.start_workflow(
  MyApp.Runner,
  "order-#{order.id}",
  workflow,
  max_concurrency: 4,
  checkpoint_strategy: :every_cycle,
  on_complete: fn id, workflow ->
    results = Workflow.raw_productions(workflow)
    MyApp.Orders.finalize(id, results)
  end
)

# Feed input — the Worker handles the full dispatch loop
:ok = Runic.Runner.run(MyApp.Runner, "order-#{order.id}", order)

# Query results (non-blocking — returns current state)
{:ok, results} = Runic.Runner.get_results(MyApp.Runner, "order-#{order.id}")
```

### Checkpoint Strategies
The Worker supports multiple checkpointing strategies to balance durability against performance:
| Strategy | Behavior | Use Case |
|---|---|---|
| `:every_cycle` | Persist after each react cycle (default) | Maximum durability, moderate overhead |
| `:on_complete` | Persist only when the workflow satisfies | Fast execution, risk of losing in-progress work |
| `{:every_n, n}` | Persist every Nth completed runnable | Tunable balance between durability and throughput |
| `:manual` | Only persist on explicit `checkpoint/2` call | Full user control |
```elixir
# Checkpoint every 5th completed runnable
Runic.Runner.start_workflow(MyApp.Runner, :etl_pipeline, workflow,
  checkpoint_strategy: {:every_n, 5}
)

# Manual checkpointing
Runic.Runner.start_workflow(MyApp.Runner, :approval_flow, workflow,
  checkpoint_strategy: :manual
)

# Later, explicitly checkpoint
Runic.Runner.checkpoint(MyApp.Runner, :approval_flow)
```

### Crash Recovery with `resume/3`
When a Worker crashes (or the VM restarts), the Runner can resume from persisted state:
```elixir
# The original Worker crashed or the VM restarted...

# Resume loads the log from the store, rebuilds the workflow, and starts a new Worker
{:ok, _pid} = Runic.Runner.resume(MyApp.Runner, "order-123")

# If the workflow had durable-mode policies, pending runnables are re-dispatched automatically
```

Under the hood, `resume/3`:
- Calls `Store.load/2` to retrieve the persisted event log
- Calls `Workflow.from_log/1` to rebuild the workflow graph and state
- Starts a new Worker with the restored workflow
- The Worker calls `Workflow.pending_runnables/1` to find in-flight work
- Re-plans and re-dispatches any pending runnables
This is why `execution_mode: :durable` matters: without it, the lifecycle events that track dispatched-vs-completed work aren't captured, and in-flight recovery isn't possible.
## Store Adapters

The Runner's persistence is abstracted behind the `Runic.Runner.Store` behaviour:
```elixir
defmodule Runic.Runner.Store do
  @callback init_store(opts :: keyword()) :: {:ok, state()} | {:error, term()}
  @callback save(workflow_id(), log(), state()) :: :ok | {:error, term()}
  @callback load(workflow_id(), state()) :: {:ok, log()} | {:error, :not_found | term()}
  @callback checkpoint(workflow_id(), log(), state()) :: :ok | {:error, term()}
  @callback delete(workflow_id(), state()) :: :ok | {:error, term()}
  @callback list(state()) :: {:ok, [workflow_id()]} | {:error, term()}
  @callback exists?(workflow_id(), state()) :: boolean()
end
```

`checkpoint/3`, `delete/2`, `list/1`, and `exists?/2` are optional callbacks.
### ETS Adapter (Default)

`Runic.Runner.Store.ETS` stores workflow logs in a public ETS table owned by a GenServer. Logs survive Worker restarts within the same VM but are lost on VM restart. Zero configuration needed:
```elixir
# ETS is the default — just start the Runner
{:ok, _} = Runic.Runner.start_link(name: MyApp.Runner)
```

If you configure ETS explicitly, supervise it before the Runner:
```elixir
children = [
  {Runic.Runner.Store.ETS, runner_name: MyApp.Runner},
  {Runic.Runner, name: MyApp.Runner, store: Runic.Runner.Store.ETS}
]
```

### Mnesia Adapter
`Runic.Runner.Store.Mnesia` uses OTP's built-in Mnesia database for disk persistence and optional distributed storage:
```elixir
children = [
  {Runic.Runner.Store.Mnesia, runner_name: MyApp.Runner, disc_copies: true},
  {Runic.Runner,
   name: MyApp.Runner,
   store: Runic.Runner.Store.Mnesia,
   store_opts: [disc_copies: true]}
]
```

Mnesia tables persist across VM restarts. For distributed clusters, pass `:nodes`:

```elixir
store_opts: [disc_copies: true, nodes: [:"app@node1", :"app@node2"]]
```

### Writing a Custom Adapter
Implement the `Runic.Runner.Store` behaviour. The Runner only auto-starts its implicit default ETS store; if you configure any store explicitly, the Runner assumes the store's lifecycle is managed elsewhere. That means adapters that own runtime resources should provide `start_link/1` and `child_spec/1` so you can add them to your supervision tree, while adapters backed by an externally supervised service (such as an Ecto repo) can remain callback-only.
```elixir
defmodule MyApp.PostgresStore do
  @behaviour Runic.Runner.Store

  @impl true
  def init_store(opts) do
    repo = Keyword.fetch!(opts, :repo)
    {:ok, %{repo: repo}}
  end

  @impl true
  def save(workflow_id, log, %{repo: repo}) do
    serialized = :erlang.term_to_binary(log)

    # Upsert the serialized log, replacing any existing row for this workflow
    repo.insert!(
      %MyApp.WorkflowLog{id: workflow_id, data: serialized},
      on_conflict: {:replace, [:data]},
      conflict_target: :id
    )

    :ok
  end

  @impl true
  def load(workflow_id, %{repo: repo}) do
    case repo.get(MyApp.WorkflowLog, workflow_id) do
      nil -> {:error, :not_found}
      record -> {:ok, :erlang.binary_to_term(record.data)}
    end
  end
end
```

If the adapter should be supervised in your application tree, add lifecycle callbacks:
```elixir
defmodule MyApp.PostgresStore do
  @behaviour Runic.Runner.Store

  # ...init_store/1, save/3, load/2...

  # start_link/1 and child_spec/1 for the Runner's supervision tree
  def start_link(opts), do: Agent.start_link(fn -> opts end, name: __MODULE__)
  def child_spec(opts), do: %{id: __MODULE__, start: {__MODULE__, :start_link, [opts]}}
end
```

Then start the adapter before the Runner:
```elixir
children = [
  {MyApp.PostgresStore, runner_name: MyApp.Runner, repo: MyApp.Repo},
  {Runic.Runner,
   name: MyApp.Runner,
   store: MyApp.PostgresStore,
   store_opts: [repo: MyApp.Repo]}
]
```

## Telemetry
The Runner emits telemetry events at key lifecycle points:
| Event | When |
|---|---|
| `[:runic, :runner, :workflow, :start]` | Workflow Worker initialized |
| `[:runic, :runner, :workflow, :stop]` | Workflow satisfied (all runnables exhausted) |
| `[:runic, :runner, :runnable, :start]` | Runnable dispatched to a task |
| `[:runic, :runner, :runnable, :stop]` | Runnable completed (includes duration measurement) |
| `[:runic, :runner, :runnable, :exception]` | Runnable failed permanently |
| `[:runic, :runner, :store, :start]` | Store operation started |
| `[:runic, :runner, :store, :stop]` | Store operation completed |
```elixir
# Attach a handler for monitoring
:telemetry.attach_many(
  "workflow-logger",
  Runic.Runner.Telemetry.event_names(),
  fn event, measurements, metadata, _config ->
    IO.inspect({event, measurements, metadata}, label: "telemetry")
  end,
  nil
)
```

## Putting It All Together: A Durable LLM Pipeline
Here's a complete example combining policies, durable execution, the Runner, and crash recovery for an LLM-powered document processing pipeline:
```elixir
defmodule MyApp.DocumentPipeline do
  require Runic
  alias Runic.Workflow

  def build_workflow do
    extract = Runic.step(
      fn doc -> MyApp.LLM.extract_entities(doc) end,
      name: :extract_entities
    )

    classify = Runic.step(
      fn doc -> MyApp.LLM.classify(doc) end,
      name: :classify_document
    )

    summarize = Runic.step(
      fn doc -> MyApp.LLM.summarize(doc) end,
      name: :summarize
    )

    store_results = Runic.step(
      fn entities, classification, summary ->
        MyApp.Documents.store(%{
          entities: entities,
          classification: classification,
          summary: summary
        })
      end,
      name: :store_results
    )

    Runic.workflow(name: :doc_pipeline)
    |> Workflow.add(extract)
    |> Workflow.add(classify)
    |> Workflow.add(summarize)
    |> Workflow.add(store_results, to: [:extract_entities, :classify_document, :summarize])
    # LLM calls: retry with exponential backoff, durable for crash recovery
    |> Workflow.add_scheduler_policy({:name, ~r/^(extract|classify|summarize)/}, %{
      max_retries: 3,
      backoff: :exponential,
      base_delay_ms: 2_000,
      max_delay_ms: 30_000,
      timeout_ms: 60_000,
      execution_mode: :durable,
      fallback: fn _runnable, error ->
        {:value, %{error: inspect(error), fallback: true}}
      end
    })
    # Database write: fast fail
    |> Workflow.append_scheduler_policy(:store_results, %{
      max_retries: 1,
      timeout_ms: 5_000
    })
  end

  def process_document(runner, doc_id, document) do
    workflow = build_workflow()

    {:ok, _pid} = Runic.Runner.start_workflow(
      runner,
      "doc-#{doc_id}",
      workflow,
      max_concurrency: 3,
      checkpoint_strategy: :every_cycle,
      on_complete: {MyApp.Notifications, :document_processed, []}
    )

    Runic.Runner.run(runner, "doc-#{doc_id}", document)
  end

  def retry_failed(runner, doc_id) do
    # Resume from the last checkpoint — in-flight LLM calls are re-dispatched
    Runic.Runner.resume(runner, "doc-#{doc_id}")
  end
end
```

The three LLM steps (`extract_entities`, `classify_document`, `summarize`) run concurrently with fault isolation. If the process crashes mid-execution:
- The ETS store (or Mnesia) has the last checkpointed log
- `resume/3` rebuilds the workflow from the log
- `pending_runnables/1` identifies any LLM calls that were dispatched but never completed
- The Worker re-dispatches them with the same policies
The fallback ensures that even if an LLM call permanently fails after all retries, the pipeline continues with a fallback value rather than blocking the entire workflow.
## What's Next
This guide covered the full spectrum of durable execution in Runic. For reference:
- `Runic.Workflow.SchedulerPolicy` — policy struct, matchers, presets, and resolution
- `Runic.Workflow.PolicyDriver` — the execution driver handling retries, timeouts, and fallbacks
- `Runic.Runner` — the supervised execution infrastructure
- `Runic.Runner.Store` — the persistence behaviour and built-in adapters
- `Runic.Runner.Telemetry` — telemetry event catalog