Durable Workflows
EffectLogger turns computations into durable workflows that survive process restarts, server reboots, and code deployments. This is Temporal-style durable execution as a library primitive.
The pattern
- Write a computation that yields at interaction/wait points
- Run with EffectLogger.with_logging() to capture the effect log
- When the computation suspends, serialize and persist the log
- Later, cold-resume from the persisted log with a new value
Basic workflow: approval flow
```elixir
defmodule ApprovalWorkflow do
  use Skuld.Syntax

  defcomp run(request) do
    # Step 1: validate the request
    validated <- validate(request)

    # Step 2: yield for manager approval
    approval <- Yield.yield(%{
      type: :approval_needed,
      request: validated,
      approver: validated.manager_id
    })

    case approval do
      :approved ->
        # Step 3: execute the approved action
        result <- execute(validated)
        _ <- EventAccumulator.emit(%RequestApproved{
          request_id: validated.id
        })
        {:ok, result}

      :rejected ->
        _ <- EventAccumulator.emit(%RequestRejected{
          request_id: validated.id
        })
        {:rejected, validated.id}
    end
  end
end
```
Starting the workflow
```elixir
alias Skuld.Effects.EffectLogger
alias Skuld.Effects.EffectLogger.Log

{suspended, env} =
  ApprovalWorkflow.run(request)
  |> EffectLogger.with_logging()
  |> Yield.with_handler()
  |> State.with_handler(initial_state)
  |> EventAccumulator.with_handler(output: fn r, e -> {r, e} end)
  |> Throw.with_handler()
  |> Comp.run()

# suspended.value is %{type: :approval_needed, ...}

# Persist the log
log = EffectLogger.get_log(env) |> Log.finalize()
json = Jason.encode!(log)

Repo.insert!(%WorkflowState{
  workflow_id: request.id,
  log: json,
  status: :awaiting_approval
})
```
At this point the finalized log contains a flat list of every effect that fired, in execution order. Each entry records the effect signature, its arguments, and the result value (if the effect completed). Here is what Log.to_list(log) looks like when the workflow suspends at the Yield:
```elixir
[
  # Auto-inserted root checkpoint (captures initial handler state)
  %EffectLogEntry{sig: EffectLogger, data: %MarkLoop{loop_id: :__root__, env_state: ...},
                  value: :ok, state: :executed},

  # validate(request) — Port call into the validation module
  %EffectLogEntry{sig: Port, data: {Port.Request, Validator, :validate, [request]},
                  value: validated, state: :executed},

  # Yield.yield(...) — computation suspended here, waiting for approval
  %EffectLogEntry{sig: Yield, data: {Yield.Yield, %{type: :approval_needed, ...}},
                  value: nil, state: :started}
  #                      ^^^         ^^^^^^^^
  #                 no result yet    still in-progress
]
```
Key things to notice:
- :executed entries have a value. During replay these are short-circuited: the logged value is returned without re-running the effect.
- :started marks the suspension point. This is where with_resume injects the resume value.
- Pure computations (plain function calls, pattern matches) don't appear in the log; only effects do.
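Because the log is just data, a suspended workflow can be inspected by scanning for the first :started entry. The helper below is a hypothetical sketch, not part of Skuld: it assumes only the Log.to_list/1 function and the %EffectLogEntry{} fields shown above.

```elixir
# Hypothetical helper (not part of Skuld): locate the suspension point.
# Assumes Log.to_list/1 and the EffectLogEntry fields shown above.
defmodule WorkflowInspector do
  alias Skuld.Effects.EffectLogger.Log

  def suspension_point(log) do
    case Enum.find(Log.to_list(log), &(&1.state == :started)) do
      nil -> :completed
      %{sig: sig, data: data} -> {:suspended, sig, data}
    end
  end
end
```

For the approval log above, this would return something like {:suspended, Yield, {Yield.Yield, %{type: :approval_needed, ...}}}, which is useful for routing a suspended workflow to the right UI or queue.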
Resuming after approval
```elixir
# Load the persisted log
workflow = Repo.get!(WorkflowState, request_id)
cold_log = workflow.log |> Jason.decode!() |> Log.from_json()

# Resume with the approval decision
{result, env} =
  ApprovalWorkflow.run(request)   # same source code
  |> EffectLogger.with_resume(cold_log, :approved)
  |> Yield.with_handler()
  |> State.with_handler(nil)      # ignored - restored from log
  |> EventAccumulator.with_handler(output: fn r, e -> {r, e} end)
  |> Throw.with_handler()
  |> Comp.run()

# result is {:ok, execution_result}
```
During resume the computation replays from the beginning, but :executed entries are short-circuited: their logged values are returned instantly without calling the real handlers. At the :started Yield entry, the resume value (:approved) is injected and execution continues live from that point. The final log after completion:
```elixir
[
  # — replayed from log (short-circuited) —
  %EffectLogEntry{sig: EffectLogger, data: %MarkLoop{loop_id: :__root__, ...},
                  value: :ok, state: :executed},
  %EffectLogEntry{sig: Port, data: {Port.Request, Validator, :validate, [request]},
                  value: validated, state: :executed},

  # — resume value injected here —
  %EffectLogEntry{sig: Yield, data: {Yield.Yield, %{type: :approval_needed, ...}},
                  value: :approved, state: :executed},
  #                      ^^^^^^^^^^^^^^^^
  #                      now :executed with the resume value

  # — live execution continues —
  %EffectLogEntry{sig: Port, data: {Port.Request, Actions, :execute, [validated]},
                  value: exec_result, state: :executed},
  %EffectLogEntry{sig: Writer, data: {Writer.Tell, %RequestApproved{request_id: 42}},
                  value: [...], state: :executed}
]
```
Long-running loops: LLM conversations
For workflows with multiple interaction cycles, use mark_loop to
keep the log bounded:
```elixir
defmodule ConversationWorkflow do
  use Skuld.Syntax

  defcomp run() do
    _ <- EffectLogger.mark_loop(ConversationLoop)
    history <- State.get()

    # Yield for user input
    user_msg <- Yield.yield(%{
      type: :user_input,
      history: history
    })

    # Call LLM (via Port)
    response <- LLM.chat!(history ++ [%{role: :user, content: user_msg}])

    # Update conversation history
    _ <- State.put(history ++ [
      %{role: :user, content: user_msg},
      %{role: :assistant, content: response}
    ])

    # Check if conversation should end
    case response do
      %{done: true} -> {:done, response}
      _ -> run() # loop for next turn
    end
  end
end
```
Each cycle: suspend -> serialize -> persist -> (user responds) -> deserialize -> resume -> next cycle. The mark_loop keeps the log O(current iteration) regardless of conversation length.
After several turns the log is pruned down to just the most recent iteration. Here is the log after the second turn suspends at the Yield (with loop pruning applied):
```elixir
[
  # Root checkpoint (always preserved)
  %EffectLogEntry{sig: EffectLogger, data: %MarkLoop{loop_id: :__root__, ...},
                  value: :ok, state: :executed},

  # Most recent loop mark — earlier iterations were pruned away
  %EffectLogEntry{sig: EffectLogger,
                  data: %MarkLoop{loop_id: ConversationLoop, env_state: snapshot},
                  value: :ok, state: :executed},
  # env_state captures handler state at this point,
  # so cold resume can restore State, Writer, etc.

  # Current iteration's effects
  %EffectLogEntry{sig: State, data: State.Get,
                  value: [%{role: :user, ...}, %{role: :assistant, ...}],
                  state: :executed},
  %EffectLogEntry{sig: Yield, data: {Yield.Yield, %{type: :user_input, ...}},
                  value: nil, state: :started}
  #                      ^^^ suspended, waiting for next input
]
```
Without mark_loop, the log would grow with every turn. With it, completed iterations are pruned and the env_state snapshot in the mark entry preserves enough state for cold resume to restore handlers correctly.
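The suspend/persist/resume cycle can be driven by a small loop outside the workflow. Below is a hedged sketch of such a driver for resume turns, not a Skuld API: it reuses the with_resume pipeline shown earlier with a minimal handler stack, and persist_log/2 is a hypothetical stand-in for your storage layer (here, the file-system strategy from later in this page).

```elixir
# Hypothetical driver for the conversation workflow (not part of Skuld).
# Assumes the pipeline shape shown in the resume example above.
defmodule ConversationDriver do
  alias Skuld.Effects.EffectLogger
  alias Skuld.Effects.EffectLogger.Log

  # Run one turn: resume with the user's message, then either finish
  # or persist the new suspended log and wait for the next message.
  def turn(id, cold_log, user_msg) do
    {result, env} =
      ConversationWorkflow.run()
      |> EffectLogger.with_resume(cold_log, user_msg)
      |> Yield.with_handler()
      |> State.with_handler(nil)   # restored from the log's env_state snapshot
      |> Comp.run()

    case result do
      {:done, response} ->
        {:done, response}

      suspended ->
        # Suspended at the next Yield: persist and wait for the user
        log = EffectLogger.get_log(env) |> Log.finalize()
        persist_log(id, Jason.encode!(log))
        {:suspended, suspended.value}
    end
  end

  defp persist_log(id, json),
    do: File.write!("workflows/#{id}.json", json)
end
```

The first turn would use with_logging instead of with_resume, exactly as in the approval example; everything after that goes through turn/3.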
Surviving deployments
When code changes between suspend and resume, EffectLogger can handle divergence:
```elixir
|> EffectLogger.with_resume(cold_log, value, allow_divergence: true)
```
With allow_divergence:
- Completed effects replay from logged values (fast-forward)
- If the code path diverges from the log (new effects, removed effects), execution continues fresh from the divergence point
- Failed/discarded effects re-execute
This means you can deploy bug fixes and the workflow picks up from where it left off, re-executing any changed logic.
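Concretely, a deployment-safe resume is the same pipeline as the approval example with the option added. A sketch reusing that handler stack:

```elixir
# Resume against possibly newer code than what wrote the log
{result, env} =
  ApprovalWorkflow.run(request)
  |> EffectLogger.with_resume(cold_log, :approved, allow_divergence: true)
  |> Yield.with_handler()
  |> State.with_handler(nil)
  |> EventAccumulator.with_handler(output: fn r, e -> {r, e} end)
  |> Throw.with_handler()
  |> Comp.run()
```

If, say, validate/1 gained or lost an effect since the log was written, replay fast-forwards through the entries that still match and continues live from the first mismatch.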
Persistence strategies
Database
```elixir
# Store as JSON in a text/jsonb column
Repo.insert!(%Workflow{
  id: workflow_id,
  log: Jason.encode!(Log.finalize(log)),
  status: :suspended,
  suspended_at: DateTime.utc_now()
})
```
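The %Workflow{} struct used here would be an ordinary Ecto schema. A minimal sketch; the field names mirror the snippet above, and everything else (table name, key type) is an assumption:

```elixir
# Hypothetical Ecto schema backing the snippets in this section.
defmodule MyApp.Workflow do
  use Ecto.Schema

  @primary_key {:id, :binary_id, autogenerate: false}
  schema "workflows" do
    field :log, :string   # finalized log as JSON text (use :map for a jsonb column)
    field :status, Ecto.Enum, values: [:suspended, :completed, :failed]
    field :suspended_at, :utc_datetime

    timestamps()
  end
end
```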
```elixir
# Resume
workflow = Repo.get!(Workflow, workflow_id)
cold_log = workflow.log |> Jason.decode!() |> Log.from_json()
```
Message queue
```elixir
# Publish suspended workflow for async processing
Broadway.produce(Jason.encode!(%{
  workflow_id: id,
  log: Log.finalize(log),
  resume_value: value
}))
```
File system (development)
```elixir
File.write!("workflows/#{id}.json", Jason.encode!(Log.finalize(log)))

cold_log =
  "workflows/#{id}.json"
  |> File.read!()
  |> Jason.decode!()
  |> Log.from_json()
```
Comparison with Temporal.io
| Aspect | Skuld EffectLogger | Temporal.io |
|---|---|---|
| Infrastructure | Library (no server) | Server cluster required |
| Language | Elixir only | Multi-language SDKs |
| Serialization | JSON log of effects | Protobuf event history |
| Resume mechanism | Source replay + log | Worker polling + replay |
| Activities | Effects (Port, Transaction, etc.) | RPC-dispatched activities |
| Composition | Algebraic effect stacking | Workflow/activity split |
| Deployment | Your app process | Separate service |
Skuld is much lighter-weight: no infrastructure, no RPC, and full algebraic effect composition. Temporal is more mature for production distributed workflows with built-in retry policies, visibility, and multi-language support.