Mix.install(
  [
    {:jido_composer, ">= 0.0.0"},
    {:kino, "~> 0.14"}
  ],
  config: [
    jido_action: [default_timeout: :timer.minutes(5)]
  ]
)

## Introduction
This guide builds a document review pipeline that combines every Jido Composer capability into a single realistic scenario:
- Workflow FSM orchestrating the overall process
- FanOut for parallel analysis branches
- AgentNode nesting a workflow agent inside a FanOut branch
- HumanNode for reviewer approval
- Checkpoint/Thaw for surviving process restarts
The pipeline:
- Prepare — Gather documents for review
- Parallel Analysis — Extract, analyze (via nested agent), and validate concurrently
- Review Gate — Human reviewer approves or rejects
- Publish — Publish approved documents
graph TD
subgraph Outer Workflow
A[Prepare] --> B[FanOut: Parallel Analysis]
B --> C[Review Gate - HumanNode]
C -->|approved| D[Publish]
C -->|rejected| E[Failed]
D --> F[Done]
end
subgraph "FanOut Branches (parallel)"
B --> B1[Branch: Extract - ActionNode]
B --> B2[Branch: Analyze - Inner Workflow Agent]
B --> B3[Branch: Validate - ActionNode]
end
subgraph "Inner Analyze Agent (Workflow)"
B2 --> G[Transform]
G --> H[Load]
end

## Setup
# Fail fast if no Anthropic key is configured, before any pipeline work runs.
# Livebook app deployments expose secrets with an LB_ prefix, hence the fallback.
api_key =
  System.get_env("ANTHROPIC_API_KEY") || System.get_env("LB_ANTHROPIC_API_KEY") ||
    raise "Set ANTHROPIC_API_KEY in your environment or Livebook app settings."

Application.put_env(:req_llm, :anthropic_api_key, api_key)

# Shorthand aliases used throughout the rest of the notebook.
alias Jido.Agent.Directive
alias Jido.Agent.Strategy.State, as: StratState
alias Jido.Composer.{Context, Suspension, Checkpoint, Resume}
alias Jido.Composer.Directive.{FanOutBranch, Suspend}
alias Jido.Composer.HITL.{ApprovalRequest, ApprovalResponse}
alias Jido.Composer.Node.{ActionNode, AgentNode, FanOutNode, HumanNode}
defmodule Demo.Helpers do
  # Injects @doc false overrides for the callbacks generated by the Jido
  # `use` macros so Livebook's doctest runner skips their doctests; each
  # override delegates straight to super(), so behavior is unchanged.
  # NOTE(review): this presumes plugins/0, capabilities/0 and signal_types/0
  # are defined overridable by `use Jido.Composer.Workflow` — confirm.
  defmacro suppress_agent_doctests do
    quote do
      @doc false
      def plugins, do: super()
      @doc false
      def capabilities, do: super()
      @doc false
      def signal_types, do: super()
    end
  end
end
# DirectiveRunner: executes directives synchronously for demo purposes.
defmodule Demo.DirectiveRunner do
  @moduledoc false

  # Drains the agent's directive queue in the calling process.
  # Returns {:ok, agent} | {:error, reason} when the queue empties, or
  # {:suspend, agent, suspend_directive} when the workflow parks (e.g. HITL).
  def run_until_done(module, agent, directives) do
    run_loop(module, agent, directives)
  end

  # Empty queue: classify the outcome from the strategy state.
  defp run_loop(_module, agent, []), do: check_terminal(agent)

  defp run_loop(module, agent, [directive | rest]) do
    case directive do
      # Execute the instruction inline, then feed the normalized payload back
      # into the FSM via the directive's result_action. Directives emitted by
      # that command run before the rest of the queue.
      %Directive.RunInstruction{instruction: instr, result_action: result_action} ->
        payload = execute_sync(instr)
        {agent, new_directives} = module.cmd(agent, {result_action, payload})
        run_loop(module, agent, new_directives ++ rest)

      # Run a child agent to completion synchronously and report its result
      # back under the spawning tag.
      %Directive.SpawnAgent{agent: child_module, tag: tag, opts: spawn_opts} ->
        payload = Jido.Composer.Node.execute_child_sync(child_module, spawn_opts)
        {agent, new_directives} = module.cmd(agent, {:workflow_child_result, %{tag: tag, result: payload}})
        run_loop(module, agent, new_directives ++ rest)

      # First fan-out branch seen: pull ALL FanOutBranch directives from the
      # remaining queue (split_with scans the whole list, not just a prefix),
      # run them concurrently, then feed each branch result back in order.
      %FanOutBranch{} = _first ->
        {fan_out_directives, remaining} =
          Enum.split_with([directive | rest], &match?(%FanOutBranch{}, &1))

        results = execute_fan_out_branches(fan_out_directives)

        # Each result command may emit new directives; collect them all
        # before resuming the loop.
        {agent, all_directives} =
          Enum.reduce(results, {agent, []}, fn {name, result}, {acc, acc_dirs} ->
            {new_agent, new_dirs} = module.cmd(acc, {:fan_out_branch_result, %{branch_name: name, result: result}})
            {new_agent, acc_dirs ++ new_dirs}
          end)

        run_loop(module, agent, all_directives ++ remaining)

      # Suspension (e.g. a HumanNode gate): stop looping, hand control back.
      %Suspend{} = suspend ->
        {:suspend, agent, suspend}

      # Unrecognized directives are silently skipped in this demo runner.
      _other ->
        run_loop(module, agent, rest)
    end
  end

  # Runs one Jido instruction and normalizes the outcome into a status map.
  # NOTE(review): `timeout: 0` presumably selects Jido.Exec's in-caller
  # synchronous path rather than "time out immediately" — confirm with the
  # Jido.Exec docs.
  defp execute_sync(%Jido.Instruction{action: action_module, params: params}) do
    case Jido.Exec.run(action_module, params, %{}, timeout: 0) do
      {:ok, result} -> %{status: :ok, result: result}
      {:ok, result, outcome} -> %{status: :ok, result: result, outcome: outcome}
      {:error, reason} -> %{status: :error, result: %{error: reason}}
    end
  end

  # Runs every branch concurrently. `ordered: true` keeps async_stream output
  # aligned with `directives`, so the zip below can attribute a crashed or
  # killed task to the correct branch name.
  defp execute_fan_out_branches(directives) do
    directives
    |> Task.async_stream(
      fn %FanOutBranch{} = branch ->
        result = execute_branch(branch)
        {branch.branch_name, result}
      end,
      timeout: 30_000, on_timeout: :kill_task, ordered: true
    )
    |> Enum.zip(directives)
    |> Enum.map(fn
      {{:ok, {name, result}}, _} -> {name, result}
      {{:exit, reason}, branch} -> {branch.branch_name, {:error, {:crashed, reason}}}
    end)
  end

  # Action branch: run the instruction and unwrap the normalized status map.
  defp execute_branch(%FanOutBranch{instruction: %Jido.Instruction{} = instr}) do
    case execute_sync(instr) do
      %{status: :ok, result: result} -> {:ok, result}
      %{status: :error, result: %{error: reason}} -> {:error, reason}
    end
  end

  # Agent branch: run the nested agent to completion synchronously.
  defp execute_branch(%FanOutBranch{spawn_agent: spawn_info}) do
    Jido.Composer.Node.execute_child_sync(spawn_info.agent, spawn_info.opts)
  end

  # Maps the final strategy status onto the runner's return contract. A
  # non-terminal status with an empty queue is treated as success for the demo.
  defp check_terminal(agent) do
    strat = StratState.get(agent)

    case strat.status do
      :success -> {:ok, agent}
      :failure -> {:error, Map.get(strat, :error_reason, :workflow_failed)}
      _ -> {:ok, agent}
    end
  end
end
IO.puts("Setup complete.")

## Define Domain Actions
defmodule Demo.PrepareDocsAction do
  use Jido.Action,
    name: "prepare_docs",
    description: "Gathers documents for review",
    schema: [
      doc_ids: [type: {:list, :string}, required: false],
      source: [type: :string, required: false]
    ]

  # Seeds the pipeline with a list of pending documents. Falls back to a
  # fixed demo set when no :doc_ids are provided.
  def run(params, _ctx) do
    ids = Map.get(params, :doc_ids, ["DOC-001", "DOC-002", "DOC-003"])
    origin = Map.get(params, :source, "default")
    docs = for id <- ids, do: %{id: id, source: origin, status: :pending}

    {:ok,
     %{
       documents: docs,
       count: length(ids),
       gathered_at: DateTime.to_iso8601(DateTime.utc_now())
     }}
  end
end
defmodule Demo.TransformAction do
  use Jido.Action,
    name: "transform",
    description: "Transforms records by uppercasing source",
    schema: [extract: [type: :map, required: false]]

  # Uppercases each record's :source value (leaving "" when the key is
  # absent); tolerates missing upstream :extract output.
  def run(params, _ctx) do
    source_records = get_in(params, [:extract, :records]) || []

    upcased =
      for rec <- source_records do
        Map.update(rec, :source, "", &String.upcase/1)
      end

    {:ok, %{records: upcased, count: length(upcased)}}
  end
end
defmodule Demo.LoadAction do
  use Jido.Action,
    name: "load",
    description: "Loads records into storage",
    schema: [transform: [type: :map, required: false]]

  # Final step of the nested pipeline: reports how many transformed records
  # would be persisted. Missing upstream output counts as zero.
  def run(params, _ctx) do
    loaded_count =
      case get_in(params, [:transform, :records]) do
        nil -> 0
        records -> length(records)
      end

    {:ok, %{loaded: loaded_count, status: :complete}}
  end
end
defmodule Demo.ValidateDocsAction do
  use Jido.Action,
    name: "validate_docs",
    description: "Validates document format and compliance",
    schema: []

  # Produces a (simulated) validation verdict per document. Documents may
  # arrive nested under the fan-out key or directly under :extract.
  def run(params, _ctx) do
    docs =
      get_in(params, [:parallel, :extract, :documents]) ||
        get_in(params, [:extract, :documents]) || []

    checks =
      for doc <- docs do
        %{id: doc[:id], valid: true, compliance_score: Enum.random(85..100)}
      end

    {:ok, %{validations: checks, all_valid: Enum.all?(checks, & &1.valid)}}
  end
end
defmodule Demo.PublishAction do
  use Jido.Action,
    name: "publish",
    description: "Publishes reviewed documents",
    schema: []

  # Terminal step after human approval: records how many documents were
  # published and which reviewer signed off (from the HITL response).
  def run(params, _ctx) do
    published_count = get_in(params, [:parallel, :extract, :count]) || 0
    reviewer = get_in(params, [:hitl_response, :respondent]) || "unknown"

    {:ok,
     %{
       published: published_count,
       published_at: DateTime.to_iso8601(DateTime.utc_now()),
       reviewer: reviewer
     }}
  end
end
defmodule Demo.NoopAction do
  use Jido.Action,
    name: "noop",
    description: "Does nothing",
    schema: []

  # Placeholder action: succeeds immediately with an empty result map.
  def run(_params, _ctx) do
    {:ok, %{}}
  end
end
IO.puts("Domain actions defined.")

## Define the Nested Agent
The inner AnalysisAgent is a Workflow that runs inside a FanOut branch. It transforms and loads data as a self-contained sub-pipeline.
defmodule Demo.AnalysisAgent do
  @moduledoc false

  # Inner two-step FSM (transform -> load) that the outer pipeline runs as a
  # nested agent inside its FanOut :analyze branch.
  use Jido.Composer.Workflow,
    name: "analysis_agent",
    description: "Runs transform+load as a nested agent inside FanOut",
    nodes: %{
      transform: Demo.TransformAction,
      load: Demo.LoadAction
    },
    transitions: %{
      # Happy path: transform -> load -> done.
      {:transform, :ok} => :load,
      {:load, :ok} => :done,
      # Any node reporting :error sends the FSM to :failed.
      {:_, :error} => :failed
    },
    initial: :transform,
    terminal_states: [:done, :failed],
    success_states: [:done]

  require Demo.Helpers
  # Hide the generated agent callbacks from Livebook's doctest runner.
  Demo.Helpers.suppress_agent_doctests()
end
IO.puts("AnalysisAgent defined.")

## Define the Outer Pipeline
The pipeline uses placeholder nodes in the DSL (because FanOutNode and HumanNode contain
closures/structs that can't be Macro.escape'd in Livebook). We patch them at runtime.
defmodule Demo.DocumentReviewPipeline do
  @moduledoc false

  # Outer review FSM. The :parallel and :review nodes are compile-time
  # placeholders; Demo.PipelineHelper swaps them for a runtime FanOutNode and
  # HumanNode, which cannot be expressed in the macro DSL from Livebook.
  use Jido.Composer.Workflow,
    name: "document_review",
    description: "Multi-level pipeline with FanOut, nested agents, HITL, and checkpoint",
    nodes: %{
      prepare: Demo.PrepareDocsAction,
      # Placeholder — patched to a FanOutNode at runtime.
      parallel: Demo.PrepareDocsAction,
      # Placeholder — patched to a HumanNode at runtime.
      review: Demo.NoopAction,
      publish: Demo.PublishAction
    },
    transitions: %{
      {:prepare, :ok} => :parallel,
      {:parallel, :ok} => :review,
      # The human decision (or its timeout) selects the outgoing edge.
      {:review, :approved} => :publish,
      {:review, :rejected} => :failed,
      {:review, :timeout} => :failed,
      {:publish, :ok} => :done,
      {:_, :error} => :failed
    },
    initial: :prepare,
    terminal_states: [:done, :failed],
    success_states: [:done],
    # Input keys lifted into ambient context so every node (including the
    # nested agent) can read them.
    ambient: [:org_id, :project_id]

  require Demo.Helpers
  # Hide the generated agent callbacks from Livebook's doctest runner.
  Demo.Helpers.suppress_agent_doctests()
end
# Helper to create a patched agent with runtime nodes (FanOut + HumanNode)
defmodule Demo.PipelineHelper do
  # Builds a fresh pipeline agent and replaces the :parallel and :review
  # placeholder nodes with runtime-constructed FanOutNode / HumanNode values.
  def new_patched do
    # Three parallel branches: two plain actions plus the nested agent.
    {:ok, extract_node} = ActionNode.new(Demo.PrepareDocsAction)
    {:ok, analysis_node} = AgentNode.new(Demo.AnalysisAgent)
    {:ok, validate_node} = ActionNode.new(Demo.ValidateDocsAction)

    {:ok, parallel_analysis} =
      FanOutNode.new(
        name: "parallel_analysis",
        branches: [
          extract: extract_node,
          analyze: analysis_node,
          validate: validate_node
        ],
        # At most two branches run concurrently; failed branches are
        # collected as partial results rather than failing the fan-out.
        max_concurrency: 2,
        on_error: :collect_partial
      )

    # HITL gate: suspends the workflow until a reviewer responds, with a
    # 2-minute timeout that resolves to the :timeout outcome.
    review_node = %HumanNode{
      name: "review_gate",
      description: "Human reviewer approves or rejects the analysis",
      prompt: "Documents analyzed. Approve for publication?",
      allowed_responses: [:approved, :rejected],
      context_keys: [:prepare, :parallel],
      timeout: 120_000,
      timeout_outcome: :timeout
    }

    agent = Demo.DocumentReviewPipeline.new()

    # Swap the placeholder nodes inside the compiled machine definition.
    # NOTE(review): relies on StratState.update/2 returning the updated
    # agent — confirm against the Strategy.State API.
    StratState.update(agent, fn strat ->
      nodes = %{strat.machine.nodes | parallel: parallel_analysis, review: review_node}
      %{strat | machine: %{strat.machine | nodes: nodes}}
    end)
  end
end
IO.puts("Pipeline defined. Ready to run.")

## Phase 1: Run Until Human Review
IO.puts(String.duplicate("=", 60))
IO.puts(" DOCUMENT REVIEW PIPELINE")
IO.puts(String.duplicate("=", 60))

agent = Demo.PipelineHelper.new_patched()

# Start the workflow; org_id/project_id are lifted into ambient context
# (see the pipeline's `ambient:` option) and flow through every node.
{agent, directives} =
  Demo.DocumentReviewPipeline.run(agent, %{
    source: "legal_repo",
    doc_ids: ["CONTRACT-A", "CONTRACT-B", "AMENDMENT-1"],
    org_id: "acme-legal",
    project_id: "proj-42"
  })

IO.puts("\nPhase 1: Running pipeline...")

# The HumanNode review gate suspends the run, so we expect the runner's
# {:suspend, ...} shape here (a different result would crash the match).
{:suspend, suspended_agent, _suspend_directive} =
  Demo.DirectiveRunner.run_until_done(Demo.DocumentReviewPipeline, agent, directives)

strat = StratState.get(suspended_agent)
suspension = strat.pending_suspension
ctx = Context.to_flat_map(strat.machine.context)

IO.puts("Pipeline suspended at: #{strat.machine.status}")
IO.puts("Suspension reason: #{suspension.reason}")
IO.puts("Approval prompt: #{suspension.approval_request.prompt}")
IO.puts("\nResults so far:")
IO.puts(" Prepare: #{ctx[:prepare][:count]} documents gathered")
IO.puts(" Extract branch: #{inspect(ctx[:parallel][:extract][:count])} docs extracted")
IO.puts(" Analyze branch: #{inspect(ctx[:parallel][:analyze][:load][:loaded])} docs analyzed")
IO.puts(" Validate branch: all valid? #{inspect(ctx[:parallel][:validate][:all_valid])}")
IO.puts(" Ambient org_id: #{inspect(ctx[Jido.Composer.Context.ambient_key()][:org_id])}")

## Phase 2: Checkpoint the Suspended Pipeline
IO.puts("\nPhase 2: Checkpointing suspended pipeline...")

# Convert the strategy state into a serializable snapshot (presumably
# stripping pids/funs — confirm against Checkpoint docs), then serialize
# with compression to simulate durable storage.
checkpoint_data = Checkpoint.prepare_for_checkpoint(strat)
binary = :erlang.term_to_binary(checkpoint_data, [:compressed])

IO.puts(" Checkpoint size: #{byte_size(binary)} bytes")
IO.puts(" Schema version: #{Checkpoint.schema_version()}")
IO.puts(" Suspension preserved: #{checkpoint_data.pending_suspension.id}")

## Phase 3: Thaw from Checkpoint
IO.puts("\nPhase 3: Process restarted. Thawing from checkpoint...")

# A fresh agent supplies the runtime-only node structs (FanOut/HumanNode);
# the checkpoint supplies the serialized machine + suspension state.
fresh_agent = Demo.PipelineHelper.new_patched()
# NOTE(review): binary_to_term is safe here only because the binary is our
# own local checkpoint — never deserialize untrusted input this way.
restored = :erlang.binary_to_term(binary)
restored = Checkpoint.reattach_runtime_config(restored, [])
restored_agent = StratState.put(fresh_agent, restored)

IO.puts(" Agent thawed. Status: #{restored.status}")
IO.puts(" Machine state: #{restored.machine.status}")
IO.puts(" Pending suspension ID: #{restored.pending_suspension.id}")

## Phase 4: Resume with Human Approval
IO.puts("\nPhase 4: Human reviewer approves...")

suspension_id = restored.pending_suspension.id

# Deliver the reviewer's decision into the thawed agent. Resume matches the
# suspension id, builds the response signal, and routes it through deliver_fn.
{:ok, resumed_agent, resume_directives} =
  Resume.resume(
    restored_agent,
    suspension_id,
    %{
      request_id: restored.pending_suspension.approval_request.id,
      decision: :approved,
      respondent: "legal-reviewer@acme.com",
      comment: "All contracts verified. Approved for publication."
    },
    deliver_fn: fn agent, signal ->
      Demo.DocumentReviewPipeline.cmd(agent, signal)
    end
  )

IO.puts(" Resume signal delivered. Running remaining steps...")

# Drain the remaining directives (the publish step) to a terminal state.
case Demo.DirectiveRunner.run_until_done(Demo.DocumentReviewPipeline, resumed_agent, resume_directives) do
  {:ok, final_agent} ->
    final_strat = StratState.get(final_agent)
    final_ctx = Context.to_flat_map(final_strat.machine.context)

    IO.puts("\n" <> String.duplicate("=", 60))
    IO.puts(" PIPELINE COMPLETE")
    IO.puts(String.duplicate("=", 60))
    IO.puts(" Status: #{final_strat.status}")
    IO.puts(" Documents published: #{final_ctx[:publish][:published]}")
    IO.puts(" Published at: #{final_ctx[:publish][:published_at]}")
    IO.puts(" Reviewer: #{final_ctx[:publish][:reviewer]}")
    IO.puts(" Decision: #{inspect(final_ctx[:hitl_response][:decision])}")
    IO.puts(" Comment: #{final_ctx[:hitl_response][:comment]}")
    IO.puts(" Ambient org_id: #{final_ctx[Jido.Composer.Context.ambient_key()][:org_id]}")
    IO.puts("\nLifecycle:")
    IO.puts(" Define -> Run -> FanOut(3 branches, 1 nested agent) -> Suspend(HITL)")
    IO.puts(" -> Checkpoint -> Serialize -> Thaw -> Resume -> Publish -> Done")

  # Any other result (error, unexpected suspension) is surfaced for debugging.
  other ->
    IO.puts("Unexpected: #{inspect(other)}")
end

## Summary
| Capability | What You Saw |
|---|---|
| Workflow FSM | Outer pipeline with deterministic state transitions |
| FanOut | 3 parallel branches (action + agent + action) with backpressure |
| AgentNode | Inner AnalysisAgent ran as a native node inside FanOut |
| HumanNode | Review gate suspended the pipeline for human approval |
| Checkpoint/Thaw | Serialized suspended state, restored in a fresh process |
| Context Layers | Ambient data (org_id) flowed through the entire pipeline |
This is the full composition model: deterministic workflows orchestrate parallel branches containing nested agents, with human oversight gates and durable state that survives process boundaries.