Multi-Agent Pipeline

Copy Markdown View Source
Mix.install(
  [
    {:jido_composer, ">= 0.0.0"},
    {:kino, "~> 0.14"}
  ],
  config: [
    jido_action: [default_timeout: :timer.minutes(5)]
  ]
)

Introduction

This guide builds a document review pipeline that combines every Jido Composer capability into a single realistic scenario:

  • Workflow FSM orchestrating the overall process
  • FanOut for parallel analysis branches
  • AgentNode nesting a workflow agent inside a FanOut branch
  • HumanNode for reviewer approval
  • Checkpoint/Thaw for surviving process restarts

The pipeline:

  1. Prepare — Gather documents for review
  2. Parallel Analysis — Extract, analyze (via nested agent), and validate concurrently
  3. Review Gate — Human reviewer approves or rejects
  4. Publish — Publish approved documents
graph TD
    subgraph Outer Workflow
        A[Prepare] --> B[FanOut: Parallel Analysis]
        B --> C[Review Gate - HumanNode]
        C -->|approved| D[Publish]
        C -->|rejected| E[Failed]
        D --> F[Done]
    end

    subgraph "FanOut Branches (parallel)"
        B --> B1[Branch: Extract - ActionNode]
        B --> B2[Branch: Analyze - Inner Workflow Agent]
        B --> B3[Branch: Validate - ActionNode]
    end

    subgraph "Inner Analyze Agent (Workflow)"
        B2 --> G[Transform]
        G --> H[Load]
    end

Setup

# Resolve the Anthropic API key: the plain variable name is tried first, then
# the "LB_"-prefixed one. The notebook stops with an explanatory error when
# neither variable is set.
# NOTE(review): no cell in this guide appears to call an LLM — confirm the key
# is actually required here.
api_key =
  Enum.find_value(["ANTHROPIC_API_KEY", "LB_ANTHROPIC_API_KEY"], &System.get_env/1) ||
    raise "Set ANTHROPIC_API_KEY in your environment or Livebook app settings."

# Store the key in the :req_llm application environment.
Application.put_env(:req_llm, :anthropic_api_key, api_key)

alias Jido.Agent.Directive
alias Jido.Agent.Strategy.State, as: StratState
alias Jido.Composer.{Context, Suspension, Checkpoint, Resume}
alias Jido.Composer.Directive.{FanOutBranch, Suspend}
alias Jido.Composer.HITL.{ApprovalRequest, ApprovalResponse}
alias Jido.Composer.Node.{ActionNode, AgentNode, FanOutNode, HumanNode}

# Compile-time helper used by the agent modules defined later in this guide.
defmodule Demo.Helpers do
  # Injects overrides of plugins/0, capabilities/0 and signal_types/0 into the
  # calling module. Each override is tagged `@doc false` and only delegates to
  # super(), so the previously defined implementation still runs.
  # Callers must `require Demo.Helpers` before invoking the macro.
  # NOTE(review): judging by the macro name, the goal is to hide the generated
  # docs so that their examples are not picked up as doctests — confirm.
  defmacro suppress_agent_doctests do
    quote do
      @doc false
      def plugins, do: super()
      @doc false
      def capabilities, do: super()
      @doc false
      def signal_types, do: super()
    end
  end
end

# DirectiveRunner: a synchronous, in-process interpreter for the directives
# emitted by a workflow agent. Intended for demo purposes only.
defmodule Demo.DirectiveRunner do
  @moduledoc false

  # Entry point. Works through `directives`, feeding every result back into
  # `module.cmd/2`, and returns one of:
  #
  #   {:suspend, agent, suspend_directive} - a Suspend directive was reached
  #   {:error, reason}                     - the queue drained and the strategy status is :failure
  #   {:ok, agent}                         - the queue drained with any other status
  def run_until_done(module, agent, directives), do: drain(module, agent, directives)

  # Nothing left to execute: derive the result from the strategy status.
  defp drain(_module, agent, []), do: outcome(agent)

  # Run the instruction inline and report its payload under `result_action`.
  # Directives produced by the agent are processed before the pending ones.
  defp drain(
         module,
         agent,
         [%Directive.RunInstruction{instruction: instruction, result_action: reply_to} | pending]
       ) do
    payload = run_instruction(instruction)
    {agent, follow_ups} = module.cmd(agent, {reply_to, payload})
    drain(module, agent, follow_ups ++ pending)
  end

  # Run a child agent to completion and report its result under the spawn tag.
  defp drain(
         module,
         agent,
         [%Directive.SpawnAgent{agent: child, tag: tag, opts: child_opts} | pending]
       ) do
    child_result = Jido.Composer.Node.execute_child_sync(child, child_opts)

    {agent, follow_ups} =
      module.cmd(agent, {:workflow_child_result, %{tag: tag, result: child_result}})

    drain(module, agent, follow_ups ++ pending)
  end

  # Collect every FanOutBranch directive in the queue (not only the leading
  # ones), run them concurrently, then report the results one by one in
  # directive order.
  defp drain(module, agent, [%FanOutBranch{} | _] = queue) do
    {branches, pending} = Enum.split_with(queue, &is_struct(&1, FanOutBranch))
    branch_results = run_branches(branches)

    {agent, follow_ups} =
      Enum.reduce(branch_results, {agent, []}, fn {branch_name, branch_result},
                                                  {current, collected} ->
        message = {:fan_out_branch_result, %{branch_name: branch_name, result: branch_result}}
        {next, produced} = module.cmd(current, message)
        {next, collected ++ produced}
      end)

    drain(module, agent, follow_ups ++ pending)
  end

  # A Suspend directive stops the loop; the rest of the queue is not executed.
  defp drain(_module, agent, [%Suspend{} = suspend | _pending]), do: {:suspend, agent, suspend}

  # Any other directive type is skipped.
  defp drain(module, agent, [_unhandled | pending]), do: drain(module, agent, pending)

  # Executes one instruction through Jido.Exec.run/4 with an empty context and
  # `timeout: 0`, then normalises the reply into a payload map.
  defp run_instruction(%Jido.Instruction{action: action, params: params}) do
    reply = Jido.Exec.run(action, params, %{}, timeout: 0)

    case reply do
      {:ok, result} -> %{status: :ok, result: result}
      {:ok, result, outcome} -> %{status: :ok, result: result, outcome: outcome}
      {:error, reason} -> %{status: :error, result: %{error: reason}}
    end
  end

  # Runs the branches as tasks, each limited to 30 seconds. A task that exits
  # or is killed on timeout is reported as {:error, {:crashed, reason}} under
  # the name of its branch.
  defp run_branches(branches) do
    task_results =
      Task.async_stream(
        branches,
        fn %FanOutBranch{} = branch ->
          branch_result = run_branch(branch)
          {branch.branch_name, branch_result}
        end,
        timeout: 30_000,
        on_timeout: :kill_task,
        ordered: true
      )

    # Results arrive in input order, so zipping pairs each one with its branch.
    task_results
    |> Enum.zip(branches)
    |> Enum.map(fn
      {{:ok, {branch_name, branch_result}}, _branch} ->
        {branch_name, branch_result}

      {{:exit, reason}, branch} ->
        {branch.branch_name, {:error, {:crashed, reason}}}
    end)
  end

  # A branch that carries an instruction is executed inline.
  defp run_branch(%FanOutBranch{instruction: %Jido.Instruction{} = instruction}) do
    case run_instruction(instruction) do
      %{status: :ok, result: value} -> {:ok, value}
      %{status: :error, result: %{error: reason}} -> {:error, reason}
    end
  end

  # Any other branch is treated as a child-agent spawn.
  defp run_branch(%FanOutBranch{spawn_agent: spawn}) do
    Jido.Composer.Node.execute_child_sync(spawn.agent, spawn.opts)
  end

  # Only a :failure status yields an error; every other status, terminal or
  # not, is returned as {:ok, agent}.
  defp outcome(agent) do
    strategy = StratState.get(agent)

    if strategy.status == :failure do
      {:error, Map.get(strategy, :error_reason, :workflow_failed)}
    else
      {:ok, agent}
    end
  end
end

IO.puts("Setup complete.")

Define Domain Actions

# Action that builds the list of documents to review from the optional
# `doc_ids` and `source` params.
defmodule Demo.PrepareDocsAction do
  use Jido.Action,
    name: "prepare_docs",
    description: "Gathers documents for review",
    schema: [
      doc_ids: [type: {:list, :string}, required: false],
      source: [type: :string, required: false]
    ]

  # Returns {:ok, map} holding one :pending entry per document id, the number
  # of ids, and the current UTC time as an ISO 8601 string. Three sample ids
  # and the "default" source are used when the params omit them.
  def run(params, _ctx) do
    ids = Map.get(params, :doc_ids, ["DOC-001", "DOC-002", "DOC-003"])
    origin = Map.get(params, :source, "default")

    documents = for id <- ids, do: %{id: id, source: origin, status: :pending}
    count = length(ids)
    gathered_at = DateTime.to_iso8601(DateTime.utc_now())

    {:ok, %{documents: documents, count: count, gathered_at: gathered_at}}
  end
end

# Action that upcases the :source field of every record found under
# `extract.records`.
# NOTE(review): no action in this guide emits `extract.records`, so inside this
# pipeline the record list looks like it is always empty — confirm.
defmodule Demo.TransformAction do
  use Jido.Action,
    name: "transform",
    description: "Transforms records by uppercasing source",
    schema: [extract: [type: :map, required: false]]

  # Returns {:ok, %{records: ..., count: ...}}. A missing record list is
  # treated as empty; a record without :source receives an empty string.
  def run(params, _ctx) do
    upcased =
      for record <- get_in(params, [:extract, :records]) || [] do
        Map.update(record, :source, "", &String.upcase/1)
      end

    {:ok, %{records: upcased, count: length(upcased)}}
  end
end

# Action that reports how many records arrived under `transform.records`.
# Nothing is actually written anywhere.
defmodule Demo.LoadAction do
  use Jido.Action,
    name: "load",
    description: "Loads records into storage",
    schema: [transform: [type: :map, required: false]]

  # Returns {:ok, %{loaded: n, status: :complete}}, where n is 0 when the
  # record list is missing.
  def run(params, _ctx) do
    loaded = length(get_in(params, [:transform, :records]) || [])

    {:ok, %{loaded: loaded, status: :complete}}
  end
end

# Action that marks every document it can find as valid and attaches a random
# compliance score between 85 and 100.
defmodule Demo.ValidateDocsAction do
  use Jido.Action,
    name: "validate_docs",
    description: "Validates document format and compliance",
    schema: []

  # Returns {:ok, %{validations: [...], all_valid: boolean}}.
  #
  # Documents are looked up in this order:
  #   1. parallel.extract.documents
  #   2. extract.documents
  #   3. prepare.documents
  #
  # The third lookup is a fallback for running as a fan-out branch next to the
  # `extract` branch: the sibling's output cannot be available yet at that
  # point, which left the list empty and made `all_valid` vacuously true.
  # NOTE(review): assumes branch params carry the `prepare` step output —
  # confirm. If they do not, the result is the same empty list as before.
  def run(params, _ctx) do
    docs =
      get_in(params, [:parallel, :extract, :documents]) ||
        get_in(params, [:extract, :documents]) ||
        get_in(params, [:prepare, :documents]) || []

    results =
      Enum.map(docs, fn doc ->
        %{id: doc[:id], valid: true, compliance_score: Enum.random(85..100)}
      end)

    # Enum.all?/2 is true for an empty list, so "no documents" reads as valid.
    {:ok, %{validations: results, all_valid: Enum.all?(results, & &1.valid)}}
  end
end

# Action that produces the publication summary once the review has passed.
defmodule Demo.PublishAction do
  use Jido.Action,
    name: "publish",
    description: "Publishes reviewed documents",
    schema: []

  # Returns {:ok, map} with the document count read from
  # `parallel.extract.count` (0 when absent), the current UTC time as an
  # ISO 8601 string, and the reviewer read from `hitl_response.respondent`
  # ("unknown" when absent).
  def run(params, _ctx) do
    published = get_in(params, [:parallel, :extract, :count]) || 0
    published_at = DateTime.to_iso8601(DateTime.utc_now())
    reviewer = get_in(params, [:hitl_response, :respondent]) || "unknown"

    {:ok, %{published: published, published_at: published_at, reviewer: reviewer}}
  end
end

# Placeholder action: ignores its input and succeeds with an empty result map.
defmodule Demo.NoopAction do
  use Jido.Action,
    name: "noop",
    description: "Does nothing",
    schema: []

  # Always returns {:ok, %{}}.
  def run(_params, _ctx) do
    {:ok, %{}}
  end
end

IO.puts("Domain actions defined.")

Define the Nested Agent

The inner AnalysisAgent is a Workflow that runs inside a FanOut branch. It transforms and loads data as a self-contained sub-pipeline.

# Inner workflow agent: runs Demo.TransformAction, then Demo.LoadAction.
# Demo.PipelineHelper wraps it in an AgentNode and uses it as the `analyze`
# fan-out branch.
defmodule Demo.AnalysisAgent do
  @moduledoc false
  use Jido.Composer.Workflow,
    name: "analysis_agent",
    description: "Runs transform+load as a nested agent inside FanOut",
    # One node per state; the state name is the map key.
    nodes: %{
      transform: Demo.TransformAction,
      load: Demo.LoadAction
    },
    # {state, outcome} => next state.
    # NOTE(review): `:_` presumably acts as a wildcard matching any state —
    # confirm against the Workflow DSL docs.
    transitions: %{
      {:transform, :ok} => :load,
      {:load, :ok} => :done,
      {:_, :error} => :failed
    },
    initial: :transform,
    # Both :done and :failed end the workflow; only :done counts as success.
    terminal_states: [:done, :failed],
    success_states: [:done]

  # The macro is defined in Demo.Helpers, so the module must be required first.
  require Demo.Helpers
  Demo.Helpers.suppress_agent_doctests()
end

IO.puts("AnalysisAgent defined.")

Define the Outer Pipeline

The pipeline uses placeholder nodes in the DSL (because FanOutNode and HumanNode contain closures/structs that can't be Macro.escape'd in Livebook). We patch them at runtime.

# Outer workflow: prepare -> parallel -> review -> publish.
# The `parallel` and `review` entries below are placeholders only;
# Demo.PipelineHelper.new_patched/0 replaces them at runtime with a FanOutNode
# and a HumanNode.
defmodule Demo.DocumentReviewPipeline do
  @moduledoc false
  use Jido.Composer.Workflow,
    name: "document_review",
    description: "Multi-level pipeline with FanOut, nested agents, HITL, and checkpoint",
    nodes: %{
      prepare: Demo.PrepareDocsAction,
      # Placeholder, swapped for the FanOutNode at runtime.
      parallel: Demo.PrepareDocsAction,
      # Placeholder, swapped for the HumanNode at runtime.
      review: Demo.NoopAction,
      publish: Demo.PublishAction
    },
    # {state, outcome} => next state. The review outcomes mirror the
    # HumanNode's allowed responses (:approved, :rejected) and its
    # timeout outcome (:timeout).
    # NOTE(review): `:_` presumably acts as a wildcard matching any state —
    # confirm against the Workflow DSL docs.
    transitions: %{
      {:prepare, :ok} => :parallel,
      {:parallel, :ok} => :review,
      {:review, :approved} => :publish,
      {:review, :rejected} => :failed,
      {:review, :timeout} => :failed,
      {:publish, :ok} => :done,
      {:_, :error} => :failed
    },
    initial: :prepare,
    # Both :done and :failed end the workflow; only :done counts as success.
    terminal_states: [:done, :failed],
    success_states: [:done],
    # Keys declared as ambient; later cells read :org_id back through
    # Context.ambient_key().
    ambient: [:org_id, :project_id]

  # The macro is defined in Demo.Helpers, so the module must be required first.
  require Demo.Helpers
  Demo.Helpers.suppress_agent_doctests()
end

# Builds a Demo.DocumentReviewPipeline agent whose placeholder nodes have been
# replaced by the real runtime nodes (FanOutNode + HumanNode).
defmodule Demo.PipelineHelper do
  @moduledoc false

  # Returns a fresh pipeline agent with the `parallel` node replaced by the
  # three-branch fan-out and the `review` node replaced by the human review
  # gate. Raises a MatchError if any node constructor does not return
  # {:ok, node}.
  def new_patched do
    fan_out = build_fan_out()
    gate = build_review_gate()

    StratState.update(Demo.DocumentReviewPipeline.new(), fn strategy ->
      machine = strategy.machine
      # The update syntax raises if either key is missing, so a renamed state
      # cannot be patched silently.
      patched_nodes = %{machine.nodes | parallel: fan_out, review: gate}
      %{strategy | machine: %{machine | nodes: patched_nodes}}
    end)
  end

  # Fan-out with three branches: two plain actions and the nested
  # Demo.AnalysisAgent.
  defp build_fan_out do
    {:ok, extract} = ActionNode.new(Demo.PrepareDocsAction)
    {:ok, analyze} = AgentNode.new(Demo.AnalysisAgent)
    {:ok, validate} = ActionNode.new(Demo.ValidateDocsAction)

    {:ok, fan_out} =
      FanOutNode.new(
        name: "parallel_analysis",
        branches: [extract: extract, analyze: analyze, validate: validate],
        max_concurrency: 2,
        on_error: :collect_partial
      )

    fan_out
  end

  # Review gate: accepts :approved or :rejected and reports :timeout as its
  # timeout outcome (timeout value 120_000).
  defp build_review_gate do
    %HumanNode{
      name: "review_gate",
      description: "Human reviewer approves or rejects the analysis",
      prompt: "Documents analyzed. Approve for publication?",
      allowed_responses: [:approved, :rejected],
      context_keys: [:prepare, :parallel],
      timeout: 120_000,
      timeout_outcome: :timeout
    }
  end
end

IO.puts("Pipeline defined. Ready to run.")

Phase 1: Run Until Human Review

# Phase 1: start the pipeline and drive it until the review gate suspends it.
# The bindings created here (strat, suspension, ctx) are reused by later cells.
IO.puts(String.duplicate("=", 60))
IO.puts("  DOCUMENT REVIEW PIPELINE")
IO.puts(String.duplicate("=", 60))

# Pipeline agent with the FanOutNode and HumanNode patched in.
agent = Demo.PipelineHelper.new_patched()

# run/2 hands back the updated agent plus the first directives to execute.
# :org_id and :project_id are the keys the pipeline declares as ambient.
{agent, directives} =
  Demo.DocumentReviewPipeline.run(agent, %{
    source: "legal_repo",
    doc_ids: ["CONTRACT-A", "CONTRACT-B", "AMENDMENT-1"],
    org_id: "acme-legal",
    project_id: "proj-42"
  })

IO.puts("\nPhase 1: Running pipeline...")

# The run is expected to stop at the review gate; any other result from the
# runner ({:ok, _} or {:error, _}) fails this match and aborts the cell.
{:suspend, suspended_agent, _suspend_directive} =
  Demo.DirectiveRunner.run_until_done(Demo.DocumentReviewPipeline, agent, directives)

# Read the strategy state of the suspended agent and flatten its context for
# easy key lookups.
strat = StratState.get(suspended_agent)
suspension = strat.pending_suspension
ctx = Context.to_flat_map(strat.machine.context)

IO.puts("Pipeline suspended at: #{strat.machine.status}")
IO.puts("Suspension reason: #{suspension.reason}")
IO.puts("Approval prompt: #{suspension.approval_request.prompt}")
IO.puts("\nResults so far:")
IO.puts("  Prepare: #{ctx[:prepare][:count]} documents gathered")
# Fan-out results are read from under :parallel, one key per branch name.
IO.puts("  Extract branch: #{inspect(ctx[:parallel][:extract][:count])} docs extracted")
IO.puts("  Analyze branch: #{inspect(ctx[:parallel][:analyze][:load][:loaded])} docs analyzed")
IO.puts("  Validate branch: all valid? #{inspect(ctx[:parallel][:validate][:all_valid])}")
IO.puts("  Ambient org_id: #{inspect(ctx[Jido.Composer.Context.ambient_key()][:org_id])}")

Phase 2: Checkpoint the Suspended Pipeline

# Phase 2: turn the suspended strategy state into a binary.
# `binary` is consumed by the Phase 3 cell.
IO.puts("\nPhase 2: Checkpointing suspended pipeline...")

# NOTE(review): prepare_for_checkpoint/1 presumably strips values that cannot
# be serialized (closures and similar) — confirm against the Checkpoint docs.
checkpoint_data = Checkpoint.prepare_for_checkpoint(strat)
# Compressed external term format.
binary = :erlang.term_to_binary(checkpoint_data, [:compressed])

IO.puts("  Checkpoint size: #{byte_size(binary)} bytes")
IO.puts("  Schema version: #{Checkpoint.schema_version()}")
IO.puts("  Suspension preserved: #{checkpoint_data.pending_suspension.id}")

Phase 3: Thaw from Checkpoint

# Phase 3: rebuild an agent from the serialized checkpoint.
# `restored` and `restored_agent` are consumed by the Phase 4 cell.
IO.puts("\nPhase 3: Process restarted. Thawing from checkpoint...")

# A brand-new agent stands in for the one a restarted process would create.
fresh_agent = Demo.PipelineHelper.new_patched()
# NOTE(review): decoding without the :safe option is acceptable for a binary
# produced a moment ago in this same notebook; for checkpoints read back from
# external storage, review whether `:erlang.binary_to_term(bin, [:safe])` is
# needed.
restored = :erlang.binary_to_term(binary)
# NOTE(review): the empty list is presumably "no runtime overrides" — confirm.
restored = Checkpoint.reattach_runtime_config(restored, [])
# Place the restored strategy state into the fresh agent.
restored_agent = StratState.put(fresh_agent, restored)

IO.puts("  Agent thawed. Status: #{restored.status}")
IO.puts("  Machine state: #{restored.machine.status}")
IO.puts("  Pending suspension ID: #{restored.pending_suspension.id}")

Phase 4: Resume with Human Approval

# Phase 4: deliver the reviewer's approval to the thawed agent and run the
# remaining steps.
IO.puts("\nPhase 4: Human reviewer approves...")

suspension_id = restored.pending_suspension.id

# The response map refers to the pending approval request by id and carries
# the reviewer's decision. `deliver_fn` passes the resume signal to the
# pipeline's cmd/2. Any result other than {:ok, agent, directives} fails this
# match and aborts the cell.
{:ok, resumed_agent, resume_directives} =
  Resume.resume(
    restored_agent,
    suspension_id,
    %{
      request_id: restored.pending_suspension.approval_request.id,
      decision: :approved,
      respondent: "legal-reviewer@acme.com",
      comment: "All contracts verified. Approved for publication."
    },
    deliver_fn: fn agent, signal ->
      Demo.DocumentReviewPipeline.cmd(agent, signal)
    end
  )

IO.puts("  Resume signal delivered. Running remaining steps...")

# Drive the directives produced by the resume.
case Demo.DirectiveRunner.run_until_done(Demo.DocumentReviewPipeline, resumed_agent, resume_directives) do
  {:ok, final_agent} ->
    final_strat = StratState.get(final_agent)
    final_ctx = Context.to_flat_map(final_strat.machine.context)

    IO.puts("\n" <> String.duplicate("=", 60))
    IO.puts("  PIPELINE COMPLETE")
    IO.puts(String.duplicate("=", 60))
    IO.puts("  Status: #{final_strat.status}")
    IO.puts("  Documents published: #{final_ctx[:publish][:published]}")
    IO.puts("  Published at: #{final_ctx[:publish][:published_at]}")
    IO.puts("  Reviewer: #{final_ctx[:publish][:reviewer]}")
    IO.puts("  Decision: #{inspect(final_ctx[:hitl_response][:decision])}")
    IO.puts("  Comment: #{final_ctx[:hitl_response][:comment]}")
    IO.puts("  Ambient org_id: #{final_ctx[Jido.Composer.Context.ambient_key()][:org_id]}")
    IO.puts("\nLifecycle:")
    IO.puts("  Define -> Run -> FanOut(3 branches, 1 nested agent) -> Suspend(HITL)")
    IO.puts("  -> Checkpoint -> Serialize -> Thaw -> Resume -> Publish -> Done")

  # Covers {:error, reason} and a second {:suspend, ...}; printed, not raised.
  other ->
    IO.puts("Unexpected: #{inspect(other)}")
end

Summary

| Capability | What You Saw |
| --- | --- |
| Workflow FSM | Outer pipeline with deterministic state transitions |
| FanOut | 3 parallel branches (action + agent + action) with backpressure |
| AgentNode | Inner AnalysisAgent ran as a native node inside FanOut |
| HumanNode | Review gate suspended the pipeline for human approval |
| Checkpoint/Thaw | Serialized suspended state, restored in a fresh process |
| Context Layers | Ambient data (org_id) flowed through the entire pipeline |

This is the full composition model: deterministic workflows orchestrate parallel branches containing nested agents, with human oversight gates and durable state that survives process boundaries.