```elixir
Mix.install(
  [
    {:jido_composer, ">= 0.0.0"},
    {:kino, "~> 0.14"}
  ],
  config: [
    jido_action: [default_timeout: :timer.minutes(5)]
  ]
)
```

## Introduction
This guide walks you through building your first Workflow with Jido Composer — a deterministic, FSM-based pipeline that processes data through a series of steps.
We'll build a classic Extract-Transform-Load (ETL) pipeline:
- Extract — Pull records from a data source
- Transform — Modify each record (uppercase the source field)
- Load — Store the transformed records
Each step is an Action (a pure function `params -> {:ok, result}`), and the Workflow wires them together as an FSM. The output of each step is scoped under its state name in the accumulated context, so downstream steps can read upstream results.
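As a plain-Elixir illustration of that contract (a hypothetical sketch that does not use Jido itself), an Action is just a function from a params map to a tagged result:

```elixir
# Hypothetical sketch of the Action contract: params in, {:ok, result} out.
upcase_source = fn %{source: source} ->
  {:ok, %{source: String.upcase(source)}}
end

{:ok, result} = upcase_source.(%{source: "customer_db"})
result.source
# → "CUSTOMER_DB"
```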
```mermaid
stateDiagram-v2
    [*] --> extract
    extract --> transform : ok
    transform --> load : ok
    load --> done : ok
    extract --> failed : error
    transform --> failed : error
    load --> failed : error
```

## Step 1: Define the Actions
Actions are the building blocks. Each one validates its input via a schema, does some work, and returns `{:ok, result}`.
```elixir
defmodule Demo.ExtractAction do
  use Jido.Action,
    name: "extract",
    description: "Extracts records from a data source",
    schema: [
      source: [type: :string, required: true, doc: "Data source identifier"]
    ]

  def run(%{source: source}, _context) do
    # In a real system, this would query a database or API
    records = [
      %{id: 1, source: source, name: "Alice", role: "engineer"},
      %{id: 2, source: source, name: "Bob", role: "designer"},
      %{id: 3, source: source, name: "Carol", role: "manager"}
    ]

    {:ok, %{records: records, count: length(records)}}
  end
end
```
```elixir
defmodule Demo.TransformAction do
  use Jido.Action,
    name: "transform",
    description: "Uppercases the source field in extracted records",
    schema: [
      extract: [type: :map, required: false, doc: "Results from extract step"]
    ]

  def run(params, _context) do
    records = get_in(params, [:extract, :records]) || []

    transformed =
      Enum.map(records, fn rec ->
        Map.update(rec, :source, "", &String.upcase/1)
      end)

    {:ok, %{records: transformed, count: length(transformed)}}
  end
end
```
```elixir
defmodule Demo.LoadAction do
  use Jido.Action,
    name: "load",
    description: "Loads transformed records into storage",
    schema: [
      transform: [type: :map, required: false, doc: "Results from transform step"]
    ]

  def run(params, _context) do
    records = get_in(params, [:transform, :records]) || []
    {:ok, %{loaded: length(records), status: :complete}}
  end
end
```
```elixir
# Jido.Agent generates functions with iex> doctests that Livebook tries to run.
# This macro suppresses them by overriding with @doc false.
defmodule Demo.Helpers do
  defmacro suppress_agent_doctests do
    quote do
      @doc false
      def plugins, do: super()

      @doc false
      def capabilities, do: super()

      @doc false
      def signal_types, do: super()
    end
  end
end

IO.puts("Actions defined: ExtractAction, TransformAction, LoadAction")
```

## Step 2: Define the Workflow
A Workflow is an Elixir module that uses the `Jido.Composer.Workflow` DSL. You declare:
- `nodes` — A map of `state_name => Action module`
- `transitions` — An FSM transition table: `{state, outcome} => next_state`
- `initial` — The starting state
```elixir
defmodule Demo.ETLWorkflow do
  @moduledoc false
  use Jido.Composer.Workflow,
    name: "etl_pipeline",
    description: "Extract, transform, load pipeline",
    nodes: %{
      extract: Demo.ExtractAction,
      transform: Demo.TransformAction,
      load: Demo.LoadAction
    },
    transitions: %{
      {:extract, :ok} => :transform,
      {:transform, :ok} => :load,
      {:load, :ok} => :done,
      {:_, :error} => :failed
    },
    initial: :extract,
    terminal_states: [:done, :failed],
    success_states: [:done]

  require Demo.Helpers
  Demo.Helpers.suppress_agent_doctests()
end

IO.puts("ETLWorkflow defined.")
```

## Step 3: Run It
`run_sync/2` creates a fresh agent, feeds the initial params, and runs the FSM to completion. It returns the accumulated context — a map where each step's result is scoped under its state name.
```elixir
agent = Demo.ETLWorkflow.new()
{:ok, ctx} = Demo.ETLWorkflow.run_sync(agent, %{source: "customer_db"})

IO.puts("=== ETL Pipeline Results ===\n")
IO.puts("Extract (ctx[:extract]):")
IO.inspect(ctx[:extract], pretty: true)
IO.puts("\nTransform (ctx[:transform]):")
IO.inspect(ctx[:transform], pretty: true)
IO.puts("\nLoad (ctx[:load]):")
IO.inspect(ctx[:load], pretty: true)
```

## How Context Flows
The key insight is scoped context accumulation:
- `extract` runs with `%{source: "customer_db"}` → output stored at `ctx[:extract]`
- `transform` receives the full context, reads `ctx[:extract][:records]` → output stored at `ctx[:transform]`
- `load` receives the full context, reads `ctx[:transform][:records]` → output stored at `ctx[:load]`
Each step sees everything that came before, but writes only to its own scope. This prevents naming collisions and makes the data flow explicit.
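To make the accumulation pattern concrete, here is a minimal plain-Elixir sketch of the same idea. This is an illustration only, not Jido Composer's actual implementation; the step functions and record values are hypothetical:

```elixir
# Hypothetical sketch: scoped context accumulation over a linear pipeline.
# Each step reads the full context but writes only under its own key.
steps = [
  extract: fn _ctx -> %{records: [1, 2, 3], count: 3} end,
  transform: fn ctx -> %{records: Enum.map(ctx[:extract][:records], &(&1 * 10))} end,
  load: fn ctx -> %{loaded: length(ctx[:transform][:records])} end
]

ctx =
  Enum.reduce(steps, %{}, fn {name, step}, acc ->
    Map.put(acc, name, step.(acc))
  end)

ctx[:load][:loaded]
# → 3
```

Because each result lands under its step's name, `transform` cannot accidentally clobber `extract`'s output, and the read path (`ctx[:extract][:records]`) documents the dependency explicitly.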
```elixir
# You can also use run/2 to get the raw agent + directives for manual stepping.
# This is useful when you need more control (e.g., for HITL or debugging).
agent = Demo.ETLWorkflow.new()
{agent, directives} = Demo.ETLWorkflow.run(agent, %{source: "analytics_db"})

IO.puts("Directives emitted: #{length(directives)}")
IO.inspect(directives, label: "Directives", pretty: true)
```

## Next Steps
You've built a linear, deterministic workflow. In the next guide, we'll explore:
- Custom outcomes — Actions that return `:invalid` or `:retry` to branch the FSM
- Parallel execution — `FanOutNode` to run multiple branches concurrently
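As a preview of custom outcomes, the transition table generalizes naturally: any `{state, outcome}` pair can map to a next state. The sketch below is a hypothetical plain-Elixir lookup (the `validate`/`quarantine` states are invented for illustration and are not part of this guide's workflow):

```elixir
# Hypothetical sketch: branching on a custom :invalid outcome.
transitions = %{
  {:validate, :ok} => :process,
  {:validate, :invalid} => :quarantine,
  {:process, :retry} => :process,
  {:process, :ok} => :done,
  {:_, :error} => :failed
}

Map.get(transitions, {:validate, :invalid})
# → :quarantine
```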