Nous.Workflow.Graph (nous v0.13.3)
Directed graph definition for workflow execution.
Uses dual adjacency maps (out_edges and in_edges) for efficient
forward traversal and topological sort. Provides an Ecto.Multi-style
builder API for constructing graphs via pipes.
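To illustrate why keeping both edge directions helps with topological sorting, here is a minimal sketch of Kahn's algorithm over plain adjacency maps. The module `TopoSortSketch` and its functions are hypothetical and not part of Nous; the maps hold neighbor IDs rather than `Nous.Workflow.Edge` structs, and the library's own sort may work differently.

```elixir
defmodule TopoSortSketch do
  # Hypothetical illustration, not the library's code.
  # out_edges / in_edges: node ID => list of neighbour IDs.
  def sort(node_ids, out_edges, in_edges) do
    # in_edges gives each node's in-degree without scanning the whole graph.
    in_degree = Map.new(node_ids, fn id -> {id, length(Map.get(in_edges, id, []))} end)
    ready = Enum.filter(node_ids, fn id -> in_degree[id] == 0 end)
    walk(ready, in_degree, out_edges, [], length(node_ids))
  end

  # Nothing left to visit: succeed only if every node was emitted.
  defp walk([], _in_degree, _out_edges, acc, total) do
    if length(acc) == total, do: {:ok, Enum.reverse(acc)}, else: {:error, :cycle}
  end

  defp walk([id | rest], in_degree, out_edges, acc, total) do
    # out_edges gives the successors whose in-degree drops by one.
    {in_degree, newly_ready} =
      Enum.reduce(Map.get(out_edges, id, []), {in_degree, []}, fn succ, {deg, ready} ->
        deg = Map.update!(deg, succ, &(&1 - 1))
        if deg[succ] == 0, do: {deg, [succ | ready]}, else: {deg, ready}
      end)

    walk(rest ++ Enum.reverse(newly_ready), in_degree, out_edges, [id | acc], total)
  end
end

out_edges = %{"fetch" => ["process"], "process" => ["store"]}
in_edges = %{"process" => ["fetch"], "store" => ["process"]}
IO.inspect(TopoSortSketch.sort(["fetch", "process", "store"], out_edges, in_edges))
```

The sketch reports `{:error, :cycle}` when some nodes never reach in-degree zero, which is one way a graph with `allows_cycles: false` could be validated.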
Architecture
- Node IDs may be given as atoms or strings in the builder API and are stored as strings internally
- Dual adjacency maps give O(1) neighbor lookups in both directions
- Pure maps, no ETS — immutable, process-safe
- Metadata lives directly on node/edge structs, no separate stores
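The architecture notes above can be sketched with plain maps. This is an illustrative stand-in, assuming a simplified shape in which each adjacency list holds neighbor IDs instead of edge structs; `DualAdjacencySketch` and its functions are hypothetical names, not the Nous API.

```elixir
defmodule DualAdjacencySketch do
  # Hypothetical illustration: plain immutable maps, no ETS.
  def new, do: %{out_edges: %{}, in_edges: %{}}

  # Accept atoms or strings, store strings.
  def normalize(id) when is_atom(id), do: Atom.to_string(id)
  def normalize(id) when is_binary(id), do: id

  # Record the edge in both maps so either direction is a single map lookup.
  def connect(graph, from, to) do
    from = normalize(from)
    to = normalize(to)

    %{
      graph
      | out_edges: Map.update(graph.out_edges, from, [to], &(&1 ++ [to])),
        in_edges: Map.update(graph.in_edges, to, [from], &(&1 ++ [from]))
    }
  end

  def successors(graph, id), do: Map.get(graph.out_edges, normalize(id), [])
  def predecessors(graph, id), do: Map.get(graph.in_edges, normalize(id), [])
end

graph =
  DualAdjacencySketch.new()
  |> DualAdjacencySketch.connect(:fetch, :process)
  |> DualAdjacencySketch.connect(:process, :store)

IO.inspect(DualAdjacencySketch.successors(graph, :fetch))
IO.inspect(DualAdjacencySketch.predecessors(graph, "store"))
```

Because every update returns a new map, a graph value can be shared between processes without locking, which is the trade-off the "pure maps" note points at.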
Examples
    graph =
      Nous.Workflow.Graph.new("my_pipeline")
      |> Nous.Workflow.Graph.add_node(:fetch, :agent_step, %{agent: fetcher, prompt: "..."})
      |> Nous.Workflow.Graph.add_node(:process, :transform, %{transform_fn: &process/1})
      |> Nous.Workflow.Graph.add_node(:store, :tool_step, %{tool: store_tool, args: %{}})
      |> Nous.Workflow.Graph.chain([:fetch, :process, :store])
Types
@type node_id() :: String.t()
@type t() :: %Nous.Workflow.Graph{
  allows_cycles: boolean(),
  entry_node: node_id() | nil,
  id: String.t(),
  in_edges: %{required(node_id()) => [Nous.Workflow.Edge.t()]},
  metadata: map(),
  name: String.t(),
  nodes: %{required(node_id()) => Nous.Workflow.Node.t()},
  out_edges: %{required(node_id()) => [Nous.Workflow.Edge.t()]}
}
Functions
Add a node to the graph.
The first node added becomes the entry node automatically.
Parameters
- node_id - atom or string identifier for the node
- type - one of the valid Nous.Workflow.Node types
- config - type-specific configuration map
- opts - optional node fields (:label, :error_strategy, :timeout, :metadata)
Examples
    graph
    |> Nous.Workflow.Graph.add_node(:fetch, :agent_step, %{agent: my_agent, prompt: "..."})
    |> Nous.Workflow.Graph.add_node(:process, :transform, %{transform_fn: &clean/1}, label: "Clean data")
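The "first node becomes the entry node" rule can be pictured with a small stand-in. This is a sketch under assumptions: `EntryNodeSketch` is a hypothetical module, and the node is a bare map rather than a `Nous.Workflow.Node` struct.

```elixir
defmodule EntryNodeSketch do
  # Hypothetical illustration of entry-node auto-detection.
  def new, do: %{nodes: %{}, entry_node: nil}

  def add_node(graph, id, type, config) do
    id = to_string(id)
    node = %{id: id, type: type, config: config}

    # `||` keeps an existing entry node and only fills it in while it is still nil.
    %{graph | nodes: Map.put(graph.nodes, id, node), entry_node: graph.entry_node || id}
  end
end

graph =
  EntryNodeSketch.new()
  |> EntryNodeSketch.add_node(:fetch, :agent_step, %{})
  |> EntryNodeSketch.add_node(:process, :transform, %{})

IO.inspect(graph.entry_node)
```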
Connect a list of nodes in sequence: A → B → C → D.
Examples
    graph
    |> Nous.Workflow.Graph.chain([:plan, :search, :synthesize, :report])
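One way to read A → B → C → D is as an edge between each pair of consecutive nodes. The sketch below computes those pairs; `ChainSketch.pairs/1` is a hypothetical helper, and whether the library builds the chain this way is an assumption.

```elixir
defmodule ChainSketch do
  # Hypothetical helper: [:a, :b, :c] becomes the edge pairs {:a, :b} and {:b, :c}.
  def pairs(node_ids) do
    node_ids
    |> Enum.chunk_every(2, 1, :discard)
    |> Enum.map(fn [from, to] -> {from, to} end)
  end
end

IO.inspect(ChainSketch.pairs([:plan, :search, :synthesize, :report]))
```

Under that reading, the example above would add the same three edges as connecting :plan to :search, :search to :synthesize, and :synthesize to :report one call at a time.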
Connect two nodes with an edge.
Options
- :condition - function (state -> boolean) for conditional edges
- :label - human-readable edge label
- :metadata - arbitrary edge metadata
When :condition is provided, the edge type is :conditional.
Otherwise, it defaults to :sequential.
Examples
    graph
    |> Nous.Workflow.Graph.connect(:a, :b)
    |> Nous.Workflow.Graph.connect(:b, :c, condition: fn s -> s.data.ready end)
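The rule for choosing the edge type can be sketched as follows. `EdgeTypeSketch` is hypothetical: the map fields stand in for `Nous.Workflow.Edge`, whose actual fields are not shown here, and `follow?/2` is only a guess at how an executor might consult the condition.

```elixir
defmodule EdgeTypeSketch do
  # Hypothetical edge builder; field names are assumptions for illustration.
  def build(from, to, opts \\ []) do
    condition = Keyword.get(opts, :condition)

    %{
      from: to_string(from),
      to: to_string(to),
      # A one-argument function marks the edge as conditional.
      type: if(is_function(condition, 1), do: :conditional, else: :sequential),
      condition: condition,
      label: Keyword.get(opts, :label),
      metadata: Keyword.get(opts, :metadata, %{})
    }
  end

  # Sequential edges are always followed; conditional ones only when the predicate returns true.
  def follow?(%{type: :sequential}, _state), do: true
  def follow?(%{type: :conditional, condition: f}, state), do: f.(state) == true
end

edge = EdgeTypeSketch.build(:b, :c, condition: fn s -> s.data.ready end)
IO.inspect(edge.type)
IO.inspect(EdgeTypeSketch.follow?(edge, %{data: %{ready: false}}))
```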
@spec insert_after(
        t(),
        atom() | String.t(),
        atom() | String.t(),
        Nous.Workflow.Node.node_type(),
        map(),
        keyword()
      ) :: t()
Insert a new node after an existing node.
Rewires every outgoing edge of after_id: each after_id → X edge is removed
and replaced by a new_node → X edge, and an after_id → new_node edge is added.
Used for runtime graph mutation.
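The rewiring can be pictured on a plain successor map. This is a simplified sketch: `InsertAfterSketch` is hypothetical, edges are bare node IDs, and the matching in_edges update that a dual-map graph would also need is left out.

```elixir
defmodule InsertAfterSketch do
  # Hypothetical illustration. out_edges: node ID => list of successor IDs.
  def insert_after(out_edges, after_id, new_id) do
    old_targets = Map.get(out_edges, after_id, [])

    out_edges
    # after_id now points only at the new node.
    |> Map.put(after_id, [new_id])
    # The new node inherits every former target of after_id.
    |> Map.put(new_id, old_targets)
  end
end

edges = %{"a" => ["b", "c"], "b" => [], "c" => []}
IO.inspect(InsertAfterSketch.insert_after(edges, "a", "n"))
```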
Create a new empty graph.
Options
- :name - human-readable name (defaults to the id)
- :allows_cycles - whether cycles are permitted (default: false)
Examples
iex> graph = Nous.Workflow.Graph.new("research")
iex> graph.id
"research"
iex> graph.nodes
%{}
@spec node_count(t()) :: non_neg_integer()
Returns the number of nodes in the graph.
Returns all node IDs in the graph.
Returns the predecessor node IDs for a given node (via in_edges).
Remove a node and reconnect its predecessors to its successors.
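A sketch of what "reconnect its predecessors to its successors" could mean, assuming every predecessor gets an edge to every successor. `RemoveNodeSketch` is hypothetical and uses a flat list of `{from, to}` pairs instead of the adjacency maps; how the library treats edge conditions and metadata on the bridged edges is not covered.

```elixir
defmodule RemoveNodeSketch do
  # Hypothetical illustration. edges: list of {from, to} pairs.
  def remove_node(edges, id) do
    # Neighbours of the removed node, ignoring any self-loop.
    preds = for {from, ^id} <- edges, from != id, do: from
    succs = for {^id, to} <- edges, to != id, do: to

    # Drop every edge touching the node, then bridge predecessors to successors.
    kept = Enum.reject(edges, fn {from, to} -> from == id or to == id end)
    bridged = for p <- preds, s <- succs, do: {p, s}

    Enum.uniq(kept ++ bridged)
  end
end

IO.inspect(RemoveNodeSketch.remove_node([{"a", "b"}, {"b", "c"}, {"b", "d"}], "b"))
```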
Set the entry node explicitly (overrides the auto-detected first node).
Returns the successor node IDs for a given node (via out_edges).
Returns terminal nodes (nodes with no outgoing edges).
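Finding terminal nodes amounts to filtering for nodes whose out_edges entry is missing or empty. The sketch below assumes a plain map of node ID to successor IDs; `TerminalNodesSketch` is a hypothetical name, not the library function.

```elixir
defmodule TerminalNodesSketch do
  # Hypothetical illustration. out_edges: node ID => list of successor IDs.
  def terminal_nodes(node_ids, out_edges) do
    # A node is terminal when it has no entry, or an empty entry, in out_edges.
    Enum.filter(node_ids, fn id -> Map.get(out_edges, id, []) == [] end)
  end
end

out_edges = %{"fetch" => ["process"], "process" => ["store"]}
IO.inspect(TerminalNodesSketch.terminal_nodes(["fetch", "process", "store"], out_edges))
```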