Branching & Parallel

Copy Markdown View Source
# Install the notebook's dependencies and set application config for them.
Mix.install(
  [
    # ">= 0.0.0" places no upper bound on the version that gets resolved.
    {:jido_composer, ">= 0.0.0"},
    {:kino, "~> 0.14"}
  ],
  # Application environment applied to the installed deps.
  config: [
    # Sets :jido_action's `default_timeout` to 5 minutes (value is in milliseconds).
    # NOTE(review): presumably the per-action execution timeout — confirm in Jido.Action docs.
    jido_action: [default_timeout: :timer.minutes(5)]
  ]
)

Introduction

This guide covers two powerful Workflow patterns:

  1. Branching — Actions can return custom outcomes beyond :ok/:error, driving the FSM down different paths
  2. Parallel Execution — FanOutNode runs multiple branches concurrently and merges results

Part 1: Branching with Custom Outcomes

In a real system, validation isn't binary: data might be valid, invalid, or in need of a retry. Actions express this by returning a custom outcome as the third element of the result tuple:

  • {:ok, result} → outcome is :ok
  • {:ok, result, :invalid} → outcome is :invalid
  • {:ok, result, :retry} → outcome is :retry
  • {:error, reason} → outcome is :error

The FSM transition table maps each {state, outcome} to the next state.

stateDiagram-v2
    [*] --> validate
    validate --> process : ok
    validate --> handle_invalid : invalid
    validate --> retry_step : retry
    process --> done : ok
    handle_invalid --> done : ok
    retry_step --> done : ok
    validate --> failed : error

Define the Actions

# Demo validation action. The `data` string selects the outcome:
#   "valid"   -> {:ok, result}            (default :ok outcome)
#   "invalid" -> {:ok, result, :invalid}  (custom outcome)
#   "retry"   -> {:ok, result, :retry}    (custom outcome)
#   otherwise -> {:error, reason}
defmodule Demo.ValidateAction do
  use Jido.Action,
    name: "validate",
    description: "Validates data, returning different outcomes based on content",
    schema: [
      data: [type: :string, required: true, doc: "Data to validate"]
    ]

  def run(%{data: data}, _context) do
    case data do
      "valid" -> {:ok, %{validated: true, quality: :good}}
      "invalid" -> {:ok, %{validated: false, quality: :bad}, :invalid}
      "retry" -> {:ok, %{validated: false, quality: :unstable}, :retry}
      _other -> {:error, "unrecognized data format"}
    end
  end
end

# Demo action for the happy path: ignores its input and reports `processed: true`.
defmodule Demo.ProcessAction do
  use Jido.Action,
    name: "process",
    description: "Processes validated data",
    schema: []

  def run(_input, _context) do
    {:ok, %{processed: true}}
  end
end

# Demo action for the :invalid branch: ignores its input and returns a fixed
# "quarantined" result.
defmodule Demo.HandleInvalidAction do
  use Jido.Action,
    name: "handle_invalid",
    description: "Handles invalid data by logging and quarantining",
    schema: []

  def run(_input, _context) do
    outcome = %{quarantined: true, action: "sent to review queue"}
    {:ok, outcome}
  end
end

# Demo action for the :retry branch: ignores its input and returns a fixed
# "retried" result.
defmodule Demo.RetryAction do
  use Jido.Action,
    name: "retry_step",
    description: "Handles data that needs retry",
    schema: []

  def run(_input, _context) do
    outcome = %{retried: true, action: "scheduled for re-processing"}
    {:ok, outcome}
  end
end

# Suppress doctests from Jido.Agent
defmodule Demo.Helpers do
  @moduledoc """
  Notebook helper that hides a few generated functions from documentation.
  """

  # Zero-arity functions that get re-declared with `@doc false`.
  @hidden_functions [:plugins, :capabilities, :signal_types]

  @doc """
  Injects a `@doc false` override for `plugins/0`, `capabilities/0` and
  `signal_types/0` into the calling module.

  Each override simply calls `super()`, so the calling module must already
  define these functions as overridable before invoking the macro.
  """
  defmacro suppress_agent_doctests do
    overrides =
      for fun_name <- @hidden_functions do
        quote do
          @doc false
          def unquote(fun_name)(), do: super()
        end
      end

    # Return the generated definitions as one block, the same shape a single
    # multi-expression `quote` would produce.
    {:__block__, [], overrides}
  end
end

# Cell marker: printed once the modules above have been defined.
IO.puts("Actions defined.")

Define and Run the Branching Workflow

# Workflow for the branching demo: starts in :validate and uses the outcome of
# each node to pick the next state from the transition table.
defmodule Demo.QualityGateWorkflow do
  @moduledoc false
  use Jido.Composer.Workflow,
    name: "quality_gate",
    description: "Validation with outcome-driven branching",
    # State name => action module run in that state.
    nodes: %{
      validate: Demo.ValidateAction,
      process: Demo.ProcessAction,
      handle_invalid: Demo.HandleInvalidAction,
      retry_step: Demo.RetryAction
    },
    # {state, outcome} => next state.
    transitions: %{
      {:validate, :ok} => :process,
      {:validate, :invalid} => :handle_invalid,
      {:validate, :retry} => :retry_step,
      {:process, :ok} => :done,
      {:handle_invalid, :ok} => :done,
      {:retry_step, :ok} => :done,
      # NOTE(review): `:_` presumably matches any state — confirm against the
      # Jido.Composer.Workflow documentation.
      {:_, :error} => :failed
    },
    initial: :validate,
    terminal_states: [:done, :failed],
    success_states: [:done]

  # Re-declares plugins/0, capabilities/0 and signal_types/0 with `@doc false`.
  require Demo.Helpers
  Demo.Helpers.suppress_agent_doctests()
end

# Run all three branches.
#
# Each input goes through a freshly created workflow agent. The path taken is
# inferred from which downstream state (:process, :handle_invalid or
# :retry_step) has an entry in the returned context.
for input <- ["valid", "invalid", "retry"] do
  agent = Demo.QualityGateWorkflow.new()
  {:ok, ctx} = Demo.QualityGateWorkflow.run_sync(agent, %{data: input})

  IO.puts("Input: #{inspect(input)}")
  IO.puts("  Validate result: #{inspect(ctx[:validate])}")

  cond do
    ctx[:process] ->
      IO.puts("  Path taken: validate -> process -> done")

    ctx[:handle_invalid] ->
      IO.puts("  Path taken: validate -> handle_invalid -> done")

    ctx[:retry_step] ->
      IO.puts("  Path taken: validate -> retry_step -> done")

    # `cond` raises CondClauseError when no clause is truthy, so keep a
    # default for contexts that contain none of the three branch results.
    true ->
      IO.puts("  Path taken: unknown (no branch result found in context)")
  end

  IO.puts("")
end

Part 2: Parallel Execution with FanOutNode

Sometimes you need to run independent tasks concurrently. FanOutNode executes multiple branches in parallel and merges their results under a single state.

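Each branch is an ActionNode (or an AgentNode, or a function). Results are scoped under ctx[:fan_out_state][:branch_name], where :fan_out_state is the workflow state that holds the FanOutNode — for example, ctx[:parallel][:math] in the workflow below.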

stateDiagram-v2
    [*] --> parallel
    parallel --> finalize : ok
    finalize --> done : ok
    parallel --> failed : error

Define Branch Actions

# Demo branch action: returns the sum of `value` and `amount` under `:result`.
defmodule Demo.AddAction do
  use Jido.Action,
    name: "add",
    description: "Adds two numbers",
    schema: [
      value: [type: :float, required: true, doc: "The current value"],
      amount: [type: :float, required: true, doc: "The amount to add"]
    ]

  def run(%{value: current, amount: delta}, _context) do
    sum = current + delta
    {:ok, %{result: sum}}
  end
end

# Demo branch action: returns the given `message` unchanged under `:echoed`.
defmodule Demo.EchoAction do
  use Jido.Action,
    name: "echo",
    description: "Echoes a message",
    schema: [
      message: [type: :string, required: true, doc: "Message to echo"]
    ]

  def run(%{message: text}, _context) do
    {:ok, %{echoed: text}}
  end
end

# Demo action that ignores its input and succeeds with an empty result map.
defmodule Demo.NoopAction do
  use Jido.Action,
    name: "noop",
    description: "Does nothing",
    schema: []

  def run(_input, _context) do
    {:ok, %{}}
  end
end

# Cell marker: printed once the branch action modules above have been defined.
IO.puts("Branch actions defined.")

Build and Run the FanOut Workflow

alias Jido.Composer.Node.{ActionNode, FanOutNode}

# Workflow for the parallel demo: a single FanOut state (:parallel) followed by
# a no-op :finalize state.
defmodule Demo.FanOutWorkflow do
  @moduledoc false
  alias Jido.Composer.Node.{ActionNode, FanOutNode}

  # These matches sit directly in the module body, ahead of `use`, so that
  # `fan_out` is bound before the `nodes:` option below refers to it.
  # Keep this ordering.
  {:ok, add_node} = ActionNode.new(Demo.AddAction)
  {:ok, echo_node} = ActionNode.new(Demo.EchoAction)

  # Branch names (:math, :echo) are the keys the run script reads the
  # per-branch results under.
  {:ok, fan_out} =
    FanOutNode.new(
      name: "parallel_compute",
      branches: [math: add_node, echo: echo_node]
    )

  use Jido.Composer.Workflow,
    name: "fan_out_demo",
    description: "Parallel execution with FanOutNode",
    # A node may be a prebuilt node struct (fan_out) or an action module.
    nodes: %{
      parallel: fan_out,
      finalize: Demo.NoopAction
    },
    # {state, outcome} => next state.
    transitions: %{
      {:parallel, :ok} => :finalize,
      {:finalize, :ok} => :done,
      # NOTE(review): `:_` presumably matches any state — confirm against the
      # Jido.Composer.Workflow documentation.
      {:_, :error} => :failed
    },
    initial: :parallel,
    terminal_states: [:done, :failed],
    success_states: [:done]

  # Re-declares plugins/0, capabilities/0 and signal_types/0 with `@doc false`.
  require Demo.Helpers
  Demo.Helpers.suppress_agent_doctests()
end

# Run the fan-out workflow once. The single params map carries the inputs for
# both branches: `value`/`amount` for the add action, `message` for the echo.
agent = Demo.FanOutWorkflow.new()

{:ok, ctx} =
  Demo.FanOutWorkflow.run_sync(agent, %{
    value: 10.0,
    amount: 5.0,
    message: "hello"
  })

# Per-branch results are read from the :parallel state, keyed by branch name.
IO.puts("=== Parallel Execution Results ===\n")
IO.puts("Math branch (10.0 + 5.0): #{inspect(get_in(ctx, [:parallel, :math]))}")
IO.puts("Echo branch:              #{inspect(get_in(ctx, [:parallel, :echo]))}")
IO.puts("\nBoth branches ran concurrently. Results merged under ctx[:parallel].")

Error Handling: Fail-Fast vs Collect-Partial

FanOutNode supports two error policies:

  • :fail_fast — If any branch fails, the entire FanOut fails immediately
  • :collect_partial — If a branch fails, the remaining branches keep running and their results are still collected

alias Jido.Composer.Node.{ActionNode, FanOutNode}

# Demo action that always returns an error. The error reason is the optional
# `:reason` param, or "intentional failure" when that key is absent.
defmodule Demo.FailAction do
  use Jido.Action,
    name: "fail",
    description: "Always fails",
    schema: [reason: [type: :string, required: false]]

  def run(params, _context) do
    reason = Map.get(params, :reason, "intentional failure")
    {:error, reason}
  end
end

# Build a FanOut with one good and one bad branch, once per error policy, so
# the two policies described above can be compared on identical input.
{:ok, good_node} = ActionNode.new(Demo.AddAction)
{:ok, bad_node} = ActionNode.new(Demo.FailAction)

# Covers the schema keys of both actions: value/amount for Demo.AddAction,
# reason for Demo.FailAction.
fan_out_params = %{value: 1.0, amount: 2.0, reason: "boom"}

# Policy 1: :fail_fast
{:ok, fail_fast_fan} =
  FanOutNode.new(
    name: "fail_fast_demo",
    branches: [good: good_node, bad: bad_node],
    on_error: :fail_fast
  )

result = FanOutNode.run(fail_fast_fan, fan_out_params, [])
IO.puts("=== Fail-Fast Result ===")
IO.puts("One branch failed -> entire FanOut failed: #{inspect(result)}")

# Policy 2: :collect_partial — same branches and input, different policy.
# The result is only inspected, not matched, so its exact shape is simply
# shown to the reader.
{:ok, collect_partial_fan} =
  FanOutNode.new(
    name: "collect_partial_demo",
    branches: [good: good_node, bad: bad_node],
    on_error: :collect_partial
  )

partial_result = FanOutNode.run(collect_partial_fan, fan_out_params, [])
IO.puts("\n=== Collect-Partial Result ===")
IO.puts("One branch failed -> other branches still ran: #{inspect(partial_result)}")

Next Steps

You now know how to:

  • Branch workflows based on custom action outcomes
  • Run actions in parallel with FanOutNode
  • Choose between fail-fast and collect-partial error policies

Next guide: Approval Workflows — suspend for human input, approve/reject, and checkpoint/restore across process restarts.