Nous.Workflow.Graph (nous v0.13.3)

View Source

Directed graph definition for workflow execution.

Uses dual adjacency maps (out_edges and in_edges) for efficient forward traversal and topological sort. Provides an Ecto.Multi-style builder API for constructing graphs via pipes.

Architecture

  • Node IDs are atoms in the builder API, stored as strings internally
  • Dual adjacency maps give O(1) neighbor lookups in both directions
  • Pure maps, no ETS — immutable, process-safe
  • Metadata lives directly on node/edge structs, no separate stores

Examples

graph =
  Nous.Workflow.Graph.new("my_pipeline")
  |> Nous.Workflow.Graph.add_node(:fetch, :agent_step, %{agent: fetcher, prompt: "..."})
  |> Nous.Workflow.Graph.add_node(:process, :transform, %{transform_fn: &process/1})
  |> Nous.Workflow.Graph.add_node(:store, :tool_step, %{tool: store_tool, args: %{}})
  |> Nous.Workflow.Graph.chain([:fetch, :process, :store])

Summary

Functions

Connect a list of nodes in sequence: A → B → C → D.

Connect two nodes with an edge.

Create a new empty graph.

Returns the number of nodes in the graph.

Returns all node IDs in the graph.

Returns the predecessor node IDs for a given node (via in_edges).

Remove a node and reconnect its predecessors to its successors.

Set the entry node explicitly (overrides the auto-detected first node).

Returns the successor node IDs for a given node (via out_edges).

Returns terminal nodes (nodes with no outgoing edges).

Types

node_id()

@type node_id() :: String.t()

t()

@type t() :: %Nous.Workflow.Graph{
  allows_cycles: boolean(),
  entry_node: node_id() | nil,
  id: String.t(),
  in_edges: %{required(node_id()) => [Nous.Workflow.Edge.t()]},
  metadata: map(),
  name: String.t(),
  nodes: %{required(node_id()) => Nous.Workflow.Node.t()},
  out_edges: %{required(node_id()) => [Nous.Workflow.Edge.t()]}
}

Functions

add_node(graph, node_id, type, config \\ %{}, opts \\ [])

@spec add_node(
  t(),
  atom() | String.t(),
  Nous.Workflow.Node.node_type(),
  map(),
  keyword()
) :: t()

Add a node to the graph.

The first node added becomes the entry node automatically.

Parameters

  • node_id — atom or string identifier for the node
  • type — one of the valid Nous.Workflow.Node types
  • config — type-specific configuration map
  • opts — optional node fields (:label, :error_strategy, :timeout, :metadata)

Examples

graph
|> Nous.Workflow.Graph.add_node(:fetch, :agent_step, %{agent: my_agent, prompt: "..."})
|> Nous.Workflow.Graph.add_node(:process, :transform, %{transform_fn: &clean/1}, label: "Clean data")

chain(graph, node_ids)

@spec chain(t(), [atom() | String.t()]) :: t()

Connect a list of nodes in sequence: A → B → C → D.

Examples

graph
|> Nous.Workflow.Graph.chain([:plan, :search, :synthesize, :report])

connect(graph, from, to, opts \\ [])

@spec connect(t(), atom() | String.t(), atom() | String.t(), keyword()) :: t()

Connect two nodes with an edge.

Options

  • :condition — function (state -> boolean) for conditional edges
  • :label — human-readable edge label
  • :metadata — arbitrary edge metadata

When :condition is provided, the edge type is :conditional. Otherwise, it defaults to :sequential.

Examples

graph
|> Nous.Workflow.Graph.connect(:a, :b)
|> Nous.Workflow.Graph.connect(:b, :c, condition: fn s -> s.data.ready end)

insert_after(graph, after_id, new_id, type, config \\ %{}, opts \\ [])

@spec insert_after(
  t(),
  atom() | String.t(),
  atom() | String.t(),
  Nous.Workflow.Node.node_type(),
  map(),
  keyword()
) :: t()

Insert a new node after an existing node.

Splits all outgoing edges of after_id: removes after_id → X edges and creates after_id → new_node + new_node → X edges. Used for runtime graph mutation.

new(id, opts \\ [])

@spec new(
  String.t(),
  keyword()
) :: t()

Create a new empty graph.

Options

  • :name — human-readable name (defaults to the id)
  • :allows_cycles — whether cycles are permitted (default: false)

Examples

iex> graph = Nous.Workflow.Graph.new("research")
iex> graph.id
"research"
iex> graph.nodes
%{}

node_count(graph)

@spec node_count(t()) :: non_neg_integer()

Returns the number of nodes in the graph.

node_ids(graph)

@spec node_ids(t()) :: [node_id()]

Returns all node IDs in the graph.

predecessors(graph, node_id)

@spec predecessors(t(), node_id()) :: [node_id()]

Returns the predecessor node IDs for a given node (via in_edges).

remove_node(graph, node_id)

@spec remove_node(t(), atom() | String.t()) :: t()

Remove a node and reconnect its predecessors to its successors.

set_entry(graph, node_id)

@spec set_entry(t(), atom() | String.t()) :: t()

Set the entry node explicitly (overrides the auto-detected first node).

successors(graph, node_id)

@spec successors(t(), node_id()) :: [node_id()]

Returns the successor node IDs for a given node (via out_edges).

terminal_nodes(graph)

@spec terminal_nodes(t()) :: [node_id()]

Returns terminal nodes (nodes with no outgoing edges).