Workflows are deterministic FSM pipelines where each state binds to a node (action, agent, fan-out, or human gate) and transitions are fully determined by outcomes.

Upgrading from 0.2 to 0.3

run_sync now returns the original error reason instead of {:error, :workflow_failed}. If you pattern-match on :workflow_failed, update to match on {:error, reason}:

# Before (0.2)
{:error, :workflow_failed} = MyWorkflow.run_sync(agent, params)

# After (0.3)
{:error, reason} = MyWorkflow.run_sync(agent, params)

reason is typically a Jido.Action.Error struct, a child agent error, or a transition error. See Error Handling for details.

FSM Lifecycle

stateDiagram-v2
    [*] --> idle : new()
    idle --> running : run() / run_sync()
    running --> running : node completes, transition to next state
    running --> suspended : HumanNode / suspension
    suspended --> running : resume
    running --> success : reached terminal state (done)
    running --> failure : reached terminal state (failed)
    success --> [*]
    failure --> [*]

DSL Options

| Option | Type | Required | Default | Description |
|---|---|---|---|---|
| name | string | yes | | Unique workflow identifier |
| description | string | no | "Workflow: #{name}" | Documentation text |
| schema | keyword | no | [] | Input validation schema (NimbleOptions) |
| nodes | map | yes | | Map of state_atom => node bindings |
| transitions | map | yes | | Map of {state, outcome} => next_state |
| initial | atom | yes | | Starting state |
| terminal_states | [atom] | no | [:done, :failed] | States that end the workflow (must pair with success_states when provided) |
| success_states | [atom] | no | [:done] | Subset of terminal_states indicating success (must pair with terminal_states) |
| ambient | [atom] | no | [] | Context keys made read-only across all nodes |
| fork_fns | map | no | %{} | %{name => {module, function, args}} for context transformation at child boundaries |

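Putting these options together, a minimal definition might look like the following sketch (ExtractAction and TransformAction are placeholder action modules, not part of the library):

defmodule EtlWorkflow do
  use Jido.Composer.Workflow,
    name: "etl",
    schema: [source: [type: :string, required: true]],
    nodes: %{
      extract: ExtractAction,
      transform: TransformAction
    },
    transitions: %{
      {:extract, :ok}   => :transform,
      {:transform, :ok} => :done,
      {:_, :error}      => :failed
    },
    initial: :extract
end

With no terminal_states or success_states given, this workflow uses the defaults: it ends at :done or :failed and succeeds only at :done.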
Node Types

Values in the nodes map can take several forms:

Action Modules

Bare action modules are wrapped as ActionNode automatically:

nodes: %{
  extract: ExtractAction,
  transform: TransformAction
}

Agent Modules

Agent modules are detected and wrapped as AgentNode:

nodes: %{
  analyze: AnalyzerAgent,
  process: {ProcessorAgent, [mode: :sync]}  # with options
}

FanOutNode

Parallel execution of multiple branches:

{:ok, fan_out} = Jido.Composer.Node.FanOutNode.new(
  name: "parallel_review",
  branches: [
    review_a: action_node_a,
    review_b: action_node_b
  ],
  merge: :deep_merge,        # or custom fn
  on_error: :fail_fast,      # or :collect_partial
  max_concurrency: 4,
  timeout: 30_000
)

nodes: %{
  prepare: PrepareAction,
  review: fan_out,
  finalize: FinalizeAction
}

FanOutNode options:

| Option | Type | Default | Description |
|---|---|---|---|
| name | string | required | Branch group identifier |
| branches | keyword | required | [{name, node_or_function}, ...] |
| merge | :deep_merge \| function | :deep_merge | How to merge branch results |
| on_error | :fail_fast \| :collect_partial | :fail_fast | Error handling policy |
| max_concurrency | integer | unlimited | Concurrent branch limit |
| timeout | ms \| :infinity | 30_000 | Per-branch timeout |

HumanNode

Pauses the workflow for human input:

nodes: %{
  process: ProcessAction,
  approval: %Jido.Composer.Node.HumanNode{
    name: "deploy_approval",
    description: "Approve deployment to production",
    prompt: "Deploy version 2.1 to production?",
    allowed_responses: [:approved, :rejected],
    timeout: 300_000,
    timeout_outcome: :timeout
  },
  deploy: DeployAction
}

HumanNode fields:

| Field | Type | Default | Description |
|---|---|---|---|
| name | string | required | Node identifier |
| description | string | required | What this approval is for |
| prompt | string \| function | required | Question for the human. Can be fn context -> string end for dynamic prompts |
| allowed_responses | [atom] | [:approved, :rejected] | Valid response options |
| response_schema | keyword | [] | Schema for structured response data |
| context_keys | [atom] \| nil | nil (all) | Which context keys to show the human |
| timeout | ms \| :infinity | :infinity | Decision deadline |
| timeout_outcome | atom | :timeout | Outcome when timeout expires |

HumanNode always returns {:ok, context, :suspend}. The strategy recognizes :suspend as a reserved outcome and emits a Suspend directive with an embedded ApprovalRequest.
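
Because prompt accepts a function, the approval text can be built from the accumulated context at suspension time. A sketch, assuming a hypothetical upstream :build node that produced %{version: ...}:

approval: %Jido.Composer.Node.HumanNode{
  name: "deploy_approval",
  description: "Approve deployment to production",
  # Hypothetical context shape: an upstream :build node stored %{version: ...}
  prompt: fn context -> "Deploy version #{context[:build][:version]} to production?" end,
  allowed_responses: [:approved, :rejected]
}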

Transitions

Transitions map {state, outcome} pairs to the next state:

transitions: %{
  {:extract, :ok}      => :transform,   # success path
  {:extract, :error}   => :failed,      # error path
  {:check, :ok}        => :process,     # validation passed
  {:check, :invalid}   => :quarantine,  # custom outcome
  {:check, :retry}     => :retry_step,  # custom outcome
  {:_, :error}         => :failed       # wildcard: any state on error
}

Custom Outcomes

Actions can return custom outcomes to drive branching:

defmodule ValidateAction do
  use Jido.Action, name: "validate", schema: [data: [type: :string, required: true]]

  @impl true
  def run(%{data: "valid"}, _ctx), do: {:ok, %{validated: true}}
  def run(%{data: "invalid"}, _ctx), do: {:ok, %{validated: false}, :invalid}
  def run(%{data: "retry"}, _ctx), do: {:ok, %{validated: false}, :retry}
end

The three-element {:ok, result, outcome} tuple triggers the corresponding transition instead of the default :ok.

Wildcard Transitions

{:_, outcome} matches any state for that outcome. Useful for catch-all error handling:

transitions: %{
  {:extract, :ok}   => :transform,
  {:transform, :ok} => :load,
  {:load, :ok}      => :done,
  {:_, :error}      => :failed  # any state on error goes to failed
}
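
Conceptually, resolution is an exact-key lookup with a wildcard fallback. This self-contained sketch is not the library's internal code, but the precedence it assumes (exact {state, outcome} before {:_, outcome}) follows from the examples above:

```elixir
defmodule TransitionSketch do
  # Try the exact {state, outcome} pair first; fall back to the {:_, outcome} wildcard.
  # Returns nil when neither matches (a transition error in the real strategy).
  def next_state(transitions, state, outcome) do
    Map.get(transitions, {state, outcome}, Map.get(transitions, {:_, outcome}))
  end
end

transitions = %{
  {:extract, :ok} => :transform,
  {:_, :error}    => :failed
}

TransitionSketch.next_state(transitions, :extract, :ok)
# => :transform
TransitionSketch.next_state(transitions, :transform, :error)
# => :failed (via the wildcard)
```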

Running Workflows

Async (run/2)

Returns the agent and a list of directives for the runtime to execute:

agent = MyWorkflow.new()
{agent, directives} = MyWorkflow.run(agent, %{input: "data"})

Blocking (run_sync/2)

Executes all directives internally and returns the final context:

agent = MyWorkflow.new()
{:ok, result} = MyWorkflow.run_sync(agent, %{input: "data"})

If the workflow suspends (e.g., at a HumanNode), run_sync returns {:error, {:suspended, suspension}}.

Error Handling

When a node fails, the original error reason is preserved through the workflow pipeline and returned to the caller. The {:error, reason} from run_sync contains the actual error — not a generic atom:

case MyWorkflow.run_sync(agent, %{input: "data"}) do
  {:ok, result} ->
    result

  {:error, %Jido.Action.Error.ExecutionFailureError{message: msg}} ->
    # Action execution failed — original error preserved
    Logger.error("Action failed: #{msg}")

  {:error, {:suspended, suspension}} ->
    # Workflow suspended for human input
    handle_suspension(suspension)

  {:error, reason} ->
    # Other errors (transition failures, etc.)
    Logger.error("Workflow failed: #{inspect(reason)}")
end

Error reasons flow from the failing node through the strategy to the caller:

  1. Action errors — When Jido.Exec.run returns {:error, reason}, the reason (typically a Jido.Action.Error struct) is captured
  2. Child agent errors — When a nested agent returns {:error, reason}, the inner reason propagates to the parent
  3. Transition errors — When the FSM has no matching transition, the transition error is captured
  4. FanOut errors — In :fail_fast mode, the first branch error is captured

In practice, every failure path captures the original error. The only scenario where run_sync returns the generic {:error, :workflow_failed} is if the workflow reaches a :failed terminal state through a valid transition without any node having errored — an edge case that typically indicates a workflow design issue rather than a runtime failure.

Context Accumulation

Each node's result is deep-merged into the context under its state name:

# After extract runs: context[:extract] => %{records: [...]}
# After transform runs: context[:transform] => %{records: [...]}
# Initial params preserved: context[:source] => "db"

This scoping prevents key collisions between nodes. Downstream nodes can read upstream results via their state names.

Deep merge semantics: Maps are merged recursively — nested keys are combined rather than overwritten. If two maps share the same nested path, the later value wins at the leaf level. Because each node's output is scoped under its state name, collisions between different nodes are impossible.
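
The recursive merge can be illustrated with a small self-contained sketch (not Jido's internal implementation):

```elixir
defmodule DeepMergeSketch do
  # Merge two maps recursively; at a conflicting leaf, the right-hand value wins.
  def deep_merge(left, right) when is_map(left) and is_map(right) do
    Map.merge(left, right, fn _key, l, r -> deep_merge(l, r) end)
  end

  # Non-map values: the later (right-hand) value replaces the earlier one.
  def deep_merge(_left, right), do: right
end

DeepMergeSketch.deep_merge(
  %{extract: %{records: [1], meta: %{rows: 1}}},
  %{extract: %{meta: %{rows: 1, cached: true}}}
)
# => %{extract: %{records: [1], meta: %{rows: 1, cached: true}}}
```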

Workflows sit at the fully deterministic end of the control spectrum — every transition is explicitly defined at compile time, with no runtime decision-making. For adaptive behavior, see Orchestrators. For mixing both, see Composition & Nesting.

Ambient Context

Keys listed in :ambient are read-only and visible to all nodes via context[Jido.Composer.Context.ambient_key()]:

use Jido.Composer.Workflow,
  ambient: [:api_key, :config],
  # ...

# All nodes receive ambient data under a tuple key:
# params[Jido.Composer.Context.ambient_key()][:api_key]

Fork Functions

Transform the ambient context when crossing agent boundaries (for nesting):

use Jido.Composer.Workflow,
  fork_fns: %{
    depth: {MyModule, :increment_depth, []},
    trace: {MyModule, :append_trace, [:workflow_name]}
  },
  # ...
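
A sketch of what the referenced functions might look like, assuming a fork function receives the ambient map as its first argument with any configured args appended, and returns the transformed map. Both implementations here are hypothetical:

```elixir
defmodule MyModule do
  # Hypothetical: bump a nesting-depth counter each time a child boundary is crossed
  def increment_depth(ambient), do: Map.update(ambient, :depth, 1, &(&1 + 1))

  # Hypothetical: record which workflow the execution passed through
  def append_trace(ambient, workflow_name) do
    Map.update(ambient, :trace, [workflow_name], &(&1 ++ [workflow_name]))
  end
end
```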

Custom Terminal and Success States

When neither terminal_states nor success_states is provided, the conventional defaults apply: terminal_states: [:done, :failed] with success_states: [:done].

To customize, you must provide both options — providing one without the other is a compile error:

defmodule ReviewPipeline do
  use Jido.Composer.Workflow,
    name: "review_pipeline",
    nodes: %{
      check: CheckAction,
      review: ReviewAction
    },
    transitions: %{
      {:check, :ok}        => :review,
      {:review, :ok}       => :approved,
      {:review, :rejected} => :rejected,
      {:_, :error}         => :errored
    },
    initial: :check,
    terminal_states: [:approved, :rejected, :errored],
    success_states: [:approved]
end

The success_states must be a subset of terminal_states. The strategy uses this to determine whether the workflow completed successfully or with a failure.

Compile-Time Validation

The workflow DSL validates at compile time:

  • Errors (halt compilation):

    • Transition targets must be defined nodes or terminal states
    • Initial state must exist in nodes
  • Warnings:

    • Unreachable states (not reachable from initial via transitions)
    • Dead-end states (non-terminal states with no outgoing transitions)