# Notebook dependencies: jido_composer (unpinned) and kino. The config sets
# jido_action's default_timeout to five minutes, i.e. 300_000 ms -- the same
# value :timer.minutes(5) produces.
Mix.install(
  [{:jido_composer, ">= 0.0.0"}, {:kino, "~> 0.14"}],
  config: [jido_action: [default_timeout: 5 * 60 * 1_000]]
)

## Introduction
Real-world pipelines often need human oversight. Jido Composer supports this through generalized suspension — a workflow can pause for any reason (human approval, rate limits, external jobs) and resume later.
This guide covers:
- HumanNode — A node that suspends the workflow for human input
- Suspend/Resume — The directive-based suspension lifecycle
- Checkpoint/Thaw — Serializing suspended state across process restarts
## Shared Setup
alias Jido.Agent.Directive
alias Jido.Agent.Strategy.State, as: StratState
alias Jido.Composer.{Context, Suspension, Checkpoint, Resume}
alias Jido.Composer.Directive.Suspend, as: SuspendDirective
alias Jido.Composer.HITL.{ApprovalRequest, ApprovalResponse}
alias Jido.Composer.Node.HumanNode
defmodule Demo.AccumulatorAction do
  # Demo action: takes the required `:tag` string and returns it unchanged as
  # `%{tag: tag}`. Params without a `:tag` key do not match the clause below.
  use Jido.Action,
    name: "accumulator",
    description: "Stores a tag value",
    schema: [
      tag: [type: :string, required: true, doc: "Tag to append"]
    ]

  def run(%{tag: _} = params, _context) do
    # Keep only the tag; any other params are dropped from the result.
    {:ok, Map.take(params, [:tag])}
  end
end
defmodule Demo.NoopAction do
  # Demo action with an empty schema that ignores its params and always
  # succeeds with an empty result map; it stands in for real work (e.g. the
  # `deploy` and `finish` steps of the workflows in this guide).
  use Jido.Action,
    name: "noop",
    description: "Does nothing",
    schema: []

  def run(_params, _context) do
    {:ok, Map.new()}
  end
end
defmodule Demo.Helpers do
  # Compile-time helpers shared by the demo workflow modules.

  # Expands, inside the calling module, to overrides of `plugins/0`,
  # `capabilities/0` and `signal_types/0`. Each override just delegates to the
  # inherited implementation via `super()` and is marked `@doc false`, so the
  # function is hidden from generated docs. Callers must `require Demo.Helpers`
  # before invoking it.
  #
  # NOTE(review): `super()` only compiles if these functions were made
  # overridable by an earlier `use` (presumably `use Jido.Composer.Workflow`).
  # The macro name suggests the goal is to keep the inherited docs, and any
  # doctests in them, out of these demo modules -- confirm.
  defmacro suppress_agent_doctests do
    quote do
      @doc false
      def plugins, do: super()
      @doc false
      def capabilities, do: super()
      @doc false
      def signal_types, do: super()
    end
  end
end
# DirectiveRunner: executes directives synchronously for demo purposes.
defmodule Demo.DirectiveRunner do
  @moduledoc """
  Drives a workflow agent in-process by executing its directives one at a time
  and feeding every instruction result back through `module.cmd/2`.
  """

  @doc """
  Runs `directives` for `agent` (a `module` workflow) until none are left or a
  Suspend directive is reached.

  Returns:

    * `{:ok, agent}` -- directives exhausted and the strategy status is not
      `:failure`
    * `{:error, reason}` -- directives exhausted with strategy status
      `:failure`; `reason` is the strategy's `:error_reason`, or
      `:workflow_failed` when that key is absent
    * `{:suspend, agent, suspend_directive}` -- a Suspend directive was
      reached; directives queued after it are not executed
  """
  def run_until_done(module, agent, directives) do
    run_loop(module, agent, directives)
  end

  defp run_loop(_module, agent, []), do: check_terminal(agent)

  defp run_loop(module, agent, [directive | rest]) do
    case directive do
      %Directive.RunInstruction{instruction: instr, result_action: result_action} ->
        payload = execute_instruction(instr)
        {agent, new_directives} = module.cmd(agent, {result_action, payload})
        # Depth-first: directives produced by this result run before the
        # ones that were already queued.
        run_loop(module, agent, new_directives ++ rest)

      %SuspendDirective{} = suspend ->
        {:suspend, agent, suspend}

      _other ->
        # Directive kinds this demo runner does not model are skipped.
        run_loop(module, agent, rest)
    end
  end

  # Executes one instruction and normalizes the Jido.Exec result into the
  # payload map that the strategy's result action receives.
  defp execute_instruction(instr) do
    # NOTE(review): timeout: 0 presumably disables Jido.Exec's timeout
    # wrapper so the action runs inline -- confirm against jido_action docs.
    case Jido.Exec.run(instr.action, instr.params, %{}, timeout: 0) do
      {:ok, result} ->
        %{status: :ok, result: result}

      {:ok, result, outcome} ->
        %{status: :ok, result: result, outcome: outcome}

      {:error, reason} ->
        %{status: :error, result: %{error: reason}}

      # Jido.Exec's result type also allows an error with a trailing extra
      # element; report it as an ordinary error instead of crashing the runner.
      {:error, reason, _extra} ->
        %{status: :error, result: %{error: reason}}
    end
  end

  # Maps the final strategy status onto the runner's return contract.
  defp check_terminal(agent) do
    strat = StratState.get(agent)

    case strat.status do
      :success -> {:ok, agent}
      :failure -> {:error, Map.get(strat, :error_reason, :workflow_failed)}
      _ -> {:ok, agent}
    end
  end
end

IO.puts("Setup complete.")

## Part 1: Deployment Approval Gate
A deployment pipeline that pauses for human approval before going to production.
The HumanNode emits a Suspend directive, and the workflow waits for a response.
```mermaid
stateDiagram-v2
  [*] --> prepare
  prepare --> approval : ok
  approval --> deploy : approved
  approval --> failed : rejected
  approval --> failed : timeout
  deploy --> done : ok
```

### Define the Workflow
defmodule Demo.DeployWorkflow do
  @moduledoc false
  # Deployment pipeline: `prepare` (AccumulatorAction) -> `approval` (HumanNode
  # gate) -> `deploy` (NoopAction). `:done` is the only success state. A
  # rejection, a gate timeout, or (via the `:_` wildcard, presumably) any
  # node's `:error` outcome ends in `:failed`.
  use Jido.Composer.Workflow,
    name: "deploy_pipeline",
    description: "Deployment pipeline with human approval gate",
    nodes: %{
      prepare: Demo.AccumulatorAction,
      # Human approval gate. When the run reaches it, the demo observes a
      # suspension whose prompt, allowed responses and timeout are read back
      # from the strategy's `pending_suspension`.
      approval: %HumanNode{
        name: "deploy_gate",
        description: "Approve deployment to production",
        prompt: "Deploy to production?",
        allowed_responses: [:approved, :rejected],
        # Presumably milliseconds (the demo prints it with an "ms" suffix).
        timeout: 60_000,
        # Outcome presumably emitted when the gate times out without a
        # response; it is routed to :failed below.
        timeout_outcome: :timeout
      },
      deploy: Demo.NoopAction
    },
    transitions: %{
      {:prepare, :ok} => :approval,
      {:approval, :approved} => :deploy,
      {:approval, :rejected} => :failed,
      {:approval, :timeout} => :failed,
      {:deploy, :ok} => :done,
      # `:_` presumably matches any state, so every node's :error fails the run.
      {:_, :error} => :failed
    },
    initial: :prepare,
    terminal_states: [:done, :failed],
    success_states: [:done]
  # The helper is a macro, so it must be required before it is invoked.
  require Demo.Helpers
  Demo.Helpers.suppress_agent_doctests()
end
IO.puts("DeployWorkflow defined.")

### Run Until Suspension

# Start a fresh run and drive it until the HumanNode gate pauses it. The
# pattern match asserts that the run suspends; any other result raises a
# MatchError. The Suspend directive itself is never used, so it gets an
# underscored name: everything printed below comes from the strategy's
# `pending_suspension`.
agent = Demo.DeployWorkflow.new()
{agent, directives} = Demo.DeployWorkflow.run(agent, %{tag: "deploy-v2.1"})

{:suspend, suspended_agent, _suspend_directive} =
  Demo.DirectiveRunner.run_until_done(Demo.DeployWorkflow, agent, directives)

strat = StratState.get(suspended_agent)
suspension = strat.pending_suspension

IO.puts("=== Workflow Suspended ===")
IO.puts("State: #{strat.machine.status}")
IO.puts("Reason: #{suspension.reason}")
IO.puts("Prompt: #{suspension.approval_request.prompt}")
IO.puts("Allowed responses: #{inspect(suspension.approval_request.allowed_responses)}")
IO.puts("Request ID: #{suspension.approval_request.id}")
IO.puts("Timeout: #{suspension.timeout}ms")

### Resume with Approval
# Simulate a human approving the deployment: deliver a :suspend_resume command
# for the pending suspension. The response data echoes the approval request
# id and carries the decision plus reviewer details.
{resumed_agent, resume_directives} =
  Demo.DeployWorkflow.cmd(suspended_agent, {
    :suspend_resume,
    %{
      suspension_id: suspension.id,
      response_data: %{
        request_id: suspension.approval_request.id,
        decision: :approved,
        respondent: "alice@acme.com",
        comment: "Looks good, ship it!"
      }
    }
  })

case Demo.DirectiveRunner.run_until_done(Demo.DeployWorkflow, resumed_agent, resume_directives) do
  {:ok, final_agent} ->
    strat = StratState.get(final_agent)
    ctx = Context.to_flat_map(strat.machine.context)
    # Read fields with Map.get/3 so this works whether the stored response is
    # a plain map or a struct (struct[:key] raises: structs don't implement
    # Access). A missing response yields nil fields.
    response = ctx[:hitl_response] || %{}
    IO.puts("=== Deployment Complete ===")
    IO.puts("Status: #{strat.status}")
    IO.puts("Decision: #{inspect(Map.get(response, :decision))}")
    IO.puts("Reviewer: #{inspect(Map.get(response, :respondent))}")
    IO.puts("Comment: #{inspect(Map.get(response, :comment))}")

  # run_until_done/3 can also stop at another Suspend directive; without this
  # clause that result would raise a CaseClauseError.
  {:suspend, _agent, _directive} ->
    IO.puts("Workflow suspended again before completing")

  {:error, reason} ->
    IO.puts("Failed: #{inspect(reason)}")
end

## Part 2: Generalized Suspension
Suspension isn't just for humans. A workflow can pause for any reason — rate limits, external jobs, async operations, or custom application logic.
# One entry per built-in suspension reason (human_input, rate_limit,
# async_completion, external_job, custom), each paired with extra
# Suspension.new/1 options.
reasons = [
  {:human_input, [metadata: %{urgency: :high}]},
  {:rate_limit, [metadata: %{retry_after_ms: 5000, tokens_remaining: 0}]},
  {:async_completion, [metadata: %{task_id: "async-42"}, resume_signal: "task.complete"]},
  {:external_job, [metadata: %{job_id: "ci-build-99"}, timeout: 300_000]},
  {:custom, [metadata: %{type: "budget_exceeded", amount: 150.0, limit: 100.0}]}
]

IO.puts("=== Suspension Reasons ===\n")

# Build each Suspension and print its timeout and metadata in two columns:
# the reason name padded to 20 characters, then the value.
Enum.map(reasons, fn {reason, extra_opts} ->
  {:ok, suspension} = Suspension.new([{:reason, reason} | extra_opts])
  label = reason |> to_string() |> String.pad_trailing(20)
  indent = String.pad_trailing("", 20)
  IO.puts("#{label} | timeout: #{inspect(suspension.timeout)}")
  IO.puts("#{indent} | metadata: #{inspect(suspension.metadata)}\n")
end)

### Rate-Limit Suspension in Practice
defmodule Demo.RateLimitAction do
  # Demo action that simulates a rate-limited call.
  #
  # With a positive `:tokens` count it succeeds and reports one fewer token
  # remaining. Otherwise -- including when `:tokens` is absent, which defaults
  # to 0 -- it returns the `:suspend` outcome with a `:rate_limit` Suspension
  # whose metadata carries `retry_after_ms: 5000`.
  use Jido.Action,
    name: "rate_limit_action",
    description: "Simulates rate-limited operation",
    schema: [tokens: [type: :integer, required: false]]

  def run(params, _ctx) do
    params
    |> Map.get(:tokens, 0)
    |> spend_token()
  end

  # A token is available: consume one and report success.
  defp spend_token(available) when available > 0 do
    {:ok, %{processed: true, tokens_remaining: available - 1}}
  end

  # No tokens left: ask the workflow to suspend instead of failing.
  defp spend_token(_available) do
    {:ok, suspension} =
      Suspension.new(reason: :rate_limit, metadata: %{retry_after_ms: 5000})

    {:ok, %{processed: false, __suspension__: suspension}, :suspend}
  end
end
defmodule Demo.RateLimitWorkflow do
  @moduledoc false
  # Pipeline: `prepare` (AccumulatorAction) -> `api_call` (RateLimitAction) ->
  # `finish` (NoopAction). `:done` is the only success state.
  #
  # NOTE(review): there is no transition for the `:suspend` outcome
  # RateLimitAction returns. Suspension is presumably handled by the strategy
  # (it shows up as `pending_suspension`) rather than via a transition --
  # confirm.
  use Jido.Composer.Workflow,
    name: "rate_limit_demo",
    description: "Workflow that suspends on rate limit",
    nodes: %{
      prepare: Demo.AccumulatorAction,
      api_call: Demo.RateLimitAction,
      finish: Demo.NoopAction
    },
    transitions: %{
      {:prepare, :ok} => :api_call,
      {:api_call, :ok} => :finish,
      # NOTE(review): the Suspension built by RateLimitAction passes no
      # timeout option, so it is unclear what produces this outcome -- confirm.
      {:api_call, :timeout} => :failed,
      {:finish, :ok} => :done,
      # `:_` presumably matches any state, so every node's :error fails the run.
      {:_, :error} => :failed
    },
    initial: :prepare,
    terminal_states: [:done, :failed],
    success_states: [:done]
  # The helper is a macro, so it must be required before it is invoked.
  require Demo.Helpers
  Demo.Helpers.suppress_agent_doctests()
end
# Run with tokens=0 so RateLimitAction suspends with a :rate_limit reason,
# then resume the suspension with an :ok outcome and replacement data.
agent = Demo.RateLimitWorkflow.new()
{agent, directives} = Demo.RateLimitWorkflow.run(agent, %{tag: "rate_test", tokens: 0})

case Demo.DirectiveRunner.run_until_done(Demo.RateLimitWorkflow, agent, directives) do
  {:suspend, suspended_agent, _suspend_directive} ->
    strat = StratState.get(suspended_agent)
    suspension = strat.pending_suspension
    IO.puts("=== Rate Limit Suspension ===")
    IO.puts("Reason: #{suspension.reason}")
    IO.puts("Metadata: #{inspect(suspension.metadata)}")

    # Resume with tokens available. NOTE(review): `data` is presumably
    # recorded as api_call's result (printed below) -- confirm.
    IO.puts("\n--- Resuming after cooldown ---")

    {resumed, resume_directives} =
      Demo.RateLimitWorkflow.cmd(suspended_agent, {
        :suspend_resume,
        %{suspension_id: suspension.id, outcome: :ok, data: %{tokens: 10}}
      })

    case Demo.DirectiveRunner.run_until_done(Demo.RateLimitWorkflow, resumed, resume_directives) do
      {:ok, final_agent} ->
        strat = StratState.get(final_agent)
        ctx = Context.to_flat_map(strat.machine.context)
        IO.puts("Workflow completed after resume!")
        IO.puts("API call result: #{inspect(ctx[:api_call])}")

      other ->
        IO.puts("Unexpected: #{inspect(other)}")
    end

  {:ok, _} ->
    IO.puts("Completed without suspension (unexpected)")

  # run_until_done/3 returns {:error, reason} when the workflow ends in
  # failure; without this clause that result raised a CaseClauseError.
  {:error, reason} ->
    IO.puts("Failed: #{inspect(reason)}")
end

## Part 3: Checkpoint and Restore
When a workflow suspends, its strategy state can be checkpointed and stored. Runtime closures are stripped first so that the rest can be serialized to binary. Later, a fresh process can thaw the checkpoint — deserialize it and reattach the runtime configuration — and resume exactly where it left off. This is essential for long-running workflows that must survive process restarts.
# Step 1: Run until suspension
agent = Demo.DeployWorkflow.new()
{agent, directives} = Demo.DeployWorkflow.run(agent, %{tag: "checkpoint_demo"})

{:suspend, suspended_agent, _directive} =
  Demo.DirectiveRunner.run_until_done(Demo.DeployWorkflow, agent, directives)

IO.puts("=== Checkpoint/Thaw Lifecycle ===\n")
IO.puts("Step 1: Workflow suspended for human approval")

# Step 2: Checkpoint — strip closures for serialization
strat = StratState.get(suspended_agent)
checkpoint_data = Checkpoint.prepare_for_checkpoint(strat)
IO.puts("Step 2: Checkpoint prepared (schema v#{Checkpoint.schema_version()})")

# Step 3: Serialize to binary
binary = :erlang.term_to_binary(checkpoint_data, [:compressed])
IO.puts("Step 3: Serialized to #{byte_size(binary)} bytes")

# Step 4: Simulate process death + restore
IO.puts("Step 4: Process died. Creating fresh agent...")
fresh_agent = Demo.DeployWorkflow.new()

# Step 5: Thaw — deserialize and reattach closures.
# Real checkpoints come back from external storage, so decode with [:safe].
# It refuses to create new atoms or new external fun references, which stops
# a tampered checkpoint from exhausting the atom table. NOTE(review): atoms in
# a checkpoint written by this application presumably already exist once its
# modules are loaded -- verify if checkpoints may outlive code changes.
restored_strat =
  binary
  |> :erlang.binary_to_term([:safe])
  |> Checkpoint.reattach_runtime_config([])

restored_agent = StratState.put(fresh_agent, restored_strat)
IO.puts("Step 5: Agent thawed from checkpoint")
IO.puts(" Status: #{restored_strat.status}")
IO.puts(" Pending: #{restored_strat.pending_suspension.reason}")

# Step 6: Resume the restored suspension with an approval for its request.
suspension_id = restored_strat.pending_suspension.id

{:ok, resumed_agent, resume_directives} =
  Resume.resume(
    restored_agent,
    suspension_id,
    %{
      request_id: restored_strat.pending_suspension.approval_request.id,
      decision: :approved,
      respondent: "bob@acme.com"
    },
    deliver_fn: fn agent, signal ->
      Demo.DeployWorkflow.cmd(agent, signal)
    end
  )

case Demo.DirectiveRunner.run_until_done(Demo.DeployWorkflow, resumed_agent, resume_directives) do
  {:ok, final} ->
    strat = StratState.get(final)
    IO.puts("Step 6: Workflow completed! Status: #{strat.status}")
    IO.puts("\nFull lifecycle:")
    IO.puts(" Run -> Suspend -> Checkpoint -> Serialize -> Death -> Thaw -> Resume -> Complete")

  other ->
    IO.puts("Unexpected: #{inspect(other)}")
end

## Next Steps
You've learned how to:
- Suspend workflows for human approval
- Use generalized suspension for rate limits and other reasons
- Checkpoint suspended state and restore it across process restarts
Next guide: LLM Orchestrator — let an LLM decide which tools to call at runtime.