Mix.install(
  [
    {:jido_composer, ">= 0.0.0"},
    {:kino, "~> 0.14"}
  ],
  config: [
    jido_action: [default_timeout: :timer.minutes(5)]
  ]
)

Introduction

This guide walks you through building your first Workflow with Jido Composer — a deterministic, FSM-based pipeline that processes data through a series of steps.

We'll build a classic Extract-Transform-Load (ETL) pipeline:

  1. Extract — Pull records from a data source
  2. Transform — Modify each record (uppercase the source field)
  3. Load — Store the transformed records

Each step is an Action (a pure function params -> {:ok, result}), and the Workflow wires them together as an FSM. The output of each step is scoped under its state name in the accumulated context, so downstream steps can read upstream results.

stateDiagram-v2
    [*] --> extract
    extract --> transform : ok
    transform --> load : ok
    load --> done : ok
    extract --> failed : error
    transform --> failed : error
    load --> failed : error

Step 1: Define the Actions

Actions are the building blocks. Each one validates its input via a schema, does some work, and returns {:ok, result}.

defmodule Demo.ExtractAction do
  use Jido.Action,
    name: "extract",
    description: "Extracts records from a data source",
    schema: [
      source: [type: :string, required: true, doc: "Data source identifier"]
    ]

  def run(%{source: source}, _context) do
    # In a real system, this would query a database or API
    records = [
      %{id: 1, source: source, name: "Alice", role: "engineer"},
      %{id: 2, source: source, name: "Bob", role: "designer"},
      %{id: 3, source: source, name: "Carol", role: "manager"}
    ]

    {:ok, %{records: records, count: length(records)}}
  end
end

defmodule Demo.TransformAction do
  use Jido.Action,
    name: "transform",
    description: "Uppercases the source field in extracted records",
    schema: [
      extract: [type: :map, required: false, doc: "Results from extract step"]
    ]

  def run(params, _context) do
    records = get_in(params, [:extract, :records]) || []

    transformed =
      Enum.map(records, fn rec ->
        Map.update(rec, :source, "", &String.upcase/1)
      end)

    {:ok, %{records: transformed, count: length(transformed)}}
  end
end

defmodule Demo.LoadAction do
  use Jido.Action,
    name: "load",
    description: "Loads transformed records into storage",
    schema: [
      transform: [type: :map, required: false, doc: "Results from transform step"]
    ]

  def run(params, _context) do
    records = get_in(params, [:transform, :records]) || []
    {:ok, %{loaded: length(records), status: :complete}}
  end
end

# Jido.Agent generates functions with iex> doctests that Livebook tries to run.
# This macro suppresses them by overriding with @doc false.
defmodule Demo.Helpers do
  defmacro suppress_agent_doctests do
    quote do
      @doc false
      def plugins, do: super()
      @doc false
      def capabilities, do: super()
      @doc false
      def signal_types, do: super()
    end
  end
end

IO.puts("Actions defined: ExtractAction, TransformAction, LoadAction")

Step 2: Define the Workflow

A Workflow is an Elixir module that uses the Jido.Composer.Workflow DSL. You declare:

  • nodes — A map of state_name => Action module
  • transitions — An FSM transition table: {state, outcome} => next_state
  • initial — The starting state
defmodule Demo.ETLWorkflow do
  @moduledoc false
  use Jido.Composer.Workflow,
    name: "etl_pipeline",
    description: "Extract, transform, load pipeline",
    nodes: %{
      extract: Demo.ExtractAction,
      transform: Demo.TransformAction,
      load: Demo.LoadAction
    },
    transitions: %{
      {:extract, :ok} => :transform,
      {:transform, :ok} => :load,
      {:load, :ok} => :done,
      {:_, :error} => :failed
    },
    initial: :extract,
    terminal_states: [:done, :failed],
    success_states: [:done]

  require Demo.Helpers
  Demo.Helpers.suppress_agent_doctests()
end

IO.puts("ETLWorkflow defined.")

Step 3: Run It

run_sync/2 creates a fresh agent, feeds the initial params, and runs the FSM to completion. It returns the accumulated context — a map where each step's result is scoped under its state name.

agent = Demo.ETLWorkflow.new()
{:ok, ctx} = Demo.ETLWorkflow.run_sync(agent, %{source: "customer_db"})

IO.puts("=== ETL Pipeline Results ===\n")

IO.puts("Extract (ctx[:extract]):")
IO.inspect(ctx[:extract], pretty: true)

IO.puts("\nTransform (ctx[:transform]):")
IO.inspect(ctx[:transform], pretty: true)

IO.puts("\nLoad (ctx[:load]):")
IO.inspect(ctx[:load], pretty: true)

How Context Flows

The key insight is scoped context accumulation:

  1. extract runs with %{source: "customer_db"} → output stored at ctx[:extract]
  2. transform receives the full context, reads ctx[:extract][:records] → output stored at ctx[:transform]
  3. load receives the full context, reads ctx[:transform][:records] → output stored at ctx[:load]

Each step sees everything that came before, but writes only to its own scope. This prevents naming collisions and makes the data flow explicit.

# You can also use run/2 to get the raw agent + directives for manual stepping.
# This is useful when you need more control (e.g., for HITL or debugging).

agent = Demo.ETLWorkflow.new()
{agent, directives} = Demo.ETLWorkflow.run(agent, %{source: "analytics_db"})

IO.puts("Directives emitted: #{length(directives)}")
IO.inspect(directives, label: "Directives", pretty: true)

Next Steps

You've built a linear, deterministic workflow. In the next guide, we'll explore:

  • Custom outcomes — Actions that return :invalid or :retry to branch the FSM
  • Parallel execution — FanOutNode to run multiple branches concurrently