Mix.install(
[
{:jido_composer, ">= 0.0.0"},
{:kino, "~> 0.14"}
],
config: [
jido_action: [default_timeout: :timer.minutes(5)]
]
)

## Introduction
This guide covers two powerful Workflow patterns:
- **Branching** — Actions can return custom outcomes beyond `:ok`/`:error`, driving the FSM down different paths
- **Parallel Execution** — `FanOutNode` runs multiple branches concurrently and merges results
## Part 1: Branching with Custom Outcomes
In a real system, validation isn't binary. Data might be valid, invalid, or need retry. Actions express this by returning a custom outcome as the third element:
- `{:ok, result}` → outcome is `:ok`
- `{:ok, result, :invalid}` → outcome is `:invalid`
- `{:ok, result, :retry}` → outcome is `:retry`
- `{:error, reason}` → outcome is `:error`
The FSM transition table maps each {state, outcome} to the next state.
```mermaid
stateDiagram-v2
  [*] --> validate
  validate --> process : ok
  validate --> handle_invalid : invalid
  validate --> retry_step : retry
  process --> done : ok
  handle_invalid --> done : ok
  retry_step --> done : ok
  validate --> failed : error
```

### Define the Actions
# Validation step for the branching demo: the value of `:data` decides which
# outcome the action reports.
defmodule Demo.ValidateAction do
  use Jido.Action,
    name: "validate",
    description: "Validates data, returning different outcomes based on content",
    schema: [
      data: [type: :string, required: true, doc: "Data to validate"]
    ]

  # Returns `{:ok, result}` for "valid", `{:ok, result, outcome}` with a custom
  # outcome atom for "invalid" and "retry", and `{:error, reason}` for any
  # other `:data` value.
  def run(%{data: data}, _ctx) do
    case data do
      "valid" -> {:ok, %{validated: true, quality: :good}}
      "invalid" -> {:ok, %{validated: false, quality: :bad}, :invalid}
      "retry" -> {:ok, %{validated: false, quality: :unstable}, :retry}
      _other -> {:error, "unrecognized data format"}
    end
  end
end
# Processing step: ignores its params and reports `processed: true`.
defmodule Demo.ProcessAction do
  use Jido.Action,
    name: "process",
    description: "Processes validated data",
    schema: []

  def run(_params, _ctx) do
    {:ok, %{processed: true}}
  end
end
# Step for the `:invalid` branch: ignores its params and returns a fixed
# "quarantined" result.
defmodule Demo.HandleInvalidAction do
  use Jido.Action,
    name: "handle_invalid",
    description: "Handles invalid data by logging and quarantining",
    schema: []

  def run(_params, _ctx) do
    outcome = %{quarantined: true, action: "sent to review queue"}
    {:ok, outcome}
  end
end
# Step for the `:retry` branch: ignores its params and returns a fixed
# "retried" result.
defmodule Demo.RetryAction do
  use Jido.Action,
    name: "retry_step",
    description: "Handles data that needs retry",
    schema: []

  def run(_params, _ctx) do
    outcome = %{retried: true, action: "scheduled for re-processing"}
    {:ok, outcome}
  end
end
defmodule Demo.Helpers do
  @moduledoc """
  Helper macro for the demo workflow modules.

  It re-declares a few functions with `@doc false`; the stated purpose is to
  suppress doctests coming from Jido.Agent.
  """

  @doc """
  Expands, in the calling module, to `@doc false` overrides of `plugins/0`,
  `capabilities/0` and `signal_types/0`, each of which calls `super()`.
  """
  defmacro suppress_agent_doctests do
    overrides =
      for fun_name <- [:plugins, :capabilities, :signal_types] do
        quote do
          @doc false
          def unquote(fun_name)(), do: super()
        end
      end

    # Return the generated definitions as one block so they expand in order.
    {:__block__, [], overrides}
  end
end
IO.puts("Actions defined.")

### Define and Run the Branching Workflow
# Workflow whose path after `:validate` depends on the outcome that
# Demo.ValidateAction reports.
defmodule Demo.QualityGateWorkflow do
  @moduledoc false

  use Jido.Composer.Workflow,
    name: "quality_gate",
    description: "Validation with outcome-driven branching",
    # One action module per workflow state.
    nodes: %{
      validate: Demo.ValidateAction,
      process: Demo.ProcessAction,
      handle_invalid: Demo.HandleInvalidAction,
      retry_step: Demo.RetryAction
    },
    # `{state, outcome} => next_state`
    transitions: %{
      # Three-way branch on the validation outcome.
      {:validate, :ok} => :process,
      {:validate, :invalid} => :handle_invalid,
      {:validate, :retry} => :retry_step,
      # Every branch finishes in :done.
      {:process, :ok} => :done,
      {:handle_invalid, :ok} => :done,
      {:retry_step, :ok} => :done,
      # `:_` in the state position: an :error outcome leads to :failed.
      {:_, :error} => :failed
    },
    initial: :validate,
    terminal_states: [:done, :failed],
    success_states: [:done]

  require Demo.Helpers
  Demo.Helpers.suppress_agent_doctests()
end
# Run all three branches: one input per non-error outcome of Demo.ValidateAction.
for input <- ["valid", "invalid", "retry"] do
  agent = Demo.QualityGateWorkflow.new()
  {:ok, ctx} = Demo.QualityGateWorkflow.run_sync(agent, %{data: input})

  IO.puts("Input: #{inspect(input)}")
  IO.puts(" Validate result: #{inspect(ctx[:validate])}")

  # Whichever step left a result in the context tells us which path ran.
  cond do
    ctx[:process] -> IO.puts(" Path taken: validate -> process -> done")
    ctx[:handle_invalid] -> IO.puts(" Path taken: validate -> handle_invalid -> done")
    ctx[:retry_step] -> IO.puts(" Path taken: validate -> retry_step -> done")
    # `cond` raises CondClauseError when no clause is truthy, so report the
    # unexpected case instead of crashing the demo.
    true -> IO.puts(" Path taken: unknown (no branch result in context)")
  end

  IO.puts("")
end

## Part 2: Parallel Execution with FanOutNode
Sometimes you need to run independent tasks concurrently. FanOutNode executes multiple branches in parallel and merges their results under a single state.
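Each branch is an `ActionNode` (or `AgentNode`, or function). Results are scoped
under `ctx[:fan_out_state][:branch_name]`, where `:fan_out_state` is the workflow state that holds the `FanOutNode` — for example, `ctx[:parallel][:math]` in the workflow below.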
```mermaid
stateDiagram-v2
  [*] --> parallel
  parallel --> finalize : ok
  finalize --> done : ok
  parallel --> failed : error
```

### Define Branch Actions
# Branch action that sums its two inputs; the schema declares both as
# required floats.
defmodule Demo.AddAction do
  use Jido.Action,
    name: "add",
    description: "Adds two numbers",
    schema: [
      value: [type: :float, required: true, doc: "The current value"],
      amount: [type: :float, required: true, doc: "The amount to add"]
    ]

  # Returns `{:ok, %{result: value + amount}}`.
  def run(%{value: current, amount: delta}, _ctx), do: {:ok, %{result: current + delta}}
end
# Branch action that hands its `:message` back under the `:echoed` key.
defmodule Demo.EchoAction do
  use Jido.Action,
    name: "echo",
    description: "Echoes a message",
    schema: [
      message: [type: :string, required: true, doc: "Message to echo"]
    ]

  def run(%{message: text}, _ctx) do
    {:ok, %{echoed: text}}
  end
end
# Placeholder action: ignores its params and returns an empty result map.
defmodule Demo.NoopAction do
  use Jido.Action,
    name: "noop",
    description: "Does nothing",
    schema: []

  def run(_params, _ctx) do
    {:ok, %{}}
  end
end
IO.puts("Branch actions defined.")

### Build and Run the FanOut Workflow
alias Jido.Composer.Node.{ActionNode, FanOutNode}
# Workflow whose first state is a FanOutNode with two branches, followed by a
# no-op `:finalize` step.
defmodule Demo.FanOutWorkflow do
  @moduledoc false

  alias Jido.Composer.Node.{ActionNode, FanOutNode}

  # The nodes are built in the module body, ahead of `use`, so that the
  # `nodes:` option below can refer to `fan_out`.
  {:ok, add_node} = ActionNode.new(Demo.AddAction)
  {:ok, echo_node} = ActionNode.new(Demo.EchoAction)

  # The branch keys (:math, :echo) are the keys the results are read back
  # under, e.g. `ctx[:parallel][:math]`.
  {:ok, fan_out} =
    FanOutNode.new(
      name: "parallel_compute",
      branches: [math: add_node, echo: echo_node]
    )

  use Jido.Composer.Workflow,
    name: "fan_out_demo",
    description: "Parallel execution with FanOutNode",
    nodes: %{
      parallel: fan_out,
      finalize: Demo.NoopAction
    },
    # `{state, outcome} => next_state`
    transitions: %{
      {:parallel, :ok} => :finalize,
      {:finalize, :ok} => :done,
      {:_, :error} => :failed
    },
    initial: :parallel,
    terminal_states: [:done, :failed],
    success_states: [:done]

  require Demo.Helpers
  Demo.Helpers.suppress_agent_doctests()
end
# One params map carries the inputs for both branch actions: `value`/`amount`
# for Demo.AddAction and `message` for Demo.EchoAction.
agent = Demo.FanOutWorkflow.new()
params = %{value: 10.0, amount: 5.0, message: "hello"}
{:ok, ctx} = Demo.FanOutWorkflow.run_sync(agent, params)

# Branch results are read from under the :parallel state key.
branch_results = ctx[:parallel]

IO.puts("=== Parallel Execution Results ===\n")
IO.puts("Math branch (10.0 + 5.0): " <> inspect(branch_results[:math]))
IO.puts("Echo branch: " <> inspect(branch_results[:echo]))
IO.puts("\nBoth branches ran concurrently. Results merged under ctx[:parallel].")

### Error Handling: Fail-Fast vs Collect-Partial
FanOutNode supports two error policies:
- `:fail_fast` — If any branch fails, the entire FanOut fails immediately
- `:collect_partial` — Continue running other branches even if one fails
alias Jido.Composer.Node.{ActionNode, FanOutNode}
# Branch action that always returns `{:error, reason}`.
defmodule Demo.FailAction do
  use Jido.Action,
    name: "fail",
    description: "Always fails",
    schema: [reason: [type: :string, required: false]]

  # Uses the caller-supplied `:reason` when the key is present; otherwise falls
  # back to a fixed message.
  def run(params, _ctx) do
    case Map.fetch(params, :reason) do
      {:ok, reason} -> {:error, reason}
      :error -> {:error, "intentional failure"}
    end
  end
end
# Pair a branch that can succeed (Demo.AddAction) with one that always errors
# (Demo.FailAction), under the :fail_fast policy.
{:ok, good_node} = ActionNode.new(Demo.AddAction)
{:ok, bad_node} = ActionNode.new(Demo.FailAction)

fan_out_opts = [
  name: "fail_fast_demo",
  branches: [good: good_node, bad: bad_node],
  on_error: :fail_fast
]

{:ok, fail_fast_fan} = FanOutNode.new(fan_out_opts)

# Run the node directly rather than through a workflow; `reason` is the
# message Demo.FailAction returns in its error tuple.
params = %{value: 1.0, amount: 2.0, reason: "boom"}
result = FanOutNode.run(fail_fast_fan, params, [])

IO.puts("=== Fail-Fast Result ===")
IO.puts("One branch failed -> entire FanOut failed: #{inspect(result)}")

## Next Steps
You now know how to:
- Branch workflows based on custom action outcomes
- Run actions in parallel with FanOutNode
- Choose between fail-fast and collect-partial error policies
Next guide: Approval Workflows — suspend for human input, approve/reject, and checkpoint/restore across process restarts.