Journey (Journey v0.10.36)
This module is the entry point for the Journey library. It provides functions for creating and managing computation graphs, starting and managing executions, and retrieving values from executions.
Here is a quick example of how to use the library, illustrating the basic concepts of defining a graph, starting an execution of the graph, and setting input values and getting computed values.
iex> # 1. Define a graph capturing the data and the logic of the application -
iex> # the nodes, their dependencies, and their computations:
iex> import Journey.Node
iex> graph = Journey.new_graph(
...> "horoscope workflow - module doctest",
...> "v1.0.0",
...> [
...> input(:first_name),
...> input(:birth_day),
...> input(:birth_month),
...> compute(
...> :zodiac_sign,
...> # Depends on user-supplied data:
...> [:birth_month, :birth_day],
...> # Computes itself, once the dependencies are satisfied:
...> fn %{birth_month: _birth_month, birth_day: _birth_day} ->
...> {:ok, "Taurus"}
...> end
...> ),
...> compute(
...> :horoscope,
...> # Computes itself once :first_name and :zodiac_sign are in place:
...> [:first_name, :zodiac_sign],
...> fn %{first_name: name, zodiac_sign: zodiac_sign} ->
...> {:ok, "🍪s await, #{zodiac_sign} #{name}!"}
...> end
...> )
...> ]
...> )
iex>
iex> # 2. For every customer visiting your website, start a new execution of the graph:
iex> e = Journey.start_execution(graph)
iex>
iex> # 3. Populate the execution's nodes with the data as provided by the visitor:
iex> e = Journey.set(e, :birth_day, 26)
iex>
iex> # As a side note: if the user leaves and comes back later or if everything crashes,
iex> # you can always reload the execution using its id:
iex> e = Journey.load(e.id)
iex>
iex> # Continuing, as if nothing happened:
iex> e = Journey.set(e, :birth_month, "April")
iex>
iex> # 4. Now that we have :birth_month and :birth_day, :zodiac_sign will compute itself:
iex> {:ok, "Taurus", _revision} = Journey.get(e, :zodiac_sign, wait: :any)
iex> Journey.values(e) |> redact([:execution_id, :last_updated_at])
%{birth_day: 26, birth_month: "April", zodiac_sign: "Taurus", execution_id: "...", last_updated_at: 1234567890}
iex>
iex> # 5. Once we get :first_name, the :horoscope node will compute itself:
iex> e = Journey.set(e, :first_name, "Mario")
iex> {:ok, "🍪s await, Taurus Mario!", 7} = Journey.get(e, :horoscope, wait: :any)
iex>
iex> Journey.values(e) |> redact([:execution_id, :last_updated_at])
%{birth_day: 26, birth_month: "April", first_name: "Mario", horoscope: "🍪s await, Taurus Mario!", zodiac_sign: "Taurus", execution_id: "...", last_updated_at: 1234567890}
iex>
iex> # 6. and we can always list executions.
iex> this_execution = Journey.list_executions(graph_name: "horoscope workflow - module doctest", order_by_execution_fields: [:inserted_at]) |> Enum.reverse() |> hd
iex> e.id == this_execution.id
true
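The doctest above pipes the result of Journey.values through a redact helper that is not defined in this section. Below is a minimal sketch of what such a helper could look like, assuming it only needs to replace volatile values with fixed placeholders so that doctest output stays stable. The module name DocHelpers and the placeholder rules are hypothetical and are not Journey's implementation.

```elixir
defmodule DocHelpers do
  # Hypothetical stand-in for the redact/2 helper used in the doctests.
  # Replaces the values under the given keys with fixed placeholders;
  # keys that are absent from the map are ignored.
  def redact(map, keys) when is_map(map) and is_list(keys) do
    Enum.reduce(keys, map, fn key, acc ->
      case acc do
        %{^key => value} -> Map.put(acc, key, placeholder(value))
        _ -> acc
      end
    end)
  end

  # Strings become "...", integers become a fixed timestamp-like number,
  # anything else is left untouched.
  defp placeholder(value) when is_binary(value), do: "..."
  defp placeholder(value) when is_integer(value), do: 1_234_567_890
  defp placeholder(value), do: value
end

values = %{birth_day: 26, execution_id: "EXEC123", last_updated_at: 1_700_000_000}
DocHelpers.redact(values, [:execution_id, :last_updated_at])
```

Redacting the execution ID and the update timestamp is what allows the expected output in the doctest to be written as a literal map.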
Summary
Graph Management
Creates a new computation graph with the given name, version, and node definitions.
Execution Lifecycle
Archives an execution, making it invisible and stopping all background processing.
Returns the chronological history of all successful computations and set values for an execution.
Queries and retrieves multiple executions from the database with flexible filtering, sorting, and pagination.
Reloads the current state of an execution from the database to get the latest changes.
Starts a new execution instance of a computation graph, initializing it to accept input values and perform computations.
Un-archives the supplied execution, if it is archived.
Value Operations
Returns the value and revision of a node in an execution. Optionally waits for the value to be set.
Sets values for input nodes in an execution and triggers recomputation of dependent nodes.
Removes values from input nodes in an execution and invalidates all dependent computed nodes.
Data Retrieval
Returns a map of node values in an execution.
Returns a map of all nodes in an execution with their current status, including unset nodes.
Deprecated
Graph Management
Creates a new computation graph with the given name, version, and node definitions.
This is the foundational function for defining Journey graphs. It creates a validated
graph structure that can be used to start executions with start_execution/1. The graph
defines the data flow, dependencies, and computations for your application workflow.
Quick Example
import Journey.Node
graph = Journey.new_graph(
"user onboarding",
"v1.0.0",
[
input(:email),
compute(:welcome_message, [:email], fn %{email: email} ->
{:ok, "Welcome #{email}!"}
end)
]
)
execution = Journey.start_execution(graph)
Use start_execution/1 to create executions and set/3 to populate input values.
Parameters
- name - String identifying the graph (e.g., "user registration workflow")
- version - String version identifier following semantic versioning (e.g., "v1.0.0")
- nodes - List of node definitions created with Journey.Node functions (input/1, compute/4, etc.)
- opts - Optional keyword list of options:
  - :f_on_save - Graph-wide callback function invoked after any node computation succeeds. Receives (execution_id, node_name, result) where result is {:ok, value} or {:error, reason}. This callback is called after any node-specific f_on_save callbacks.
  - :execution_id_prefix - Custom prefix for execution IDs created from this graph. Will be normalized to uppercase. Defaults to "EXEC" if not specified. Example: "mygraph" becomes "MYGRAPH1A2B3D4E5G6H7J8L9M"
Returns
- %Journey.Graph{} struct representing the validated and registered computation graph
Errors
- Raises RuntimeError if graph validation fails (e.g., circular dependencies, unknown node references)
- Raises ArgumentError if parameters have invalid types or empty node list
- Raises KeywordValidator.Error if options are invalid
Key Behaviors
- Validation - Automatically validates graph structure for cycles, dependency correctness
- Registration - Registers graph in catalog for execution tracking and reloading
- Immutable - Graph definition is immutable once created; create new versions for changes
- Node types - Supports input, compute, mutate, schedule_once, and schedule_recurring nodes
- f_on_save callbacks - If defined, the graph-wide f_on_save callback is called after node-specific f_on_save callbacks (if defined)
Examples
Basic workflow with input and computation:
iex> import Journey.Node
iex> graph = Journey.new_graph(
...> "greeting workflow",
...> "v1.0.0",
...> [
...> input(:name),
...> compute(:greeting, [:name], fn %{name: name} -> {:ok, "Hello, #{name}!"} end)
...> ]
...> )
iex> graph.name
"greeting workflow"
iex> execution = Journey.start_execution(graph)
iex> execution = Journey.set(execution, :name, "Alice")
iex> {:ok, "Hello, Alice!", 3} = Journey.get(execution, :greeting, wait: :any)
Graph with a graph-wide f_on_save callback:
iex> import Journey.Node
iex> _graph = Journey.new_graph(
...> "notification workflow",
...> "v1.0.0",
...> [
...> input(:user_id),
...> compute(:fetch_user, [:user_id], fn %{user_id: id} ->
...> {:ok, %{id: id, name: "User #{id}"}}
...> end),
...> compute(:send_email, [:fetch_user], fn %{fetch_user: user} ->
...> {:ok, "Email sent to #{user.name}"}
...> end)
...> ],
...> f_on_save: fn _execution_id, node_name, result ->
...> # This will be called for both :fetch_user and :send_email computations
...> IO.puts("Node #{node_name} completed with result: #{inspect(result)}")
...> :ok
...> end
...> )
Complex workflow with conditional dependencies:
iex> import Journey.Node
iex> import Journey.Node.Conditions
iex> import Journey.Node.UpstreamDependencies
iex> graph = Journey.new_graph(
...> "horoscope workflow",
...> "v1.0.0",
...> [
...> input(:first_name),
...> input(:birth_day),
...> input(:birth_month),
...> compute(
...> :zodiac_sign,
...> [:birth_month, :birth_day],
...> fn %{birth_month: _birth_month, birth_day: _birth_day} ->
...> {:ok, "Taurus"}
...> end
...> ),
...> compute(
...> :horoscope,
...> unblocked_when({
...> :and,
...> [
...> {:first_name, &provided?/1},
...> {:zodiac_sign, &provided?/1}
...> ]
...> }),
...> fn %{first_name: name, zodiac_sign: zodiac_sign} ->
...> {:ok, "🍪s await, #{zodiac_sign} #{name}!"}
...> end
...> )
...> ]
...> )
iex> execution = Journey.start_execution(graph)
iex> execution = Journey.set(execution, :birth_day, 15)
iex> execution = Journey.set(execution, :birth_month, "May")
iex> {:ok, "Taurus", _revision} = Journey.get(execution, :zodiac_sign, wait: :any)
iex> execution = Journey.set(execution, :first_name, "Bob")
iex> {:ok, "🍪s await, Taurus Bob!", 7} = Journey.get(execution, :horoscope, wait: :any)
Multiple node types in a workflow:
iex> import Journey.Node
iex> graph = Journey.new_graph(
...> "data processing workflow",
...> "v2.1.0",
...> [
...> input(:raw_data),
...> compute(:upper_case, [:raw_data], fn %{raw_data: data} ->
...> {:ok, String.upcase(data)}
...> end),
...> compute(:suffix, [:upper_case], fn %{upper_case: data} ->
...> {:ok, "#{data} omg yay"}
...> end)
...> ]
...> )
iex> execution = Journey.start_execution(graph)
iex> execution = Journey.set(execution, :raw_data, "hello world")
iex> {:ok, "HELLO WORLD", 3} = Journey.get(execution, :upper_case, wait: :any)
iex> {:ok, "HELLO WORLD omg yay", 5} = Journey.get(execution, :suffix, wait: :any)
Custom execution ID prefix for easier debugging:
iex> import Journey.Node
iex> graph = Journey.new_graph(
...> "user onboarding",
...> "v1.0.0",
...> [
...> input(:email),
...> compute(:welcome_message, [:email], fn %{email: email} ->
...> {:ok, "Welcome #{email}!"}
...> end)
...> ],
...> execution_id_prefix: "onboard"
...> )
iex> execution = Journey.start_execution(graph)
iex> String.starts_with?(execution.id, "ONBOARD")
true
iex> execution = Journey.set(execution, :email, "user@example.com")
iex> {:ok, "Welcome user@example.com!", 3} = Journey.get(execution, :welcome_message, wait: :any)
Execution Lifecycle
Archives an execution, making it invisible and stopping all background processing.
Archiving permanently (*) freezes an execution by marking it with an archived timestamp. This removes it from normal visibility and excludes it from all scheduler processing, while preserving the data for potential future access.
*) an execution can be unarchived by calling unarchive/1
Quick Example
archived_at = Journey.archive(execution)
Journey.load(execution) # Returns nil (hidden)
Journey.load(execution, include_archived: true) # Can still access
Use unarchive/1 to reverse archiving and list_executions/1 with :include_archived to find archived executions.
Parameters
- execution - A %Journey.Persistence.Schema.Execution{} struct or execution ID string
Returns
- Integer timestamp (Unix epoch seconds) when the execution was archived
Key Behaviors
- Scheduler exclusion - Archived executions are excluded from all background sweeps and processing
- Hidden by default - Not returned by list_executions/1 or load/2 unless explicitly included
- Idempotent - Archiving an already archived execution returns the existing timestamp
- Reversible - Use unarchive/1 to restore normal visibility and processing
Examples
Basic archiving workflow:
iex> import Journey.Node
iex> graph = Journey.new_graph("archive example", "v1.0.0", [input(:data)])
iex> execution = Journey.start_execution(graph)
iex> execution.archived_at
nil
iex> archived_at = Journey.archive(execution)
iex> is_integer(archived_at)
true
iex> Journey.load(execution)
nil
iex> Journey.load(execution, include_archived: true) != nil
true
Idempotent behavior:
iex> import Journey.Node
iex> graph = Journey.new_graph("archive idempotent", "v1.0.0", [input(:data)])
iex> execution = Journey.start_execution(graph)
iex> first_archive = Journey.archive(execution)
iex> second_archive = Journey.archive(execution)
iex> first_archive == second_archive
true
Returns the chronological history of all successful computations and set values for an execution.
This function provides visibility into the order of operations during execution, showing both value sets and successful computations in chronological order. Only successful computations are included; failed computations are filtered out. At the same revision, computations appear before values.
Quick Example
history = Journey.history(execution)
# [%{node_name: :x, computation_or_value: :value, revision: 1},
# %{node_name: :sum, computation_or_value: :computation, revision: 2}, ...]
Use values/2 to see only current values, or set/3 and get_value/3 for individual operations.
Parameters
- execution - A %Journey.Persistence.Schema.Execution{} struct or execution ID string
Returns
- List of maps sorted by revision, where each map contains:
  - :computation_or_value - either :computation or :value
  - :node_name - the name of the node
  - :node_type - the type of the node (:input, :compute, :mutate, etc.)
  - :revision - the execution revision when this operation completed
  - :value - the actual value (only present for :value entries)
Examples
Basic usage showing value sets and computation:
iex> import Journey.Node
iex> graph = Journey.new_graph("history example", "v1.0.0", [
...> input(:x),
...> input(:y),
...> compute(:sum, [:x, :y], fn %{x: x, y: y} -> {:ok, x + y} end)
...> ])
iex> execution = Journey.start_execution(graph)
iex> execution = Journey.set(execution, :x, 10)
iex> execution = Journey.set(execution, :y, 20)
iex> {:ok, 30, _revision} = Journey.get(execution, :sum, wait: :any)
iex> Journey.history(execution) |> Enum.map(fn entry ->
...> case entry.node_name do
...> :execution_id -> %{entry | value: "..."}
...> :last_updated_at -> %{entry | value: 1234567890}
...> _ -> entry
...> end
...> end)
[
%{node_name: :execution_id, node_type: :input, computation_or_value: :value, value: "...", revision: 0},
%{node_name: :x, node_type: :input, computation_or_value: :value, value: 10, revision: 1},
%{node_name: :y, node_type: :input, computation_or_value: :value, value: 20, revision: 2},
%{node_name: :sum, node_type: :compute, computation_or_value: :computation, revision: 4},
%{node_name: :last_updated_at, node_type: :input, computation_or_value: :value, value: 1234567890, revision: 4},
%{node_name: :sum, node_type: :compute, computation_or_value: :value, value: 30, revision: 4}
]
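Because the history is sorted by revision, it can be folded into a view of the most recent value per node. The sketch below works on plain maps shaped like the entries above, so it runs without a database. HistoryView and latest_values/1 are hypothetical names and are not part of Journey.

```elixir
defmodule HistoryView do
  # Hypothetical helper, not part of Journey.
  # Keeps only the :value entries of a history list and returns a map of
  # node name => value. Entries are assumed to be sorted by revision, so a
  # later entry for the same node overrides an earlier one.
  def latest_values(history) when is_list(history) do
    history
    |> Enum.filter(&(&1.computation_or_value == :value))
    |> Map.new(fn entry -> {entry.node_name, entry.value} end)
  end
end

history = [
  %{node_name: :x, node_type: :input, computation_or_value: :value, value: 10, revision: 1},
  %{node_name: :y, node_type: :input, computation_or_value: :value, value: 20, revision: 2},
  %{node_name: :sum, node_type: :compute, computation_or_value: :computation, revision: 4},
  %{node_name: :sum, node_type: :compute, computation_or_value: :value, value: 30, revision: 4}
]

HistoryView.latest_values(history)
```

The :computation entry is skipped because it carries no :value key. In practice, values/2 already returns current values; this sketch is only meant to show how the history entries are structured.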
Queries and retrieves multiple executions from the database with flexible filtering, sorting, and pagination.
This function searches across all executions in your system, filtering on graph names, node values, and execution metadata. It is useful for monitoring workflows, building dashboards, and analyzing execution patterns.
Quick Example
# List all executions for a specific graph
executions = Journey.list_executions(graph_name: "user_onboarding")
# List executions for a specific graph version
v1_executions = Journey.list_executions(
graph_name: "user_onboarding",
graph_version: "v1.0.0"
)
# Find executions where age > 18
adults = Journey.list_executions(
graph_name: "user_registration",
filter_by: [{:age, :gt, 18}]
)
Use with start_execution/1 to create executions and load/2 to get individual execution details.
Parameters
- options - Keyword list of query options (all optional):
  - :graph_name - String name of a specific graph to filter by
  - :graph_version - String version of a specific graph to filter by (requires :graph_name)
  - :sort_by - List of fields to sort by, including both execution fields and node values (see Sorting section for details)
  - :filter_by - List of node value filters using database-level filtering for optimal performance. Each filter is a tuple {node_name, operator, value} or {node_name, operator} for nil checks. Operators:
    - :eq, :neq, :lt, :lte, :gt, :gte (comparisons)
    - :in, :not_in (membership)
    - :contains (case-sensitive substring matching, strings only)
    - :icontains (case-insensitive substring matching, strings only)
    - :list_contains (checks if a list-valued node contains the specified string or integer element)
    - :is_nil, :is_not_nil (existence)
    Values can be strings, numbers, booleans, nil or lists (used with :in and :not_in). Complex values (maps, tuples, functions) will raise an ArgumentError.
  - :limit - Maximum number of results (default: 10,000)
  - :offset - Number of results to skip for pagination (default: 0)
  - :include_archived - Whether to include archived executions (default: false)
Returns
- List of %Journey.Persistence.Schema.Execution{} structs with preloaded values and computations
- Empty list [] if no executions match the criteria
Options
:sort_by
Sort by execution fields or node values. Supports atoms for ascending ([:updated_at]),
keywords for direction ([updated_at: :desc]), and mixed formats ([:graph_name, inserted_at: :desc]).
Available fields:
- Execution fields: :inserted_at, :updated_at, :revision, :graph_name, :graph_version
- Node values: Any node name from the graph (e.g., :age, :score) using JSONB ordering
- Direction: :asc (default) or :desc
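To make the mixed format concrete, here is a small sketch of how a :sort_by list with bare atoms and keyword entries can be read as explicit {field, direction} pairs. It illustrates the format only. SortSpec is a hypothetical module, not the normalization Journey performs internally.

```elixir
defmodule SortSpec do
  # Hypothetical illustration, not Journey's implementation.
  # Turns a mixed sort list into explicit {field, direction} pairs:
  # a bare atom means ascending, a {field, direction} tuple is kept as-is.
  def normalize(sort_by) when is_list(sort_by) do
    Enum.map(sort_by, fn
      field when is_atom(field) ->
        {field, :asc}

      {field, direction} when is_atom(field) and direction in [:asc, :desc] ->
        {field, direction}

      other ->
        raise ArgumentError, "invalid sort_by entry: #{inspect(other)}"
    end)
  end
end

SortSpec.normalize([:graph_name, inserted_at: :desc])
# [graph_name: :asc, inserted_at: :desc]
```

In Elixir, [:graph_name, inserted_at: :desc] is an ordinary list whose last element is the tuple {:inserted_at, :desc}, which is why both styles can be mixed in one list.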
Key Behaviors
- Filtering performed at database level for optimal performance
- Only primitive values supported for filtering (complex types raise errors)
- Archived executions excluded by default
Examples
Basic listing by graph name:
iex> import Journey.Node
iex> graph = Journey.new_graph(
...> "list example basic - #{Journey.Helpers.Random.random_string_w_time()}",
...> "v1.0.0",
...> [input(:status)]
...> )
iex> Journey.start_execution(graph) |> Journey.set(:status, "active")
iex> Journey.start_execution(graph) |> Journey.set(:status, "pending")
iex> executions = Journey.list_executions(graph_name: graph.name)
iex> length(executions)
2
Filtering by graph version:
iex> import Journey.Node
iex> graph_name = "version example RHA8rxL00gJv1761694955387770"
iex> graph_v1 = Journey.new_graph(
...> graph_name,
...> "v1.0.0",
...> [input(:data)]
...> )
iex> graph_v2 = Journey.new_graph(
...> graph_name,
...> "v2.0.0",
...> [input(:data), input(:new_field)]
...> )
iex> Journey.start_execution(graph_v1) |> Journey.set(:data, "v1 data")
iex> Journey.start_execution(graph_v2) |> Journey.set(:data, "v2 data")
iex> Journey.list_executions(graph_name: graph_v1.name, graph_version: "v1.0.0") |> length()
1
iex> Journey.list_executions(graph_name: graph_v1.name, graph_version: "v2.0.0") |> length()
1
iex> Journey.list_executions(graph_name: graph_v1.name) |> length()
2
Validation that graph_version requires graph_name:
iex> Journey.list_executions(graph_version: "v1.0.0")
** (ArgumentError) Option :graph_version requires :graph_name to be specified
Sorting by execution fields and node values:
iex> import Journey.Node
iex> graph = Journey.new_graph(
...> "sort example - #{Journey.Helpers.Random.random_string_w_time()}",
...> "v1.0.0",
...> [input(:priority)]
...> )
iex> Journey.start_execution(graph) |> Journey.set(:priority, "high")
iex> Journey.start_execution(graph) |> Journey.set(:priority, "low")
iex> Journey.start_execution(graph) |> Journey.set(:priority, "medium")
iex> # Sort by priority descending - shows the actual sorted values
iex> Journey.list_executions(graph_name: graph.name, sort_by: [priority: :desc]) |> Enum.map(fn e -> Journey.values(e) |> Map.get(:priority) end)
["medium", "low", "high"]
Filtering with multiple operators:
iex> graph = Journey.Examples.Horoscope.graph()
iex> for day <- 1..20, do: Journey.start_execution(graph) |> Journey.set(:birth_day, day) |> Journey.set(:birth_month, 4) |> Journey.set(:first_name, "Mario")
iex> # Various filtering examples
iex> Journey.list_executions(graph_name: graph.name, filter_by: [{:birth_day, :eq, 10}]) |> Enum.count()
1
iex> Journey.list_executions(graph_name: graph.name, filter_by: [{:birth_day, :neq, 10}]) |> Enum.count()
19
iex> Journey.list_executions(graph_name: graph.name, filter_by: [{:birth_day, :lte, 5}]) |> Enum.count()
5
iex> Journey.list_executions(graph_name: graph.name, filter_by: [{:birth_day, :in, [5, 10, 15]}]) |> Enum.count()
3
iex> Journey.list_executions(graph_name: graph.name, filter_by: [{:first_name, :is_not_nil}]) |> Enum.count()
20
iex> Journey.list_executions(graph_name: graph.name, filter_by: [{:first_name, :contains, "ari"}]) |> Enum.count()
20
iex> Journey.list_executions(graph_name: graph.name, filter_by: [{:first_name, :icontains, "MARIO"}]) |> Enum.count()
20
List containment filtering with :list_contains:
iex> import Journey.Node
iex> graph = Journey.new_graph(
...> "notification example - #{Journey.Helpers.Random.random_string_w_time()}",
...> "v1.0.0",
...> [input(:recipients)]
...> )
iex> Journey.start_execution(graph) |> Journey.set(:recipients, ["user1", "user2", "admin"])
iex> Journey.start_execution(graph) |> Journey.set(:recipients, ["user3", "user4"])
iex> Journey.start_execution(graph) |> Journey.set(:recipients, [1, 2, 3])
iex> # Find executions where recipients list contains "user1"
iex> Journey.list_executions(graph_name: graph.name, filter_by: [{:recipients, :list_contains, "user1"}]) |> Enum.count()
1
iex> # Find executions where recipients list contains integer 2
iex> Journey.list_executions(graph_name: graph.name, filter_by: [{:recipients, :list_contains, 2}]) |> Enum.count()
1
Multiple filters, sorting, and pagination:
iex> graph = Journey.Examples.Horoscope.graph()
iex> for day <- 1..20, do: Journey.start_execution(graph) |> Journey.set(:birth_day, day) |> Journey.set(:birth_month, 4) |> Journey.set(:first_name, "Mario")
iex> # Multiple filters combined
iex> Journey.list_executions(
...> graph_name: graph.name,
...> filter_by: [{:birth_day, :gt, 10}, {:first_name, :is_not_nil}],
...> sort_by: [birth_day: :desc],
...> limit: 5
...> ) |> Enum.count()
5
iex> # Pagination
iex> Journey.list_executions(graph_name: graph.name, limit: 3) |> Enum.count()
3
iex> Journey.list_executions(graph_name: graph.name, limit: 5, offset: 10) |> Enum.count()
5
Including archived executions:
iex> import Journey.Node
iex> graph = Journey.new_graph(
...> "list example - archived - #{Journey.Helpers.Random.random_string_w_time()}",
...> "v1.0.0",
...> [input(:status)]
...> )
iex> e1 = Journey.start_execution(graph)
iex> _e2 = Journey.start_execution(graph)
iex> Journey.archive(e1)
iex> Journey.list_executions(graph_name: graph.name) |> length()
1
iex> Journey.list_executions(graph_name: graph.name, include_archived: true) |> length()
2
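The :limit and :offset options can be combined to walk a large result set one page at a time. The sketch below keeps the paging loop independent of Journey by taking a fetch function of two arguments (limit and offset), so it can be exercised with plain lists. PageWalker is a hypothetical helper and is not part of the library.

```elixir
defmodule PageWalker do
  # Hypothetical helper, not part of Journey.
  # Lazily streams every item by calling fetch_page.(limit, offset) with an
  # increasing offset. Stops after an empty page or a page shorter than
  # page_size, which signals that the last page has been reached.
  def stream(fetch_page, page_size)
      when is_function(fetch_page, 2) and is_integer(page_size) and page_size > 0 do
    0
    |> Stream.unfold(fn
      :done ->
        nil

      offset ->
        page = fetch_page.(page_size, offset)

        cond do
          page == [] -> nil
          length(page) < page_size -> {page, :done}
          true -> {page, offset + page_size}
        end
    end)
    |> Stream.flat_map(& &1)
  end
end

# Stand-in data source; with Journey the fetch function could instead wrap
# Journey.list_executions(graph_name: name, sort_by: [:inserted_at],
#                         limit: limit, offset: offset)
data = Enum.to_list(1..7)
fetch = fn limit, offset -> Enum.slice(data, offset, limit) end

PageWalker.stream(fetch, 3) |> Enum.to_list()
# [1, 2, 3, 4, 5, 6, 7]
```

Offset-based paging only yields consistent pages when the sort order is stable between calls, so passing an explicit :sort_by is a reasonable precaution. Executions created while paging can still shift later pages.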
Reloads the current state of an execution from the database to get the latest changes.
Executions can be modified by their background computations, by scheduled events, or by other processes setting their values. Use this function to get the latest state of an execution: during normal operation, when the system starts up, or when a user whose session is tracked as an execution returns to the website and resumes their flow.
Quick Example
execution = Journey.set(execution, :name, "Mario")
execution = Journey.load(execution) # Get updated state with new revision
{:ok, greeting, _} = Journey.get(execution, :greeting, wait: :any)
Use set/3 and get_value/3 to modify and read execution values.
Parameters
- execution - A %Journey.Persistence.Schema.Execution{} struct or execution ID string
- opts - Keyword list of options (see Options section below)
Returns
- A %Journey.Persistence.Schema.Execution{} struct with current database state, or nil if not found
Options
- :preload - Whether to preload associated nodes and values. Defaults to true. Set to false for better performance when you only need execution metadata.
- :include_archived - Whether to include archived executions. Defaults to false. Archived executions are normally hidden but can be loaded with this option.
Key Behaviors
- Fresh state - Always returns the current state from the database, not cached data
- Revision tracking - Loaded execution will have the latest revision number
- Archived handling - Archived executions return nil unless explicitly included
- Performance option - Use preload: false to skip loading values/computations for speed
Examples
Basic reloading after value changes:
iex> import Journey.Node
iex> graph = Journey.new_graph(
...> "load example - basic",
...> "v1.0.0",
...> [
...> input(:name),
...> compute(:greeting, [:name], fn %{name: name} -> {:ok, "Hello, #{name}!"} end)
...> ]
...> )
iex> execution = Journey.start_execution(graph)
iex> execution.revision
0
iex> execution = Journey.set(execution, :name, "Alice")
iex> execution.revision > 0
true
iex> {:ok, "Hello, Alice!", _} = Journey.get(execution, :greeting, wait: :any)
iex> reloaded = Journey.load(execution)
iex> reloaded.revision >= execution.revision
true
Loading by execution ID:
iex> import Journey.Node
iex> graph = Journey.new_graph(
...> "load example - by id",
...> "v1.0.0",
...> [input(:data)]
...> )
iex> execution = Journey.start_execution(graph)
iex> execution_id = execution.id
iex> reloaded = Journey.load(execution_id)
iex> reloaded.id == execution_id
true
Performance optimization with preload option:
iex> import Journey.Node
iex> graph = Journey.new_graph(
...> "load example - no preload",
...> "v1.0.0",
...> [input(:data)]
...> )
iex> execution = Journey.start_execution(graph)
iex> fast_load = Journey.load(execution, preload: false)
iex> fast_load.id == execution.id
true
Handling archived executions:
iex> import Journey.Node
iex> graph = Journey.new_graph(
...> "load example - archived",
...> "v1.0.0",
...> [input(:data)]
...> )
iex> execution = Journey.start_execution(graph)
iex> Journey.archive(execution)
iex> Journey.load(execution)
nil
iex> Journey.load(execution, include_archived: true) != nil
true
Starts a new execution instance of a computation graph, initializing it to accept input values and perform computations.
Creates a persistent execution in the database with a unique ID and begins background processing for any schedulable nodes. The execution starts with revision 0 and no values set.
Quick Example
execution = Journey.start_execution(graph)
execution = Journey.set(execution, :name, "Mario")
{:ok, greeting, _} = Journey.get(execution, :greeting, wait: :any)
Use set/3 to provide input values and get_value/3 to retrieve computed results.
Parameters
- graph - A validated %Journey.Graph{} struct created with new_graph/3. The graph must have passed validation during creation and be registered in the graph catalog.
Returns
- A new %Journey.Persistence.Schema.Execution{} struct with:
  - :id - Unique execution identifier (a string starting with the graph's execution ID prefix, "EXEC" by default)
  - :graph_name and :graph_version - From the source graph
  - :revision - Always starts at 0, increments with each state change
  - :archived_at - Initially nil (not archived)
  - and other fields.
Key Behaviors
- Database persistence - Execution state is immediately saved to PostgreSQL
- Unique execution - Each call creates a completely independent execution instance
- Background processing - Scheduler automatically begins monitoring for schedulable nodes
- Ready for inputs - Can immediately accept input values via set/3
Examples
Basic execution creation:
iex> import Journey.Node
iex> graph = Journey.new_graph(
...> "greeting workflow",
...> "v1.0.0",
...> [
...> input(:name),
...> compute(
...> :greeting,
...> [:name],
...> fn %{name: name} -> {:ok, "Hello, #{name}!"} end
...> )
...> ]
...> )
iex> execution = Journey.start_execution(graph)
iex> execution.graph_name
"greeting workflow"
iex> execution.graph_version
"v1.0.0"
iex> execution.revision
0
Execution properties and immediate workflow:
iex> import Journey.Node
iex> graph = Journey.new_graph(
...> "calculation workflow",
...> "v1.0.0",
...> [
...> input(:x),
...> input(:y),
...> compute(:sum, [:x, :y], fn %{x: x, y: y} -> {:ok, x + y} end)
...> ]
...> )
iex> execution = Journey.start_execution(graph)
iex> is_binary(execution.id)
true
iex> execution.archived_at
nil
iex> user_values = Journey.values(execution, reload: false) |> Map.drop([:execution_id, :last_updated_at])
iex> user_values
%{}
iex> execution = Journey.set(execution, :x, 10)
iex> execution = Journey.set(execution, :y, 20)
iex> {:ok, 30, _revision} = Journey.get(execution, :sum, wait: :any)
Multiple independent executions:
iex> import Journey.Node
iex> graph = Journey.new_graph(
...> "counter workflow",
...> "v1.0.0",
...> [input(:count)]
...> )
iex> execution1 = Journey.start_execution(graph)
iex> execution2 = Journey.start_execution(graph)
iex> execution1.id != execution2.id
true
iex> execution1 = Journey.set(execution1, :count, 1)
iex> execution2 = Journey.set(execution2, :count, 2)
iex> {:ok, 1, 1} = Journey.get(execution1, :count)
iex> {:ok, 2, 1} = Journey.get(execution2, :count)
Un-archives the supplied execution, if it is archived.
Parameters
- execution or execution_id - The execution to un-archive, or the ID of the execution to un-archive.
Returns
- :ok
Examples
iex> execution =
...> Journey.Examples.Horoscope.graph() |>
...> Journey.start_execution() |>
...> Journey.set(:birth_day, 26)
iex> _archived_at = Journey.archive(execution)
iex> # The execution is now archived, and it is no longer visible.
iex> nil == Journey.load(execution, include_archived: false)
true
iex> Journey.unarchive(execution)
:ok
iex> # The execution is now un-archived, and it can now be loaded.
iex> nil == Journey.load(execution, include_archived: false)
false
iex> # Un-archiving an un-archived execution has no effect.
iex> Journey.unarchive(execution)
:ok
Value Operations
Returns the value and revision of a node in an execution. Optionally waits for the value to be set.
This function atomically returns both the node value and its revision number, eliminating race conditions when you need to track which revision of a value you received.
Quick Examples
# Basic usage - get a set value and its revision immediately
{:ok, value, revision} = Journey.get(execution, :name)
# Wait for a computed value to be available (30 second default timeout)
{:ok, result, revision} = Journey.get(execution, :computed_field, wait: :any)
# Wait for a new version of the value with custom timeout
{:ok, new_value, new_revision} = Journey.get(execution, :name, wait: :newer, timeout: 5000)
# Wait for a value newer than a specific revision
{:ok, fresh_value, fresh_revision} = Journey.get(execution, :name, wait: {:newer_than, 10})
Use set/3 to set input values that trigger computations.
Parameters
- execution - A %Journey.Persistence.Schema.Execution{} struct
- node_name - Atom representing the node name (must exist in the graph)
- opts - Keyword list of options (see Options section below)
Returns
- {:ok, value, revision} – the value is set, with its revision number
- {:error, :not_set} – the value is not yet set
- {:error, :computation_failed} – the computation permanently failed
Errors
- Raises RuntimeError if the node name does not exist in the execution's graph
- Raises ArgumentError if an invalid :wait option is provided
Options
- :wait – Controls waiting behavior:
  - :immediate (default) – Return immediately without waiting
  - :any – Wait until the value is available or the timeout elapses
  - :newer – Wait for a revision newer than that of the current execution
  - {:newer_than, revision} – Wait for a value newer than the specified revision
- :timeout – Timeout in milliseconds (default: 30,000) or :infinity
Examples
iex> execution =
...> Journey.Examples.Horoscope.graph() |>
...> Journey.start_execution() |>
...> Journey.set(:birth_day, 26)
iex> {:ok, 26, _revision} = Journey.get(execution, :birth_day)
iex> Journey.get(execution, :birth_month)
{:error, :not_set}
iex> Journey.get(execution, :astrological_sign)
{:error, :not_set}
iex> execution = Journey.set(execution, :birth_month, "April")
iex> Journey.get(execution, :astrological_sign)
{:error, :not_set}
iex> {:ok, "Taurus", _revision} = Journey.get(execution, :astrological_sign, wait: :any)
iex> Journey.get(execution, :horoscope, wait: :any, timeout: 2_000)
{:error, :not_set}
iex> execution = Journey.set(execution, :first_name, "Mario")
iex> {:ok, "🍪s await, Taurus Mario!", _revision} = Journey.get(execution, :horoscope, wait: :any)
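Since get returns the revision alongside the value, a caller can tell a fresh value from one it has already seen. The sketch below pattern-matches on the three return shapes listed under Returns and compares the revision with the last one the caller recorded. GetOutcome and the atoms it returns are hypothetical and are not part of Journey.

```elixir
defmodule GetOutcome do
  # Hypothetical helper, not part of Journey.
  # Interprets a result shaped like Journey.get's return value, given the
  # last revision the caller has already processed for that node.
  def interpret({:ok, value, revision}, last_seen_revision)
      when revision > last_seen_revision,
      do: {:updated, value, revision}

  def interpret({:ok, _value, _revision}, _last_seen_revision), do: :unchanged
  def interpret({:error, :not_set}, _last_seen_revision), do: :pending
  def interpret({:error, :computation_failed}, _last_seen_revision), do: :failed
end

GetOutcome.interpret({:ok, "Taurus", 4}, 2)
# {:updated, "Taurus", 4}
GetOutcome.interpret({:ok, "Taurus", 4}, 4)
# :unchanged
GetOutcome.interpret({:error, :not_set}, 0)
# :pending
```

A caller would typically store the returned revision and pass it as last_seen_revision on the next check, or hand it to the {:newer_than, revision} wait option described above.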
Sets values for input nodes in an execution and triggers recomputation of dependent nodes.
This function supports three calling patterns:
- Single value: set(execution, :node_name, value)
- Multiple values via map: set(execution, %{node1: value1, node2: value2})
- Multiple values via keyword list: set(execution, node1: value1, node2: value2)
When values are set, Journey automatically recomputes any dependent computed nodes to ensure consistency across the dependency graph. The operation is idempotent - setting the same values has no effect.
Parameters
Single value:
- execution - A %Journey.Persistence.Schema.Execution{} struct or execution ID string
- node_name - Atom representing the input node name (must exist in the graph)
- value - The value to set. Supported types: nil, string, number, map, list, boolean. Note that if the map or the list contains atoms, those atoms will be converted to strings.
Multiple values:
- execution - A %Journey.Persistence.Schema.Execution{} struct or execution ID string
- values - Map of node names to values (e.g., %{node1: "value1", node2: 42}) or keyword list (e.g., [node1: "value1", node2: 42])
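The note above about atoms inside maps and lists being converted to strings can be approximated with the sketch below. It is not Journey's implementation: AtomStringifySketch is a hypothetical module, and it assumes that map keys are converted the same way as values and that true, false, and nil (which are atoms in Elixir) are preserved because they are supported value types. It can be handy for predicting what a stored value will look like when read back.

```elixir
defmodule AtomStringifySketch do
  # Hypothetical helper, not Journey's implementation: approximates what a
  # JSON-style round trip does to atoms nested inside maps and lists.

  # true, false and nil are atoms in Elixir, but they are supported value
  # types in their own right, so they pass through unchanged.
  def stringify(value) when is_boolean(value) or is_nil(value), do: value
  def stringify(value) when is_atom(value), do: Atom.to_string(value)

  # Assumption: atom keys become string keys, just like atom values.
  def stringify(value) when is_map(value) do
    Map.new(value, fn {key, val} -> {stringify_key(key), stringify(val)} end)
  end

  def stringify(value) when is_list(value), do: Enum.map(value, &stringify/1)

  # Strings and numbers are left as they are.
  def stringify(value), do: value

  defp stringify_key(key) when is_atom(key), do: Atom.to_string(key)
  defp stringify_key(key), do: key
end

stored = AtomStringifySketch.stringify(%{status: :active, tags: [:a, "b"], done: true})
IO.inspect(stored)
```

Writing values with string keys and string values from the start avoids any difference between what is set and what is later read back.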
Options
- metadata: - Optional contextual information to attach to the value(s). Accepts any JSON-compatible type: nil, string, number, boolean, list, or map. If using a map, keys must be strings (not atoms) for JSONB storage. Useful for audit trails, tracking authors, timestamps, IP addresses, or other provenance data. For bulk operations (map/keyword list), the same metadata applies to all values. Metadata is stored with the value and flows to historians and compute functions but is not exposed via Journey.get(). Default: nil
Returns
- Updated %Journey.Persistence.Schema.Execution{} struct with incremented revision (if any value changed)
Errors
- Raises RuntimeError if any node name does not exist in the execution's graph
- Raises RuntimeError if attempting to set compute nodes (only input nodes can be set)
Key Behaviors
- Automatic recomputation - Setting values triggers recomputation of all dependent nodes
- Idempotent - Setting the same values has no effect (no revision increment)
- Input nodes only - Only input nodes can be set; compute nodes are read-only
- Atomic updates - Multiple values are set together in a single transaction (single revision increment)
- Metadata tracking - Optional metadata flows to historians and arity-2 compute functions for audit trails
Quick Examples
# Single value
execution = Journey.set(execution, :name, "Mario")
# Multiple values via map
execution = Journey.set(execution, %{name: "Mario", age: 35})
# Multiple values via keyword list
execution = Journey.set(execution, name: "Mario", age: 35)
{:ok, greeting, _} = Journey.get(execution, :greeting, wait: :any)
Use get_value/3 to retrieve values and unset/2 to remove values.
Examples
Basic setting with cascading recomputation:
iex> import Journey.Node
iex> graph = Journey.new_graph(
...> "set workflow - cascading example",
...> "v1.0.0",
...> [
...> input(:name),
...> compute(:greeting, [:name], fn %{name: name} -> {:ok, "Hello, #{name}!"} end)
...> ]
...> )
iex> execution = graph |> Journey.start_execution()
iex> execution = Journey.set(execution, :name, "Mario")
iex> {:ok, "Hello, Mario!", 3} = Journey.get(execution, :greeting, wait: :any)
iex> execution = Journey.set(execution, :name, "Luigi")
iex> {:ok, "Hello, Luigi!", _revision} = Journey.get(execution, :greeting, wait: :newer)
Idempotent behavior - same value doesn't change revision:
iex> import Journey.Node
iex> graph = Journey.new_graph(
...> "set workflow - idempotent example",
...> "v1.0.0",
...> [input(:name)]
...> )
iex> execution = graph |> Journey.start_execution()
iex> execution = Journey.set(execution, :name, "Mario")
iex> first_revision = execution.revision
iex> execution = Journey.set(execution, :name, "Mario")
iex> execution.revision == first_revision
true
Different value types:
iex> import Journey.Node
iex> graph = Journey.new_graph(
...> "set workflow - value types example",
...> "v1.0.0",
...> [input(:number), input(:flag), input(:data)]
...> )
iex> execution = graph |> Journey.start_execution()
iex> execution = Journey.set(execution, :number, 42)
iex> execution = Journey.set(execution, :flag, true)
iex> execution = Journey.set(execution, :data, %{"key" => "value"})
iex> {:ok, 42, _revision} = Journey.get(execution, :number)
iex> {:ok, true, _revision} = Journey.get(execution, :flag)
Using an execution ID:
iex> import Journey.Node
iex> graph = Journey.new_graph(
...> "set workflow - execution_id example",
...> "v1.0.0",
...> [input(:name)]
...> )
iex> execution = graph |> Journey.start_execution()
iex> updated_execution = Journey.set(execution.id, :name, "Luigi")
iex> {:ok, "Luigi", _revision} = Journey.get(updated_execution, :name)
Multiple values via map (atomic operation):
iex> import Journey.Node
iex> graph = Journey.new_graph(
...> "set workflow - multiple map example",
...> "v1.0.0",
...> [
...> input(:first_name),
...> input(:last_name),
...> compute(:full_name, [:first_name, :last_name], fn %{first_name: first, last_name: last} ->
...> {:ok, "#{first} #{last}"}
...> end)
...> ]
...> )
iex> execution = graph |> Journey.start_execution()
iex> execution = Journey.set(execution, %{first_name: "Mario", last_name: "Bros"})
iex> {:ok, "Mario", 1} = Journey.get(execution, :first_name)
iex> {:ok, "Bros", 1} = Journey.get(execution, :last_name)
iex> {:ok, "Mario Bros", 3} = Journey.get(execution, :full_name, wait: :any)
Multiple values via keyword list (ergonomic syntax):
iex> import Journey.Node
iex> graph = Journey.new_graph(
...> "set workflow - keyword example",
...> "v1.0.0",
...> [input(:name), input(:age), input(:active)]
...> )
iex> execution = graph |> Journey.start_execution()
iex> execution = Journey.set(execution, name: "Mario", age: 35, active: true)
iex> {:ok, "Mario", 1} = Journey.get(execution, :name)
iex> {:ok, 35, 1} = Journey.get(execution, :age)
iex> {:ok, true, 1} = Journey.get(execution, :active)
Setting values with metadata for audit trails:
iex> import Journey.Node
iex> graph = Journey.new_graph(
...> "audit trail example",
...> "v1.0.0",
...> [
...> input(:document_title),
...> historian(:title_history, [:document_title])
...> ]
...> )
iex> execution = graph |> Journey.start_execution()
iex> execution = Journey.set(execution, :document_title, "Draft v1", metadata: %{"author_id" => "user123"})
iex> {:ok, history1, _} = Journey.get(execution, :title_history, wait: :any)
iex> length(history1)
1
iex> execution = Journey.set(execution, :document_title, "Draft v2", metadata: %{"author_id" => "user456"})
iex> {:ok, history2, _} = Journey.get(execution, :title_history, wait: :newer)
iex> length(history2)
2
iex> # History entries include metadata for audit trail (newest first)
iex> [%{"value" => "Draft v2", "metadata" => %{"author_id" => "user456"}}, %{"value" => "Draft v1", "metadata" => %{"author_id" => "user123"}}] = history2
Metadata with different types:
iex> import Journey.Node
iex> graph = Journey.new_graph(
...> "metadata types example",
...> "v1.0.0",
...> [input(:field)]
...> )
iex> execution = graph |> Journey.start_execution()
iex> # String metadata
iex> execution = Journey.set(execution, :field, "value1", metadata: "version-1")
iex> # Map metadata (keys must be strings, not atoms)
iex> execution = Journey.set(execution, :field, "value2", metadata: %{"author_id" => "user789", "ip" => "192.168.1.1"})
iex> # Number metadata
iex> execution = Journey.set(execution, :field, "value3", metadata: 42)
iex> # List metadata
iex> execution = Journey.set(execution, :field, "value4", metadata: ["tag1", "tag2"])
iex> {:ok, "value4", _} = Journey.get(execution, :field)
Removes values from input nodes in an execution and invalidates all dependent computed nodes.
This function supports two calling patterns:
- Single value: unset(execution, :node_name)
- Multiple values via list: unset(execution, [:node1, :node2, :node3])
When values are unset, Journey automatically invalidates (unsets) all computed nodes that depend on the unset inputs, creating a cascading effect through the dependency graph. This ensures data consistency - no computed values remain that were based on the now-removed inputs.
Quick Examples
# Single value
execution = Journey.unset(execution, :name)
{:error, :not_set} = Journey.get(execution, :name)
# Multiple values
execution = Journey.unset(execution, [:first_name, :last_name, :email])
Use set/3 to set values and get_value/3 to check if values are set.
Parameters
Single value:
- execution - A %Journey.Persistence.Schema.Execution{} struct or execution ID string
- node_name - Atom representing the input node name (must exist in the graph)
Multiple values:
- execution - A %Journey.Persistence.Schema.Execution{} struct or execution ID string
- node_names - List of atoms representing input node names (all must exist in the graph)
Returns
- Updated %Journey.Persistence.Schema.Execution{} struct with incremented revision (if value was set)
Errors
- Raises RuntimeError if the node name does not exist in the execution's graph
- Raises RuntimeError if attempting to unset a compute node (only input nodes can be unset)
Key Behaviors
- Cascading invalidation - Dependent computed nodes are automatically unset
- Idempotent - Multiple unsets of the same value have no additional effect
- Input nodes only - Only input nodes can be unset; compute nodes cannot be unset
- Atomic updates - Multiple values are unset together in a single transaction (single revision increment)
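The cascading invalidation listed above can be modelled as a transitive walk over the dependency graph. The sketch below is a plain-Elixir model under stated assumptions, not Journey's implementation: CascadeSketch, invalidated/2, and the deps map (computed node name to the list of nodes it depends on) are all hypothetical names chosen for illustration.

```elixir
defmodule CascadeSketch do
  # Hypothetical model, not Journey's implementation.
  # `deps` maps each computed node to the nodes it depends on.
  # Returns the set of nodes left unset after unsetting `unset_nodes`:
  # the unset nodes themselves plus everything downstream of them.
  def invalidated(deps, unset_nodes) do
    expand(deps, MapSet.new(unset_nodes))
  end

  # Repeatedly add computed nodes that depend on an already-invalid node,
  # until a pass adds nothing new (a fixed point).
  defp expand(deps, invalid) do
    newly_invalid =
      for {name, upstream} <- deps,
          not MapSet.member?(invalid, name),
          Enum.any?(upstream, &MapSet.member?(invalid, &1)),
          into: MapSet.new(),
          do: name

    if MapSet.size(newly_invalid) == 0 do
      invalid
    else
      expand(deps, MapSet.union(invalid, newly_invalid))
    end
  end
end

# An A -> B -> C chain, plus a node with two upstream inputs.
deps = %{b: [:a], c: [:b], full_name: [:first_name, :last_name]}
IO.inspect(CascadeSketch.invalidated(deps, [:a]))
IO.inspect(CascadeSketch.invalidated(deps, [:first_name]))
```

In this model, unsetting :a invalidates :b and then :c, and unsetting only :first_name is enough to invalidate :full_name, since a computed node needs all of its upstream values.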
Examples
Basic unsetting with cascading invalidation:
iex> import Journey.Node
iex> graph = Journey.new_graph(
...> "unset workflow - basic example",
...> "v1.0.0",
...> [
...> input(:name),
...> compute(
...> :greeting,
...> [:name],
...> fn %{name: name} -> {:ok, "Hello, #{name}!"} end
...> )
...> ]
...> )
iex> execution = graph |> Journey.start_execution()
iex> execution = Journey.set(execution, :name, "Mario")
iex> {:ok, "Hello, Mario!", 3} = Journey.get(execution, :greeting, wait: :any)
iex> execution_after_unset = Journey.unset(execution, :name)
iex> Journey.get(execution_after_unset, :name)
{:error, :not_set}
iex> Journey.get(execution_after_unset, :greeting)
{:error, :not_set}
Multi-level cascading (A → B → C chain):
iex> import Journey.Node
iex> graph = Journey.new_graph(
...> "unset workflow - cascade example",
...> "v1.0.0",
...> [
...> input(:a),
...> compute(:b, [:a], fn %{a: a} -> {:ok, "B:#{a}"} end),
...> compute(:c, [:b], fn %{b: b} -> {:ok, "C:#{b}"} end)
...> ]
...> )
iex> execution = graph |> Journey.start_execution()
iex> execution = Journey.set(execution, :a, "value")
iex> {:ok, "B:value", 3} = Journey.get(execution, :b, wait: :any)
iex> {:ok, "C:B:value", 5} = Journey.get(execution, :c, wait: :any)
iex> execution_after_unset = Journey.unset(execution, :a)
iex> Journey.get(execution_after_unset, :a)
{:error, :not_set}
iex> Journey.get(execution_after_unset, :b)
{:error, :not_set}
iex> Journey.get(execution_after_unset, :c)
{:error, :not_set}
Idempotent behavior:
iex> import Journey.Node
iex> graph = Journey.new_graph(
...> "unset workflow - idempotent example",
...> "v1.0.0",
...> [input(:name)]
...> )
iex> execution = graph |> Journey.start_execution()
iex> original_revision = execution.revision
iex> execution_after_unset = Journey.unset(execution, :name)
iex> execution_after_unset.revision == original_revision
true
Multiple values atomic operation:
iex> import Journey.Node
iex> graph = Journey.new_graph(
...> "unset workflow - multiple values example",
...> "v1.0.0",
...> [
...> input(:first_name),
...> input(:last_name),
...> input(:email),
...> compute(:full_name, [:first_name, :last_name], fn %{first_name: first_name, last_name: last_name} ->
...> {:ok, "#{first_name} #{last_name}"}
...> end)
...> ]
...> )
iex> execution = graph |> Journey.start_execution()
iex> execution = Journey.set(execution, %{first_name: "Mario", last_name: "Bros", email: "mario@example.com"})
iex> {:ok, "Mario Bros", 3} = Journey.get(execution, :full_name, wait: :any)
iex> execution_after_unset = Journey.unset(execution, [:first_name, :last_name])
iex> Journey.get(execution_after_unset, :first_name)
{:error, :not_set}
iex> Journey.get(execution_after_unset, :last_name)
{:error, :not_set}
iex> {:ok, "mario@example.com", 1} = Journey.get(execution_after_unset, :email)
iex> Journey.get(execution_after_unset, :full_name)
{:error, :not_set}
Data Retrieval
Returns a map of node values in an execution.
By default, only returns nodes that have been set, excluding unset nodes.
With include_unset_as_nil: true, returns all nodes with unset ones as nil.
Always includes :execution_id and :last_updated_at metadata.
Quick Example
execution = Journey.set(execution, :name, "Alice")
values = Journey.values(execution)
# %{name: "Alice", execution_id: "EXEC...", last_updated_at: 1234567890}
# Include unset nodes as nil
all_values = Journey.values(execution, include_unset_as_nil: true)
# %{name: "Alice", age: nil, execution_id: "EXEC...", last_updated_at: 1234567890}
Use values_all/1 to see all nodes with their status tuples, or get_value/3 for individual values.
Parameters
- execution - A %Journey.Persistence.Schema.Execution{} struct
- opts - Keyword list of options:
  - :reload - Reload execution from database (default: true)
  - :include_unset_as_nil - Include unset nodes as nil values (default: false)
Returns
- Map with node names as keys and their current values as values
- When include_unset_as_nil: false (default): Only includes set nodes
- When include_unset_as_nil: true: Includes all nodes, with unset ones as nil
Examples
Basic usage:
iex> import Journey.Node
iex> graph = Journey.new_graph("example", "v1.0.0", [input(:name), input(:age)])
iex> execution = Journey.start_execution(graph)
iex> Journey.values(execution) |> redact([:execution_id, :last_updated_at])
%{execution_id: "...", last_updated_at: 1234567890}
iex> execution = Journey.set(execution, :name, "Alice")
iex> Journey.values(execution) |> redact([:execution_id, :last_updated_at])
%{name: "Alice", execution_id: "...", last_updated_at: 1234567890}
Including unset nodes:
iex> import Journey.Node
iex> graph = Journey.new_graph("example", "v1.0.0", [input(:name), input(:age)])
iex> execution = Journey.start_execution(graph)
iex> execution = Journey.set(execution, :name, "Alice")
iex> Journey.values(execution, include_unset_as_nil: true) |> redact([:execution_id, :last_updated_at])
%{name: "Alice", age: nil, execution_id: "...", last_updated_at: 1234567890}
Returns a map of all nodes in an execution with their current status, including unset nodes.
Unlike values/2 which only returns set nodes, this function shows all nodes including those
that haven't been set yet. Unset nodes are marked as :not_set, while set nodes are returned
as {:set, value} tuples. Useful for debugging and introspection.
Quick Example
all_status = Journey.values_all(execution)
# %{name: {:set, "Alice"}, age: :not_set, execution_id: {:set, "EXEC..."}, ...}
Use values/2 to get only set values, or get_value/3 for individual node values.
Parameters
- execution - A %Journey.Persistence.Schema.Execution{} struct
- opts - Keyword list of options (:reload - defaults to true for fresh database state)
Returns
- Map with all nodes showing status: :not_set or {:set, value}
- Includes all nodes defined in the graph, regardless of current state
Examples
Basic usage showing status progression:
iex> import Journey.Node
iex> graph = Journey.new_graph("example", "v1.0.0", [input(:name), input(:age)])
iex> execution = Journey.start_execution(graph)
iex> Journey.values_all(execution) |> redact([:execution_id, :last_updated_at])
%{name: :not_set, age: :not_set, execution_id: {:set, "..."}, last_updated_at: {:set, 1234567890}}
iex> execution = Journey.set(execution, :name, "Alice")
iex> Journey.values_all(execution) |> redact([:execution_id, :last_updated_at])
%{name: {:set, "Alice"}, age: :not_set, execution_id: {:set, "..."}, last_updated_at: {:set, 1234567890}}
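To relate the status map returned by values_all/1 to the plain map returned by values/2, the following sketch converts one shape into the other. StatusMapSketch and to_values/2 are hypothetical names, not part of Journey, and the execution id "EXEC123" is a made-up placeholder. The conversion only mirrors the documented return shapes and says nothing about how Journey builds either map internally.

```elixir
defmodule StatusMapSketch do
  # Hypothetical helper, not part of Journey: converts a status map shaped
  # like the documented values_all/1 result into a plain values map.
  def to_values(status_map, opts \\ []) do
    include_unset? = Keyword.get(opts, :include_unset_as_nil, false)

    Enum.reduce(status_map, %{}, fn
      # Set nodes: unwrap the {:set, value} tuple.
      {name, {:set, value}}, acc -> Map.put(acc, name, value)
      # Unset nodes: keep as nil only when explicitly requested.
      {name, :not_set}, acc when include_unset? -> Map.put(acc, name, nil)
      {_name, :not_set}, acc -> acc
    end)
  end
end

# Placeholder data in the documented values_all/1 shape.
statuses = %{
  name: {:set, "Alice"},
  age: :not_set,
  execution_id: {:set, "EXEC123"},
  last_updated_at: {:set, 1_234_567_890}
}

IO.inspect(StatusMapSketch.to_values(statuses))
IO.inspect(StatusMapSketch.to_values(statuses, include_unset_as_nil: true))
```

The first call drops :age entirely, matching the default behavior described for values/2, while the second keeps it with a nil value.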