Mix.install(
  [
    {:jido_composer, ">= 0.0.0"},
    {:kino, "~> 0.14"}
  ],
  config: [
    jido_action: [default_timeout: :timer.minutes(5)]
  ]
)

Introduction

Real-world pipelines often need human oversight. Jido Composer supports this through generalized suspension — a workflow can pause for any reason (human approval, rate limits, external jobs) and resume later.

This guide covers:

  1. HumanNode — A node that suspends the workflow for human input
  2. Suspend/Resume — The directive-based suspension lifecycle
  3. Checkpoint/Thaw — Serializing suspended state across process restarts

Shared Setup

alias Jido.Agent.Directive
alias Jido.Agent.Strategy.State, as: StratState
alias Jido.Composer.{Context, Suspension, Checkpoint, Resume}
alias Jido.Composer.Directive.Suspend, as: SuspendDirective
alias Jido.Composer.HITL.{ApprovalRequest, ApprovalResponse}
alias Jido.Composer.Node.HumanNode

defmodule Demo.AccumulatorAction do
  # First workflow step in this guide: takes a required string `:tag`
  # (enforced by the schema below) and echoes it back as its result map.
  use Jido.Action,
    name: "accumulator",
    description: "Stores a tag value",
    schema: [
      tag: [type: :string, required: true, doc: "Tag to append"]
    ]

  def run(%{tag: value}, _context) do
    result = %{tag: value}
    {:ok, result}
  end
end

defmodule Demo.NoopAction do
  # Placeholder step: declares no parameters and always succeeds with an
  # empty result map.
  use Jido.Action,
    name: "noop",
    description: "Does nothing",
    schema: []

  def run(_params, _context) do
    {:ok, Map.new()}
  end
end

defmodule Demo.Helpers do
  # Compile-time helpers shared by the workflow modules in this guide.

  # When invoked inside a module (after `require Demo.Helpers`), this expands
  # into definitions of plugins/0, capabilities/0 and signal_types/0. Each one
  # just delegates to `super()` but is marked `@doc false`.
  #
  # `super()` only works if those functions are already defined as overridable
  # in the calling module. Presumably `use Jido.Composer.Workflow` provides
  # them; confirm.
  # NOTE(review): judging by the macro name, hiding their docs is meant to stop
  # Livebook from running doctests attached to the injected definitions. Verify.
  defmacro suppress_agent_doctests do
    quote do
      @doc false
      def plugins, do: super()
      @doc false
      def capabilities, do: super()
      @doc false
      def signal_types, do: super()
    end
  end
end

# DirectiveRunner: executes directives synchronously for demo purposes.
defmodule Demo.DirectiveRunner do
  @moduledoc """
  Minimal, synchronous stand-in for a Jido runtime, used only by this guide.

  It executes a workflow's directives inline in the calling process. Each
  `RunInstruction` is run through `Jido.Exec.run/4`, and its result is fed back
  into the workflow with `module.cmd/2`. A Suspend directive stops the loop so
  the caller can inspect the suspended agent.
  """

  @doc """
  Processes `directives` for the workflow `module` until none remain or a
  Suspend directive is reached.

  Directives produced by `module.cmd/2` are queued ahead of the ones still
  pending. Directive types other than `RunInstruction` and Suspend are skipped.

  Returns:

  * `{:suspend, agent, suspend_directive}`: a Suspend directive was reached.
    Directives queued after it are not processed.
  * `{:error, reason}`: the queue drained and the strategy status is
    `:failure`. `reason` is the state's `:error_reason`, or `:workflow_failed`
    when that key is absent.
  * `{:ok, agent}`: the queue drained with any other status. This includes
    non-terminal statuses, so callers should inspect the returned agent.
  """
  def run_until_done(module, agent, directives) do
    run_loop(module, agent, directives)
  end

  defp run_loop(_module, agent, []), do: check_terminal(agent)

  defp run_loop(
         module,
         agent,
         [%Directive.RunInstruction{instruction: instr, result_action: result_action} | rest]
       ) do
    payload = execute_instruction(instr)
    {agent, new_directives} = module.cmd(agent, {result_action, payload})
    # Follow-up directives run before the remaining queue (depth-first).
    run_loop(module, agent, new_directives ++ rest)
  end

  defp run_loop(_module, agent, [%SuspendDirective{} = suspend | _rest]) do
    {:suspend, agent, suspend}
  end

  defp run_loop(module, agent, [_other | rest]), do: run_loop(module, agent, rest)

  # Runs one instruction and normalizes every Jido.Exec result shape into the
  # payload map passed back to the strategy's result action.
  # NOTE(review): `timeout: 0` presumably disables Jido.Exec's timeout wrapper
  # so the action runs directly; confirm against the Jido.Exec docs.
  defp execute_instruction(instr) do
    case Jido.Exec.run(instr.action, instr.params, %{}, timeout: 0) do
      {:ok, result} -> %{status: :ok, result: result}
      {:ok, result, outcome} -> %{status: :ok, result: result, outcome: outcome}
      {:error, reason} -> %{status: :error, result: %{error: reason}}
      # Jido.Exec's result type also allows a 3-tuple error. Report it like the
      # 2-tuple form instead of crashing the runner with a CaseClauseError.
      {:error, reason, _extra} -> %{status: :error, result: %{error: reason}}
    end
  end

  # Maps the strategy's final status onto the runner's return contract.
  defp check_terminal(agent) do
    strat = StratState.get(agent)

    case strat.status do
      :failure -> {:error, Map.get(strat, :error_reason, :workflow_failed)}
      # :success and any non-terminal status both hand the agent back.
      _ -> {:ok, agent}
    end
  end
end

# Confirms the shared-setup code above evaluated without raising.
IO.puts(:stdio, "Setup complete.")

Part 1: Deployment Approval Gate

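A deployment pipeline that pauses for human approval before going to production. When the workflow reaches the `approval` node, the HumanNode emits a Suspend directive and the workflow waits for a response. An approval continues to `deploy`; a rejection or a timeout routes the workflow to `failed`.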

stateDiagram-v2
    [*] --> prepare
    prepare --> approval : ok
    approval --> deploy : approved
    approval --> failed : rejected
    approval --> failed : timeout
    deploy --> done : ok

Define the Workflow

defmodule Demo.DeployWorkflow do
  @moduledoc false
  # Three-node deployment pipeline:
  # - `prepare` runs Demo.AccumulatorAction.
  # - `approval` is a HumanNode that suspends the workflow for a human decision.
  # - `deploy` runs Demo.NoopAction.
  # :done and :failed are both terminal; only :done counts as success.
  use Jido.Composer.Workflow,
    name: "deploy_pipeline",
    description: "Deployment pipeline with human approval gate",
    nodes: %{
      prepare: Demo.AccumulatorAction,
      # Human gate. The human's decision (one of allowed_responses) presumably
      # becomes this node's outcome, which the transitions below key on.
      approval: %HumanNode{
        name: "deploy_gate",
        description: "Approve deployment to production",
        prompt: "Deploy to production?",
        allowed_responses: [:approved, :rejected],
        # Milliseconds (this guide prints it with an "ms" suffix). When it
        # elapses, the node appears to yield `timeout_outcome`.
        timeout: 60_000,
        timeout_outcome: :timeout
      },
      deploy: Demo.NoopAction
    },
    transitions: %{
      {:prepare, :ok} => :approval,
      {:approval, :approved} => :deploy,
      {:approval, :rejected} => :failed,
      {:approval, :timeout} => :failed,
      {:deploy, :ok} => :done,
      # `:_` appears to be a wildcard: an :error outcome from any state fails.
      {:_, :error} => :failed
    },
    initial: :prepare,
    terminal_states: [:done, :failed],
    success_states: [:done]

  # Hides the docs of functions injected by `use` (see Demo.Helpers).
  require Demo.Helpers
  Demo.Helpers.suppress_agent_doctests()
end

# Confirms Demo.DeployWorkflow compiled.
IO.puts(:stdio, "DeployWorkflow defined.")

Run Until Suspension

# Start a new deployment run and drive it with the demo runner. The match
# asserts that the run stops at a suspension (the approval HumanNode) rather
# than finishing or failing.
agent = Demo.DeployWorkflow.new()
{agent, directives} = Demo.DeployWorkflow.run(agent, %{tag: "deploy-v2.1"})

{:suspend, suspended_agent, suspend_directive} =
  Demo.DirectiveRunner.run_until_done(Demo.DeployWorkflow, agent, directives)

# The pending suspension is recorded in the suspended agent's strategy state.
strat = StratState.get(suspended_agent)
suspension = strat.pending_suspension

IO.puts("=== Workflow Suspended ===")
IO.puts("State:              #{strat.machine.status}")
IO.puts("Reason:             #{suspension.reason}")

# The human-facing request built from the HumanNode configuration.
request = suspension.approval_request
IO.puts("Prompt:             #{request.prompt}")
IO.puts("Allowed responses:  #{inspect(request.allowed_responses)}")
IO.puts("Request ID:         #{request.id}")
IO.puts("Timeout:            #{suspension.timeout}ms")

Resume with Approval

# Simulate a human approving the deployment. The :suspend_resume command
# carries the pending suspension's id and a response to its approval request.
{resumed_agent, resume_directives} =
  Demo.DeployWorkflow.cmd(suspended_agent, {
    :suspend_resume,
    %{
      suspension_id: suspension.id,
      response_data: %{
        request_id: suspension.approval_request.id,
        decision: :approved,
        respondent: "alice@acme.com",
        comment: "Looks good, ship it!"
      }
    }
  })

case Demo.DirectiveRunner.run_until_done(Demo.DeployWorkflow, resumed_agent, resume_directives) do
  {:ok, final_agent} ->
    strat = StratState.get(final_agent)
    ctx = Context.to_flat_map(strat.machine.context)

    IO.puts("=== Deployment Complete ===")
    IO.puts("Status:    #{strat.status}")
    IO.puts("Decision:  #{inspect(ctx[:hitl_response][:decision])}")
    IO.puts("Reviewer:  #{inspect(ctx[:hitl_response][:respondent])}")
    IO.puts("Comment:   #{inspect(ctx[:hitl_response][:comment])}")

  # run_until_done/3 can also stop at another Suspend directive. Report it
  # instead of raising a CaseClauseError.
  {:suspend, _suspended_again, suspend} ->
    IO.puts("Suspended again: #{inspect(suspend)}")

  {:error, reason} ->
    IO.puts("Failed: #{inspect(reason)}")
end

Part 2: Generalized Suspension

Suspension isn't just for humans. A workflow can pause for any reason — rate limits, external jobs, async operations, or custom application logic.

# The five built-in suspension reasons, each paired with the extra options
# passed to Suspension.new/1.
reasons = [
  {:human_input, [metadata: %{urgency: :high}]},
  {:rate_limit, [metadata: %{retry_after_ms: 5000, tokens_remaining: 0}]},
  {:async_completion, [metadata: %{task_id: "async-42"}, resume_signal: "task.complete"]},
  {:external_job, [metadata: %{job_id: "ci-build-99"}, timeout: 300_000]},
  {:custom, [metadata: %{type: "budget_exceeded", amount: 150.0, limit: 100.0}]}
]

IO.puts("=== Suspension Reasons ===\n")

# Build each suspension and print a two-line summary. The first line starts
# with the reason name padded to 20 columns; the second starts with a blank
# gutter of the same width so the columns line up.
Enum.map(reasons, fn {reason, extra_opts} ->
  {:ok, suspension} = Suspension.new([{:reason, reason} | extra_opts])
  label = String.pad_trailing(Atom.to_string(reason), 20)
  gutter = String.duplicate(" ", 20)

  IO.puts("#{label} | timeout: #{inspect(suspension.timeout)}")
  IO.puts("#{gutter} | metadata: #{inspect(suspension.metadata)}\n")
end)

Rate-Limit Suspension in Practice

defmodule Demo.RateLimitAction do
  # Simulated rate-limited call. When tokens are available it consumes one and
  # succeeds. Otherwise it returns a :rate_limit Suspension together with the
  # :suspend outcome, so the workflow pauses instead of failing.
  use Jido.Action,
    name: "rate_limit_action",
    description: "Simulates rate-limited operation",
    schema: [tokens: [type: :integer, required: false]]

  def run(params, _ctx) do
    case Map.get(params, :tokens, 0) do
      available when available > 0 ->
        {:ok, %{processed: true, tokens_remaining: available - 1}}

      _exhausted ->
        rate_limited()
    end
  end

  # Builds the suspension result. retry_after_ms is metadata only; nothing in
  # this action waits on it.
  defp rate_limited do
    {:ok, suspension} = Suspension.new(reason: :rate_limit, metadata: %{retry_after_ms: 5000})
    {:ok, %{processed: false, __suspension__: suspension}, :suspend}
  end
end

defmodule Demo.RateLimitWorkflow do
  @moduledoc false
  # Linear pipeline prepare -> api_call -> finish. `api_call` runs
  # Demo.RateLimitAction, which returns a :suspend outcome when no tokens are
  # left. No transition below keys on :suspend; the pause is presumably handled
  # by the suspension mechanism. A later resume with outcome :ok continues
  # through {:api_call, :ok}.
  use Jido.Composer.Workflow,
    name: "rate_limit_demo",
    description: "Workflow that suspends on rate limit",
    nodes: %{
      prepare: Demo.AccumulatorAction,
      api_call: Demo.RateLimitAction,
      finish: Demo.NoopAction
    },
    transitions: %{
      {:prepare, :ok} => :api_call,
      {:api_call, :ok} => :finish,
      # Presumably reached if a resume reports a :timeout outcome; confirm.
      {:api_call, :timeout} => :failed,
      {:finish, :ok} => :done,
      # `:_` appears to be a wildcard: an :error outcome from any state fails.
      {:_, :error} => :failed
    },
    initial: :prepare,
    terminal_states: [:done, :failed],
    success_states: [:done]

  # Hides the docs of functions injected by `use` (see Demo.Helpers).
  require Demo.Helpers
  Demo.Helpers.suppress_agent_doctests()
end

# Run with tokens=0 to trigger rate limit
agent = Demo.RateLimitWorkflow.new()
{agent, directives} = Demo.RateLimitWorkflow.run(agent, %{tag: "rate_test", tokens: 0})

case Demo.DirectiveRunner.run_until_done(Demo.RateLimitWorkflow, agent, directives) do
  {:suspend, suspended_agent, _suspend_directive} ->
    strat = StratState.get(suspended_agent)
    suspension = strat.pending_suspension

    IO.puts("=== Rate Limit Suspension ===")
    IO.puts("Reason:   #{suspension.reason}")
    IO.puts("Metadata: #{inspect(suspension.metadata)}")

    # Resume the suspended api_call node with outcome :ok, which the transition
    # table routes to :finish. The action is not re-run here.
    # NOTE(review): `data` is handed over with the resume command and
    # presumably becomes the node's result; confirm.
    IO.puts("\n--- Resuming after cooldown ---")

    {resumed, resume_directives} =
      Demo.RateLimitWorkflow.cmd(suspended_agent, {
        :suspend_resume,
        %{suspension_id: suspension.id, outcome: :ok, data: %{tokens: 10}}
      })

    case Demo.DirectiveRunner.run_until_done(Demo.RateLimitWorkflow, resumed, resume_directives) do
      {:ok, final_agent} ->
        strat = StratState.get(final_agent)
        ctx = Context.to_flat_map(strat.machine.context)
        IO.puts("Workflow completed after resume!")
        IO.puts("API call result: #{inspect(ctx[:api_call])}")

      other ->
        IO.puts("Unexpected: #{inspect(other)}")
    end

  {:ok, _} ->
    IO.puts("Completed without suspension (unexpected)")

  # run_until_done/3 returns {:error, reason} when the workflow ends in failure.
  # Without this clause that outcome would raise a CaseClauseError.
  {:error, reason} ->
    IO.puts("Failed: #{inspect(reason)}")
end

Part 3: Checkpoint and Restore

When a workflow suspends, its strategy state can be checkpointed and stored: runtime closures are stripped, then the state is serialized to a binary. Later, a fresh agent can thaw the checkpoint and resume exactly where it left off. This is essential for long-running workflows that must survive process restarts.

# Step 1: Run until suspension
agent = Demo.DeployWorkflow.new()
{agent, directives} = Demo.DeployWorkflow.run(agent, %{tag: "checkpoint_demo"})

{:suspend, suspended_agent, _directive} =
  Demo.DirectiveRunner.run_until_done(Demo.DeployWorkflow, agent, directives)

IO.puts("=== Checkpoint/Thaw Lifecycle ===\n")
IO.puts("Step 1: Workflow suspended for human approval")

# Step 2: Checkpoint — strip closures from the strategy state for serialization
strat = StratState.get(suspended_agent)
checkpoint_data = Checkpoint.prepare_for_checkpoint(strat)

IO.puts("Step 2: Checkpoint prepared (schema v#{Checkpoint.schema_version()})")

# Step 3: Serialize to binary
binary = :erlang.term_to_binary(checkpoint_data, [:compressed])
IO.puts("Step 3: Serialized to #{byte_size(binary)} bytes")

# Step 4: Simulate process death + restore
IO.puts("Step 4: Process died. Creating fresh agent...")
fresh_agent = Demo.DeployWorkflow.new()

# Step 5: Thaw — deserialize and reattach closures.
# Real checkpoints come back from external storage, so decode with [:safe].
# It refuses to create new atoms or external function references (neither is
# garbage-collected) and raises ArgumentError on such input. This checkpoint
# was produced in the same VM, so every atom it references already exists.
restored_strat = :erlang.binary_to_term(binary, [:safe])
restored_strat = Checkpoint.reattach_runtime_config(restored_strat, [])
restored_agent = StratState.put(fresh_agent, restored_strat)

IO.puts("Step 5: Agent thawed from checkpoint")
IO.puts("  Status: #{restored_strat.status}")
IO.puts("  Pending: #{restored_strat.pending_suspension.reason}")

# Step 6: Resume
suspension_id = restored_strat.pending_suspension.id

{:ok, resumed_agent, resume_directives} =
  Resume.resume(
    restored_agent,
    suspension_id,
    %{
      request_id: restored_strat.pending_suspension.approval_request.id,
      decision: :approved,
      respondent: "bob@acme.com"
    },
    deliver_fn: fn agent, signal ->
      Demo.DeployWorkflow.cmd(agent, signal)
    end
  )

case Demo.DirectiveRunner.run_until_done(Demo.DeployWorkflow, resumed_agent, resume_directives) do
  {:ok, final} ->
    strat = StratState.get(final)
    IO.puts("Step 6: Workflow completed! Status: #{strat.status}")
    IO.puts("\nFull lifecycle:")
    IO.puts("  Run -> Suspend -> Checkpoint -> Serialize -> Death -> Thaw -> Resume -> Complete")

  other ->
    IO.puts("Unexpected: #{inspect(other)}")
end

Next Steps

You've learned how to:

  • Suspend workflows for human approval
  • Use generalized suspension for rate limits and other reasons
  • Checkpoint suspended state and restore it across process restarts

Next guide: LLM Orchestrator — let an LLM decide which tools to call at runtime.