Journey (Journey v0.10.30)
Journey is an Elixir library for building and executing computation graphs, with built-in persistence, reliability, and scalability.
Define your application workflows as dependency graphs where user inputs automatically trigger computations in the correct order, with all state persisted to PostgreSQL.
Executions of the graph survive crashes, redeploys, page reloads, while scaling naturally with your application - no additional infrastructure or cloud service$ required.
Your application can perform durable, short or long-running executions, with retries, scalability, dependency tracking, scheduling and analytics.
Journey's primitives are simple: graph, dependencies, functions, persistence, retries, scheduling. Together, they help you build rich, scalable, reliable functionality with simple, well-structured and easy-to-understand code, quickly.
Overview
To illustrate a few concepts (graph, dependencies β including conditional dependencies, computation functions, persistence), here is a simple example.
This graph adds two numbers when they become available, and conditionally sets the "too large" flag.
iex> import Journey.Node
iex> # Defining a graph, with two input nodes and two downstream computations.
iex> graph = Journey.new_graph(
...> "demo graph",
...> "v1",
...> [
...> input(:x),
...> input(:y),
...> # :sum is unblocked when :x and :y are provided.
...> compute(:sum, [:x, :y], fn %{x: x, y: y} -> {:ok, x + y} end),
...> # :large_value_alert is unblocked when :sum is provided and is greater than 40.
...> compute(
...> :large_value_alert,
...> [sum: fn sum_node -> sum_node.node_value > 40 end],
...> fn %{sum: sum} -> {:ok, "π¨, at #{sum}"} end,
...> f_on_save: fn _execution_id, _result ->
...> # (e.g. send a pubsub notification to the LiveView process to update the UI)
...> :ok
...> end
...> )
...> ]
...> )
iex> # Start an execution of this graph, set input values, read computed values.
iex> execution = Journey.start_execution(graph)
iex> execution = Journey.set_value(execution, :x, 12)
iex> execution = Journey.set_value(execution, :y, 2)
iex> Journey.get_value(execution, :sum, wait_any: true)
{:ok, 14}
iex> Journey.get_value(execution, :large_value_alert)
{:error, :not_set}
iex> eid = execution.id
iex> # After an outage / redeployment / page reload / long pause, an execution
iex> # can be reloaded and continue, as if nothing happened.
iex> execution = Journey.load(eid)
iex> # An update to :y triggers a re-computation of downstream values.
iex> execution = Journey.set_value(execution, :y, 37)
iex> Journey.get_value(execution, :large_value_alert, wait_any: true)
{:ok, "π¨, at 49"}
iex> Journey.values(execution) |> redact([:execution_id, :last_updated_at])
%{execution_id: "...", last_updated_at: 1234567890, sum: 49, x: 12, y: 37, large_value_alert: "π¨, at 49"}
The graph can be visualized as a Mermaid graph:
> Journey.Tools.generate_mermaid_graph(graph)
graph TD
%% Graph
subgraph Graph["π§© 'demo graph', version v1"]
execution_id[execution_id]
last_updated_at[last_updated_at]
x[x]
y[y]
sum["sum<br/>(anonymous fn)"]
large_value_alert["large_value_alert<br/>(anonymous fn)"]
x --> sum
y --> sum
sum --> large_value_alert
end
%% Styling
classDef inputNode fill:#e1f5fe,stroke:#01579b,stroke-width:2px,color:#000000
classDef computeNode fill:#f3e5f5,stroke:#4a148c,stroke-width:2px,color:#000000
classDef scheduleNode fill:#fff3e0,stroke:#e65100,stroke-width:2px,color:#000000
classDef mutateNode fill:#e8f5e8,stroke:#2e7d32,stroke-width:2px,color:#000000
%% Apply styles to actual nodes
class y,x,last_updated_at,execution_id inputNode
class large_value_alert,sum computeNode
A few things to note about this example:
- Every input value (
:x
,:y
), or computation result (:sum
,:large_value_alert
) is persisted as soon as it becomes available, - The functions attached to
:sum
and:large_value_alert
- are called reliably, with a retry policy,
- will execute on any of the replicas of your application,
- are called proactively βΒ when their upstream dependencies are available.
- Executions of this flow can take as long as needed (milliseconds? months?), and will live through system restarts, crashes, redeployments, page reloads, etc.
So What Exactly Does Journey Provide?
Despite the simplicity of use, here are a few things provided by Journey that are worth noting:
Persistence: Executions are persisted, so if the customer leaves the web site, or if the system crashes, their execution can be reloaded and continued from where it left off.
Scaling: Since Journey runs as part of your application, it scales with your application. Your graph's computations (
:sum
's function in the example above, or&compute_zodiac_sign/1
and&compute_horoscope/1
in the example above) run on the same nodes where the replicas of your application are running. No additional infrastructure or cloud services are needed.Reliability: Journey uses database-based supervision of computation tasks: The
compute
functions are subject to customizable retry policy, so if:sum
's function above or&compute_horoscope/1
below fails because of a temporary glitch (e.g. the LLM service it uses for drafting horoscopes is currently overloaded), it will be retried.Code Structure: The flow of your application is captured in the Journey graph, and the business logic is captured in the compute functions (
:sum
's function above, or&compute_zodiac_sign/1
and&compute_horoscope/1
below). This clean separation supports you in structuring the functionality of your application in a clear, easy to understand and maintain way.Conditional flow: Journey allows you to define conditions for when a node is to be unblocked. So if your graph includes a "credit_approval_decision" node, the decision can inform which part of the graph is to be executed next (sending a "congrats!" email and starting the credit card issuance process, or sending a "sad trombone" email).
Graph Visualization: Journey provides tools for visualizing your application's graph, so you can easily see the flow of data and computations in your application, and to share and discuss it with your team.
Scheduling: Your graph can include computations that are scheduled to run at a later time, or on a recurring basis. Daily horoscope emails! A reminder email if they haven't visited the web site in a while! A "happy birthday" email!
Removing PII. Journey gives you an easy way to erase sensitive data once it is no longer needed. For example, your Credit Card Application graph can include a step to remove the SSN once the credit score has been computed. For an example, please see
mutate(:ssn_redacted, [:credit_score], fn _ -> {:ok, "<redacted>"} end, mutates: :ssn)
node in the example credit card application graph, here, which mutates the contents of the :ssn node, replacing its value with "<redacted>", when :credit_score completes.
Tooling and visualization:
Journey.Tools
provides a set of tools for introspecting and managing executions, and for visualizing your application's graph.
A (slightly) richer example: computing horoscopes
Consider a simple Horoscope application that computes a customer's zodiac sign and horoscope based on their birthday. The application will ask the customer to input
their name and birthday, and it then auto-compute
s their zodiac sign and horoscope.
This application can be thought of as a graph of nodes, where each node represents a piece of customer-provided data or the result of a computation. Add functions for computing the zodiac sign and horoscope, and capture the sequencing of the computations, and you have a graph that captures the flow of data and computations in your application. When a customer visits your application, you can start the execution of the graph, to accept and store customer-provided inputs (name, birthday), and to compute the zodiac sign and horoscope based on these inputs.
Journey provides a way to define such graphs, and to run their executions, to serve your customer flows.
Step-by-Step
Below is a step-by-step example of defining a Journey graph for this Horoscope application.
(These are code snippets, if you want a complete fragment you can paste into iex
or livebook, scroll down to the "Putting together" code block.)
This graph captures customer input
s, and defines compute
ations (together with their functions and prerequisites):
graph = Journey.new_graph(
"horoscope workflow - module doctest",
"v1.0.0",
[
input(:first_name),
input(:birth_day),
input(:birth_month),
compute(
:zodiac_sign,
[:birth_month, :birth_day],
&compute_zodiac_sign/1
),
compute(
:horoscope,
[:first_name, :zodiac_sign],
&compute_horoscope/1
)
]
)
When a customer lands on your web page, and starts a new flow, your application will start a new execution of the graph,
execution = Journey.start_execution(graph)
and it will populate the execution with the input values (name, birthday) as the customer provides them:
execution = Journey.set_value(execution, :first_name, "Mario")
execution = Journey.set_value(execution, :birth_day, 5)
execution = Journey.set_value(execution, :birth_month, "May")
Providing these input values will trigger automatic computations of the customer's zodiac_sign and the horoscope, which can then be read from the execution and rendered on the web page.
{:ok, zodiac_sign} = Journey.get_value(execution, :zodiac_sign, wait_any: true)
{:ok, horoscope} = Journey.get_value(execution, :horoscope, wait_any: true)
And that's it!
Example
Putting together the components of the horoscope example into a complete, running doctest example:
iex> # 1. Define a graph capturing the data and the logic of the application -
iex> # the nodes, their dependencies, and their computations:
iex> import Journey.Node
iex> graph = Journey.new_graph(
...> "horoscope workflow - module doctest",
...> "v1.0.0",
...> [
...> input(:first_name),
...> input(:birth_day),
...> input(:birth_month),
...> compute(
...> :zodiac_sign,
...> # Depends on user-supplied data:
...> [:birth_month, :birth_day],
...> # Computes itself, once the dependencies are satisfied:
...> fn %{birth_month: _birth_month, birth_day: _birth_day} ->
...> {:ok, "Taurus"}
...> end
...> ),
...> compute(
...> :horoscope,
...> # Computes itself once :first_name and :zodiac_sign are in place:
...> [:first_name, :zodiac_sign],
...> fn %{first_name: name, zodiac_sign: zodiac_sign} ->
...> {:ok, "πͺs await, #{zodiac_sign} #{name}!"}
...> end
...> )
...> ]
...> )
iex>
iex> # 2. For every customer visiting your website, start a new execution of the graph:
iex> e = Journey.start_execution(graph)
iex>
iex> # 3. Populate the execution's nodes with the data as provided by the visitor:
iex> e = Journey.set_value(e, :birth_day, 26)
iex>
iex> # As a side note: if the user leaves and comes back later or if everything crashes,
iex> # you can always reload the execution using its id:
iex> e = Journey.load(e.id)
iex>
iex> # Continuing, as if nothing happened:
iex> e = Journey.set_value(e, :birth_month, "April")
iex>
iex> # 4. Now that we have :birth_month and :birth_day, :zodiac_sign will compute itself:
iex> Journey.get_value(e, :zodiac_sign, wait_any: true)
{:ok, "Taurus"}
iex> Journey.values(e) |> redact([:execution_id, :last_updated_at])
%{birth_day: 26, birth_month: "April", zodiac_sign: "Taurus", execution_id: "...", last_updated_at: 1234567890}
iex>
iex> # 5. Once we get :first_name, the :horoscope node will compute itself:
iex> e = Journey.set_value(e, :first_name, "Mario")
iex> Journey.get_value(e, :horoscope, wait_any: true)
{:ok, "πͺs await, Taurus Mario!"}
iex>
iex> Journey.values(e) |> redact([:execution_id, :last_updated_at])
%{birth_day: 26, birth_month: "April", first_name: "Mario", horoscope: "πͺs await, Taurus Mario!", zodiac_sign: "Taurus", execution_id: "...", last_updated_at: 1234567890}
iex>
iex> # 6. and we can always list executions.
iex> this_execution = Journey.list_executions(graph_name: "horoscope workflow - module doctest", order_by_execution_fields: [:inserted_at]) |> Enum.reverse() |> hd
iex> e.id == this_execution.id
true
For a more in-depth example of building a more complex application, see the Credit Card Application example in Journey.Examples.CreditCardApplication
.
Summary
Functions
Archives an execution, making it invisible and stopping all background processing.
Returns the value of a node in an execution. Optionally waits for the value to be set.
Returns the chronological history of all successful computations and set values for an execution.
Queries and retrieves multiple executions from the database with flexible filtering, sorting, and pagination.
Reloads the current state of an execution from the database to get the latest changes.
Creates a new computation graph with the given name, version, and node definitions.
Sets the value for an input node in an execution and triggers recomputation of dependent nodes.
Starts a new execution instance of a computation graph, initializing it to accept input values and perform computations.
Un-archives the supplied execution, if it is archived.
Removes the value from an input node in an execution and invalidates all dependent computed nodes.
Returns a map of all set node values in an execution, excluding unset nodes.
Returns a map of all nodes in an execution with their current status, including unset nodes.
Functions
Archives an execution, making it invisible and stopping all background processing.
Archiving permanently (*) freezes an execution by marking it with an archived timestamp. This removes it from normal visibility and excludes it from all scheduler processing, while preserving the data for potential future access.
*) an execution can be unarchived by calling unarchive/1
Quick Example
archived_at = Journey.archive(execution)
Journey.load(execution) # Returns nil (hidden)
Journey.load(execution, include_archived: true) # Can still access
Use unarchive/1
to reverse archiving and list_executions/1
with :include_archived
to find archived executions.
Parameters
execution
- A%Journey.Persistence.Schema.Execution{}
struct or execution ID string
Returns
- Integer timestamp (Unix epoch seconds) when the execution was archived
Key Behaviors
- Scheduler exclusion - Archived executions are excluded from all background sweeps and processing
- Hidden by default - Not returned by
list_executions/1
orload/2
unless explicitly included - Idempotent - Archiving an already archived execution returns the existing timestamp
- Reversible - Use
unarchive/1
to restore normal visibility and processing
Examples
Basic archiving workflow:
iex> import Journey.Node
iex> graph = Journey.new_graph("archive example", "v1.0.0", [input(:data)])
iex> execution = Journey.start_execution(graph)
iex> execution.archived_at
nil
iex> archived_at = Journey.archive(execution)
iex> is_integer(archived_at)
true
iex> Journey.load(execution)
nil
iex> Journey.load(execution, include_archived: true) != nil
true
Idempotent behavior:
iex> import Journey.Node
iex> graph = Journey.new_graph("archive idempotent", "v1.0.0", [input(:data)])
iex> execution = Journey.start_execution(graph)
iex> first_archive = Journey.archive(execution)
iex> second_archive = Journey.archive(execution)
iex> first_archive == second_archive
true
Returns the value of a node in an execution. Optionally waits for the value to be set.
Quick Examples
# Basic usage - get a set value
{:ok, value} = Journey.get_value(execution, :name)
# Wait for a computed value to be available
{:ok, result} = Journey.get_value(execution, :computed_field, wait_any: true)
# Wait for a new version of the value
{:ok, new_value} = Journey.get_value(execution, :name, wait_new: true)
Use set_value/3
to set input values that trigger computations.
Parameters
execution
- A%Journey.Persistence.Schema.Execution{}
structnode_name
- Atom representing the node name (must exist in the graph)opts
- Keyword list of options (see Options section below)
Returns
{:ok, value}
β the value is set{:error, :not_set}
β the value is not yet set{:error, :no_such_value}
β the node does not exist
Errors
- Raises
RuntimeError
if the node name does not exist in the execution's graph - Raises
ArgumentError
if both:wait_any
and:wait_new
options are provided (mutually exclusive)
Options
:wait_any
βΒ whether or not to wait for the value to be set. This option can have the following values:false
or0
β return immediately without waiting (default)true
β wait until the value is available, or until timeout- a positive integer β wait for the supplied number of milliseconds (default: 30_000)
:infinity
β wait indefinitely This is useful for self-computing nodes, where the value is computed asynchronously.
:wait_new
β whether to wait for a new revision of the value, compared to the version in the supplied execution. This option can have the following values:false
β do not wait for a new revision (default)true
β wait for a value with a higher revision than the current one, or the first value if none exists yet, or until timeout- a positive integer β wait for the supplied number of milliseconds for a new revision This is useful for when want a new version of the value, and are waiting for it to get computed.
Note: :wait_any
and :wait_new
are mutually exclusive.
Examples
iex> execution =
...> Journey.Examples.Horoscope.graph() |>
...> Journey.start_execution() |>
...> Journey.set_value(:birth_day, 26)
iex> Journey.get_value(execution, :birth_day)
{:ok, 26}
iex> Journey.get_value(execution, :birth_month)
{:error, :not_set}
iex> Journey.get_value(execution, :astrological_sign)
{:error, :not_set}
iex> execution = Journey.set_value(execution, :birth_month, "April")
iex> Journey.get_value(execution, :astrological_sign)
{:error, :not_set}
iex> Journey.get_value(execution, :astrological_sign, wait_any: true)
{:ok, "Taurus"}
iex> Journey.get_value(execution, :horoscope, wait_any: 2_000)
{:error, :not_set}
iex> execution = Journey.set_value(execution, :first_name, "Mario")
iex> Journey.get_value(execution, :horoscope, wait_any: true)
{:ok, "πͺs await, Taurus Mario!"}
Returns the chronological history of all successful computations and set values for an execution.
This function provides visibility into the order of operations during execution, showing both value sets and successful computations in chronological order. Only successful computations are included; failed computations are filtered out. At the same revision, computations appear before values.
Quick Example
history = Journey.history(execution)
# [%{node_name: :x, computation_or_value: :value, revision: 1},
# %{node_name: :sum, computation_or_value: :computation, revision: 2}, ...]
Use values/2
to see only current values, or set_value/3
and get_value/3
for individual operations.
Parameters
execution
- A%Journey.Persistence.Schema.Execution{}
struct or execution ID string
Returns
- List of maps sorted by revision, where each map contains:
:computation_or_value
- either:computation
or:value
:node_name
- the name of the node:node_type
- the type of the node (:input
,:compute
,:mutate
, etc.):revision
- the execution revision when this operation completed:value
- the actual value (only present for:value
entries)
Examples
Basic usage showing value sets and computation:
iex> import Journey.Node
iex> graph = Journey.new_graph("history example", "v1.0.0", [
...> input(:x),
...> input(:y),
...> compute(:sum, [:x, :y], fn %{x: x, y: y} -> {:ok, x + y} end)
...> ])
iex> execution = Journey.start_execution(graph)
iex> execution = Journey.set_value(execution, :x, 10)
iex> execution = Journey.set_value(execution, :y, 20)
iex> Journey.get_value(execution, :sum, wait_any: true)
{:ok, 30}
iex> Journey.history(execution) |> Enum.map(fn entry ->
...> case entry.node_name do
...> :execution_id -> %{entry | value: "..."}
...> :last_updated_at -> %{entry | value: 1234567890}
...> _ -> entry
...> end
...> end)
[
%{node_name: :execution_id, node_type: :input, computation_or_value: :value, value: "...", revision: 0},
%{node_name: :x, node_type: :input, computation_or_value: :value, value: 10, revision: 1},
%{node_name: :y, node_type: :input, computation_or_value: :value, value: 20, revision: 2},
%{node_name: :sum, node_type: :compute, computation_or_value: :computation, revision: 4},
%{node_name: :last_updated_at, node_type: :input, computation_or_value: :value, value: 1234567890, revision: 4},
%{node_name: :sum, node_type: :compute, computation_or_value: :value, value: 30, revision: 4}
]
Queries and retrieves multiple executions from the database with flexible filtering, sorting, and pagination.
This function enables searching across all executions in your system, with powerful filtering capabilities based on graph names, node values, and execution metadata. It's essential for monitoring workflows, building dashboards, and analyzing execution patterns.
Quick Example
# List all executions for a specific graph
executions = Journey.list_executions(graph_name: "user_onboarding")
# List executions for a specific graph version
v1_executions = Journey.list_executions(
graph_name: "user_onboarding",
graph_version: "v1.0.0"
)
# Find executions where age > 18
adults = Journey.list_executions(
graph_name: "user_registration",
filter_by: [{:age, :gt, 18}]
)
Use with start_execution/1
to create executions and load/2
to get individual execution details.
Parameters
options
- Keyword list of query options (all optional)::graph_name
- String name of a specific graph to filter by:graph_version
- String version of a specific graph to filter by (requires :graph_name):sort_by
- List of fields to sort by, including both execution fields and node values (see Sorting section for details):filter_by
- List of node value filters using database-level filtering for optimal performance. Each filter is a tuple{node_name, operator, value}
or{node_name, operator}
for nil checks. Operators::eq
,:neq
,:lt
,:lte
,:gt
,:gte
(comparisons),:in
,:not_in
(membership),:is_nil
,:is_not_nil
(existence). Values can be strings, numbers, booleans, nil or lists (used with:in
and:not_in
). Complex values (maps, tuples, functions) will raise an ArgumentError.:limit
- Maximum number of results (default: 10,000):offset
- Number of results to skip for pagination (default: 0):include_archived
- Whether to include archived executions (default: false)
Returns
- List of
%Journey.Persistence.Schema.Execution{}
structs with preloaded values and computations - Empty list
[]
if no executions match the criteria
Options
:sort_by
Sort by execution fields or node values. Supports atoms for ascending ([:updated_at]
),
keywords for direction ([updated_at: :desc]
), and mixed formats ([:graph_name, inserted_at: :desc]
).
Available fields:
- Execution fields:
:inserted_at
,:updated_at
,:revision
,:graph_name
,:graph_version
- Node values: Any node name from the graph (e.g.,
:age
,:score
) using JSONB ordering - Direction:
:asc
(default) or:desc
Key Behaviors
- Filtering performed at database level for optimal performance
- Only primitive values supported for filtering (complex types raise errors)
- Archived executions excluded by default
Examples
Basic listing by graph name:
iex> import Journey.Node
iex> graph = Journey.new_graph(
...> "list example basic - #{Journey.Helpers.Random.random_string()}",
...> "v1.0.0",
...> [input(:status)]
...> )
iex> Journey.start_execution(graph) |> Journey.set_value(:status, "active")
iex> Journey.start_execution(graph) |> Journey.set_value(:status, "pending")
iex> executions = Journey.list_executions(graph_name: graph.name)
iex> length(executions)
2
Filtering by graph version:
iex> import Journey.Node
iex> graph_name = "version example D9Xmexzd7DVe"
iex> graph_v1 = Journey.new_graph(
...> graph_name,
...> "v1.0.0",
...> [input(:data)]
...> )
iex> graph_v2 = Journey.new_graph(
...> graph_name,
...> "v2.0.0",
...> [input(:data), input(:new_field)]
...> )
iex> Journey.start_execution(graph_v1) |> Journey.set_value(:data, "v1 data")
iex> Journey.start_execution(graph_v2) |> Journey.set_value(:data, "v2 data")
iex> Journey.list_executions(graph_name: graph_v1.name, graph_version: "v1.0.0") |> length()
1
iex> Journey.list_executions(graph_name: graph_v1.name, graph_version: "v2.0.0") |> length()
1
iex> Journey.list_executions(graph_name: graph_v1.name) |> length()
2
Validation that graph_version requires graph_name:
iex> Journey.list_executions(graph_version: "v1.0.0")
** (ArgumentError) Option :graph_version requires :graph_name to be specified
Sorting by execution fields and node values:
iex> import Journey.Node
iex> graph = Journey.new_graph(
...> "sort example - #{Journey.Helpers.Random.random_string()}",
...> "v1.0.0",
...> [input(:priority)]
...> )
iex> Journey.start_execution(graph) |> Journey.set_value(:priority, "high")
iex> Journey.start_execution(graph) |> Journey.set_value(:priority, "low")
iex> Journey.start_execution(graph) |> Journey.set_value(:priority, "medium")
iex> # Sort by priority descending - shows the actual sorted values
iex> Journey.list_executions(graph_name: graph.name, sort_by: [priority: :desc]) |> Enum.map(fn e -> Journey.values(e) |> Map.get(:priority) end)
["medium", "low", "high"]
Filtering with multiple operators:
iex> graph = Journey.Examples.Horoscope.graph()
iex> for day <- 1..20, do: Journey.start_execution(graph) |> Journey.set_value(:birth_day, day) |> Journey.set_value(:birth_month, 4) |> Journey.set_value(:first_name, "Mario")
iex> # Various filtering examples
iex> Journey.list_executions(graph_name: graph.name, filter_by: [{:birth_day, :eq, 10}]) |> Enum.count()
1
iex> Journey.list_executions(graph_name: graph.name, filter_by: [{:birth_day, :neq, 10}]) |> Enum.count()
19
iex> Journey.list_executions(graph_name: graph.name, filter_by: [{:birth_day, :lte, 5}]) |> Enum.count()
5
iex> Journey.list_executions(graph_name: graph.name, filter_by: [{:birth_day, :in, [5, 10, 15]}]) |> Enum.count()
3
iex> Journey.list_executions(graph_name: graph.name, filter_by: [{:first_name, :is_not_nil}]) |> Enum.count()
20
Multiple filters, sorting, and pagination:
iex> graph = Journey.Examples.Horoscope.graph()
iex> for day <- 1..20, do: Journey.start_execution(graph) |> Journey.set_value(:birth_day, day) |> Journey.set_value(:birth_month, 4) |> Journey.set_value(:first_name, "Mario")
iex> # Multiple filters combined
iex> Journey.list_executions(
...> graph_name: graph.name,
...> filter_by: [{:birth_day, :gt, 10}, {:first_name, :is_not_nil}],
...> sort_by: [birth_day: :desc],
...> limit: 5
...> ) |> Enum.count()
5
iex> # Pagination
iex> Journey.list_executions(graph_name: graph.name, limit: 3) |> Enum.count()
3
iex> Journey.list_executions(graph_name: graph.name, limit: 5, offset: 10) |> Enum.count()
5
Including archived executions:
iex> import Journey.Node
iex> graph = Journey.new_graph(
...> "list example - archived - #{Journey.Helpers.Random.random_string()}",
...> "v1.0.0",
...> [input(:status)]
...> )
iex> e1 = Journey.start_execution(graph)
iex> _e2 = Journey.start_execution(graph)
iex> Journey.archive(e1)
iex> Journey.list_executions(graph_name: graph.name) |> length()
1
iex> Journey.list_executions(graph_name: graph.name, include_archived: true) |> length()
2
Reloads the current state of an execution from the database to get the latest changes.
Executions can be modified by their background computations, or scheduled events, or other processes setting their values. This function is used to get the latest state of an execution -- as part of normal operations, or when the system starts up, or when the user whose session is being tracked as an execution comes back to the web site and resumes their flow.
Quick Example
execution = Journey.set_value(execution, :name, "Mario")
execution = Journey.load(execution) # Get updated state with new revision
{:ok, greeting} = Journey.get_value(execution, :greeting, wait_any: true)
Use set_value/3
and get_value/3
to modify and read execution values.
Parameters
execution
- A%Journey.Persistence.Schema.Execution{}
struct or execution ID stringopts
- Keyword list of options (see Options section below)
Returns
- A
%Journey.Persistence.Schema.Execution{}
struct with current database state, ornil
if not found
Options
:preload
- Whether to preload associated nodes and values. Defaults totrue
. Set tofalse
for better performance when you only need execution metadata.:include_archived
- Whether to include archived executions. Defaults tofalse
. Archived executions are normally hidden but can be loaded with this option.
Key Behaviors
- Fresh state - Always returns the current state from the database, not cached data
- Revision tracking - Loaded execution will have the latest revision number
- Archived handling - Archived executions return
nil
unless explicitly included - Performance option - Use
preload: false
to skip loading values/computations for speed
Examples
Basic reloading after value changes:
iex> import Journey.Node
iex> graph = Journey.new_graph(
...> "load example - basic",
...> "v1.0.0",
...> [
...> input(:name),
...> compute(:greeting, [:name], fn %{name: name} -> {:ok, "Hello, #{name}!"} end)
...> ]
...> )
iex> execution = Journey.start_execution(graph)
iex> execution.revision
0
iex> execution = Journey.set_value(execution, :name, "Alice")
iex> execution.revision > 0
true
iex> {:ok, "Hello, Alice!"} = Journey.get_value(execution, :greeting, wait_any: true)
iex> reloaded = Journey.load(execution)
iex> reloaded.revision >= execution.revision
true
Loading by execution ID:
iex> import Journey.Node
iex> graph = Journey.new_graph(
...> "load example - by id",
...> "v1.0.0",
...> [input(:data)]
...> )
iex> execution = Journey.start_execution(graph)
iex> execution_id = execution.id
iex> reloaded = Journey.load(execution_id)
iex> reloaded.id == execution_id
true
Performance optimization with preload option:
iex> import Journey.Node
iex> graph = Journey.new_graph(
...> "load example - no preload",
...> "v1.0.0",
...> [input(:data)]
...> )
iex> execution = Journey.start_execution(graph)
iex> fast_load = Journey.load(execution, preload: false)
iex> fast_load.id == execution.id
true
Handling archived executions:
iex> import Journey.Node
iex> graph = Journey.new_graph(
...> "load example - archived",
...> "v1.0.0",
...> [input(:data)]
...> )
iex> execution = Journey.start_execution(graph)
iex> Journey.archive(execution)
iex> Journey.load(execution)
nil
iex> Journey.load(execution, include_archived: true) != nil
true
Creates a new computation graph with the given name, version, and node definitions.
This is the foundational function for defining Journey graphs. It creates a validated
graph structure that can be used to start executions with start_execution/1
. The graph
defines the data flow, dependencies, and computations for your application workflow.
Quick Example
import Journey.Node
graph = Journey.new_graph(
"user onboarding",
"v1.0.0",
[
input(:email),
compute(:welcome_message, [:email], fn %{email: email} ->
{:ok, "Welcome #{email}!"}
end)
]
)
execution = Journey.start_execution(graph)
Use start_execution/1
to create executions and set_value/3
to populate input values.
Parameters
name
- String identifying the graph (e.g., "user registration workflow")version
- String version identifier following semantic versioning (e.g., "v1.0.0")nodes
- List of node definitions created withJourney.Node
functions (input/1
,compute/4
, etc.)opts
- Optional keyword list of options::f_on_save
- Graph-wide callback function invoked after any node computation succeeds. Receives(execution_id, node_name, result)
where result is{:ok, value}
or{:error, reason}
. This callback is called after any node-specificf_on_save
callbacks.
Returns
%Journey.Graph{}
struct representing the validated and registered computation graph
Errors
- Raises
RuntimeError
if graph validation fails (e.g., circular dependencies, unknown node references) - Raises
ArgumentError
if parameters have invalid types or empty node list - Raises
KeywordValidator.Error
if options are invalid
Key Behaviors
- Validation - Automatically validates graph structure for cycles, dependency correctness
- Registration - Registers graph in catalog for execution tracking and reloading
- Immutable - Graph definition is immutable once created; create new versions for changes
- Node types - Supports input, compute, mutate, schedule_once, and schedule_recurring nodes
f_on_save
Callbacks - If defined, the graph-widef_on_save
callback is called after Node-specificf_on_save
s (if defined)
Examples
Basic workflow with input and computation:
iex> import Journey.Node
iex> graph = Journey.new_graph(
...> "greeting workflow",
...> "v1.0.0",
...> [
...> input(:name),
...> compute(:greeting, [:name], fn %{name: name} -> {:ok, "Hello, #{name}!"} end)
...> ]
...> )
iex> graph.name
"greeting workflow"
iex> execution = Journey.start_execution(graph)
iex> execution = Journey.set_value(execution, :name, "Alice")
iex> Journey.get_value(execution, :greeting, wait_any: true)
{:ok, "Hello, Alice!"}
Graph with a graph-wide f_on_save
callback:
iex> import Journey.Node
iex> _graph = Journey.new_graph(
...> "notification workflow",
...> "v1.0.0",
...> [
...> input(:user_id),
...> compute(:fetch_user, [:user_id], fn %{user_id: id} ->
...> {:ok, %{id: id, name: "User #{id}"}}
...> end),
...> compute(:send_email, [:fetch_user], fn %{fetch_user: user} ->
...> {:ok, "Email sent to #{user.name}"}
...> end)
...> ],
...> f_on_save: fn _execution_id, node_name, result ->
...> # This will be called for both :fetch_user and :send_email computations
...> IO.puts("Node #{node_name} completed with result: #{inspect(result)}")
...> :ok
...> end
...> )
Complex workflow with conditional dependencies:
iex> import Journey.Node
iex> import Journey.Node.Conditions
iex> import Journey.Node.UpstreamDependencies
iex> graph = Journey.new_graph(
...> "horoscope workflow",
...> "v1.0.0",
...> [
...> input(:first_name),
...> input(:birth_day),
...> input(:birth_month),
...> compute(
...> :zodiac_sign,
...> [:birth_month, :birth_day],
...> fn %{birth_month: _birth_month, birth_day: _birth_day} ->
...> {:ok, "Taurus"}
...> end
...> ),
...> compute(
...> :horoscope,
...> unblocked_when({
...> :and,
...> [
...> {:first_name, &provided?/1},
...> {:zodiac_sign, &provided?/1}
...> ]
...> }),
...> fn %{first_name: name, zodiac_sign: zodiac_sign} ->
...> {:ok, "πͺs await, #{zodiac_sign} #{name}!"}
...> end
...> )
...> ]
...> )
iex> execution = Journey.start_execution(graph)
iex> execution = Journey.set_value(execution, :birth_day, 15)
iex> execution = Journey.set_value(execution, :birth_month, "May")
iex> Journey.get_value(execution, :zodiac_sign, wait_any: true)
{:ok, "Taurus"}
iex> execution = Journey.set_value(execution, :first_name, "Bob")
iex> Journey.get_value(execution, :horoscope, wait_any: true)
{:ok, "πͺs await, Taurus Bob!"}
Multiple node types in a workflow:
iex> import Journey.Node
iex> graph = Journey.new_graph(
...> "data processing workflow",
...> "v2.1.0",
...> [
...> input(:raw_data),
...> compute(:upper_case, [:raw_data], fn %{raw_data: data} ->
...> {:ok, String.upcase(data)}
...> end),
...> compute(:suffix, [:upper_case], fn %{upper_case: data} ->
...> {:ok, "#{data} omg yay"}
...> end)
...> ]
...> )
iex> execution = Journey.start_execution(graph)
iex> execution = Journey.set_value(execution, :raw_data, "hello world")
iex> Journey.get_value(execution, :upper_case, wait_any: true)
{:ok, "HELLO WORLD"}
iex> Journey.get_value(execution, :suffix, wait_any: true)
{:ok, "HELLO WORLD omg yay"}
Sets the value for an input node in an execution and triggers recomputation of dependent nodes.
When a value is set, Journey automatically recomputes any dependent computed nodes to ensure consistency across the dependency graph. The operation is idempotent - setting the same value twice has no effect.
Parameters
execution
- A%Journey.Persistence.Schema.Execution{}
struct or execution ID stringnode_name
- Atom representing the input node name (must exist in the graph)value
- The value to set. Supported types: nil, string, number, map, list, boolean. Note that if the map or the list contains atoms, those atoms will be converted to strings.
Returns
- Updated
%Journey.Persistence.Schema.Execution{}
struct with incremented revision (if value changed)
Errors
- Raises
RuntimeError
if the node name does not exist in the execution's graph - Raises
RuntimeError
if attempting to set a compute node (only input nodes can be set)
Key Behaviors
- Automatic recomputation - Setting a value triggers recomputation of all dependent nodes
- Idempotent - Setting the same value twice has no effect (no revision increment)
- Input nodes only - Only input nodes can be set; compute nodes are read-only
Quick Example
execution = Journey.set_value(execution, :name, "Mario")
{:ok, greeting} = Journey.get_value(execution, :greeting, wait_any: true)
Use get_value/3
to retrieve the set value and unset_value/2
to remove values.
Examples
Basic setting with cascading recomputation:
iex> import Journey.Node
iex> graph = Journey.new_graph(
...> "set workflow - cascading example",
...> "v1.0.0",
...> [
...> input(:name),
...> compute(:greeting, [:name], fn %{name: name} -> {:ok, "Hello, #{name}!"} end)
...> ]
...> )
iex> execution = graph |> Journey.start_execution()
iex> execution = Journey.set_value(execution, :name, "Mario")
iex> Journey.get_value(execution, :greeting, wait_any: true)
{:ok, "Hello, Mario!"}
iex> execution = Journey.set_value(execution, :name, "Luigi")
iex> Journey.get_value(execution, :greeting, wait_new: true)
{:ok, "Hello, Luigi!"}
Idempotent behavior - same value doesn't change revision:
iex> import Journey.Node
iex> graph = Journey.new_graph(
...> "set workflow - idempotent example",
...> "v1.0.0",
...> [input(:name)]
...> )
iex> execution = graph |> Journey.start_execution()
iex> execution = Journey.set_value(execution, :name, "Mario")
iex> first_revision = execution.revision
iex> execution = Journey.set_value(execution, :name, "Mario")
iex> execution.revision == first_revision
true
Different value types:
iex> import Journey.Node
iex> graph = Journey.new_graph(
...> "set workflow - value types example",
...> "v1.0.0",
...> [input(:number), input(:flag), input(:data)]
...> )
iex> execution = graph |> Journey.start_execution()
iex> execution = Journey.set_value(execution, :number, 42)
iex> execution = Journey.set_value(execution, :flag, true)
iex> execution = Journey.set_value(execution, :data, %{key: "value"})
iex> Journey.get_value(execution, :number)
{:ok, 42}
iex> Journey.get_value(execution, :flag)
{:ok, true}
Using an execution ID:
iex> import Journey.Node
iex> graph = Journey.new_graph(
...> "set workflow - execution_id example",
...> "v1.0.0",
...> [input(:name)]
...> )
iex> execution = graph |> Journey.start_execution()
iex> updated_execution = Journey.set_value(execution.id, :name, "Luigi")
iex> Journey.get_value(updated_execution, :name)
{:ok, "Luigi"}
Starts a new execution instance of a computation graph, initializing it to accept input values and perform computations.
Creates a persistent execution in the database with a unique ID and begins background processing for any schedulable nodes. The execution starts with revision 0 and no values set.
Quick Example
execution = Journey.start_execution(graph)
execution = Journey.set_value(execution, :name, "Mario")
{:ok, greeting} = Journey.get_value(execution, :greeting, wait_any: true)
Use set_value/3
to provide input values and get_value/3
to retrieve computed results.
Parameters
graph
- A validated%Journey.Graph{}
struct created withnew_graph/3
. The graph must have passed validation during creation and be registered in the graph catalog.
Returns
- A new
%Journey.Persistence.Schema.Execution{}
struct with::id
- Unique execution identifier (UUID string):graph_name
and:graph_version
- From the source graph:revision
- Always starts at 0, increments with each state change:archived_at
- Initially nil (not archived) and other fields.
Key Behaviors
- Database persistence - Execution state is immediately saved to PostgreSQL
- Unique execution - Each call creates a completely independent execution instance
- Background processing - Scheduler automatically begins monitoring for schedulable nodes
- Ready for inputs - Can immediately accept input values via
set_value/3
Examples
Basic execution creation:
iex> import Journey.Node
iex> graph = Journey.new_graph(
...> "greeting workflow",
...> "v1.0.0",
...> [
...> input(:name),
...> compute(
...> :greeting,
...> [:name],
...> fn %{name: name} -> {:ok, "Hello, #{name}!"} end
...> )
...> ]
...> )
iex> execution = Journey.start_execution(graph)
iex> execution.graph_name
"greeting workflow"
iex> execution.graph_version
"v1.0.0"
iex> execution.revision
0
Execution properties and immediate workflow:
iex> import Journey.Node
iex> graph = Journey.new_graph(
...> "calculation workflow",
...> "v1.0.0",
...> [
...> input(:x),
...> input(:y),
...> compute(:sum, [:x, :y], fn %{x: x, y: y} -> {:ok, x + y} end)
...> ]
...> )
iex> execution = Journey.start_execution(graph)
iex> is_binary(execution.id)
true
iex> execution.archived_at
nil
iex> user_values = Journey.values(execution, reload: false) |> Map.drop([:execution_id, :last_updated_at])
iex> user_values
%{}
iex> execution = Journey.set_value(execution, :x, 10)
iex> execution = Journey.set_value(execution, :y, 20)
iex> Journey.get_value(execution, :sum, wait_any: true)
{:ok, 30}
Multiple independent executions:
iex> import Journey.Node
iex> graph = Journey.new_graph(
...> "counter workflow",
...> "v1.0.0",
...> [input(:count)]
...> )
iex> execution1 = Journey.start_execution(graph)
iex> execution2 = Journey.start_execution(graph)
iex> execution1.id != execution2.id
true
iex> execution1 = Journey.set_value(execution1, :count, 1)
iex> execution2 = Journey.set_value(execution2, :count, 2)
iex> Journey.get_value(execution1, :count)
{:ok, 1}
iex> Journey.get_value(execution2, :count)
{:ok, 2}
Un-archives the supplied execution, if it is archived.
Parameters:
execution
orexecution_id
: The execution to un-archive, or the ID of the execution to un-archive.
Returns
- :ok
Examples
iex> execution =
...> Journey.Examples.Horoscope.graph() |>
...> Journey.start_execution() |>
...> Journey.set_value(:birth_day, 26)
iex> _archived_at = Journey.archive(execution)
iex> # The execution is now archived, and it is no longer visible.
iex> nil == Journey.load(execution, include_archived: false)
true
iex> Journey.unarchive(execution)
:ok
iex> # The execution is now un-archived, and it can now be loaded.
iex> nil == Journey.load(execution, include_archived: false)
false
iex> # Un-archiving an un-archived execution has no effect.
iex> Journey.unarchive(execution)
:ok
Removes the value from an input node in an execution and invalidates all dependent computed nodes.
When a value is unset, Journey automatically invalidates (unsets) all computed nodes that depend on the unset input, creating a cascading effect through the dependency graph. This ensures data consistency - no computed values remain that were based on the now-removed input.
Quick Example
execution = Journey.unset_value(execution, :name)
{:error, :not_set} = Journey.get_value(execution, :name)
Use set_value/3
to set values and get_value/3
to check if values are set.
Parameters
execution
- A%Journey.Persistence.Schema.Execution{}
struct or execution ID stringnode_name
- Atom representing the input node name (must exist in the graph)
Returns
- Updated
%Journey.Persistence.Schema.Execution{}
struct with incremented revision (if value was set)
Errors
- Raises
RuntimeError
if the node name does not exist in the execution's graph - Raises
RuntimeError
if attempting to unset a compute node (only input nodes can be unset)
Key Behaviors
- Cascading invalidation - Dependent computed nodes are automatically unset
- Idempotent - Multiple unsets of the same value have no additional effect
- Input nodes only - Only input nodes can be unset; compute nodes cannot be unset
Examples
Basic unsetting with cascading invalidation:
iex> import Journey.Node
iex> graph = Journey.new_graph(
...> "unset workflow - basic example",
...> "v1.0.0",
...> [
...> input(:name),
...> compute(
...> :greeting,
...> [:name],
...> fn %{name: name} -> {:ok, "Hello, #{name}!"} end
...> )
...> ]
...> )
iex> execution = graph |> Journey.start_execution()
iex> execution = Journey.set_value(execution, :name, "Mario")
iex> Journey.get_value(execution, :greeting, wait_any: true)
{:ok, "Hello, Mario!"}
iex> execution_after_unset = Journey.unset_value(execution, :name)
iex> Journey.get_value(execution_after_unset, :name)
{:error, :not_set}
iex> Journey.get_value(execution_after_unset, :greeting)
{:error, :not_set}
Multi-level cascading (A β B β C chain):
iex> import Journey.Node
iex> graph = Journey.new_graph(
...> "unset workflow - cascade example",
...> "v1.0.0",
...> [
...> input(:a),
...> compute(:b, [:a], fn %{a: a} -> {:ok, "B:#{a}"} end),
...> compute(:c, [:b], fn %{b: b} -> {:ok, "C:#{b}"} end)
...> ]
...> )
iex> execution = graph |> Journey.start_execution()
iex> execution = Journey.set_value(execution, :a, "value")
iex> Journey.get_value(execution, :b, wait_any: true)
{:ok, "B:value"}
iex> Journey.get_value(execution, :c, wait_any: true)
{:ok, "C:B:value"}
iex> execution_after_unset = Journey.unset_value(execution, :a)
iex> Journey.get_value(execution_after_unset, :a)
{:error, :not_set}
iex> Journey.get_value(execution_after_unset, :b)
{:error, :not_set}
iex> Journey.get_value(execution_after_unset, :c)
{:error, :not_set}
Idempotent behavior:
iex> import Journey.Node
iex> graph = Journey.new_graph(
...> "unset workflow - idempotent example",
...> "v1.0.0",
...> [input(:name)]
...> )
iex> execution = graph |> Journey.start_execution()
iex> original_revision = execution.revision
iex> execution_after_unset = Journey.unset_value(execution, :name)
iex> execution_after_unset.revision == original_revision
true
Returns a map of all set node values in an execution, excluding unset nodes.
This function filters the execution to only include nodes that have been populated with data.
Unset nodes are excluded from the result. Always includes :execution_id
and :last_updated_at
metadata.
Quick Example
execution = Journey.set_value(execution, :name, "Alice")
values = Journey.values(execution)
# %{name: "Alice", execution_id: "EXEC...", last_updated_at: 1234567890}
Use values_all/1
to see all nodes including unset ones, or get_value/3
for individual values.
Parameters
execution
- A%Journey.Persistence.Schema.Execution{}
structopts
- Keyword list of options (:reload
- seevalues_all/1
for details)
Returns
- Map with node names as keys and their current values as values
- Only includes nodes that have been set (excludes
:not_set
nodes)
Examples
Basic usage:
iex> import Journey.Node
iex> graph = Journey.new_graph("example", "v1.0.0", [input(:name), input(:age)])
iex> execution = Journey.start_execution(graph)
iex> Journey.values(execution) |> redact([:execution_id, :last_updated_at])
%{execution_id: "...", last_updated_at: 1234567890}
iex> execution = Journey.set_value(execution, :name, "Alice")
iex> Journey.values(execution) |> redact([:execution_id, :last_updated_at])
%{name: "Alice", execution_id: "...", last_updated_at: 1234567890}
Returns a map of all nodes in an execution with their current status, including unset nodes.
Unlike values/2
which only returns set nodes, this function shows all nodes including those
that haven't been set yet. Unset nodes are marked as :not_set
, while set nodes are returned
as {:set, value}
tuples. Useful for debugging and introspection.
Quick Example
all_status = Journey.values_all(execution)
# %{name: {:set, "Alice"}, age: :not_set, execution_id: {:set, "EXEC..."}, ...}
Use values/2
to get only set values, or get_value/3
for individual node values.
Parameters
execution
- A%Journey.Persistence.Schema.Execution{}
structopts
- Keyword list of options (:reload
- defaults totrue
for fresh database state)
Returns
- Map with all nodes showing status:
:not_set
or{:set, value}
- Includes all nodes defined in the graph, regardless of current state
Examples
Basic usage showing status progression:
iex> import Journey.Node
iex> graph = Journey.new_graph("example", "v1.0.0", [input(:name), input(:age)])
iex> execution = Journey.start_execution(graph)
iex> Journey.values_all(execution) |> redact([:execution_id, :last_updated_at])
%{name: :not_set, age: :not_set, execution_id: {:set, "..."}, last_updated_at: {:set, 1234567890}}
iex> execution = Journey.set_value(execution, :name, "Alice")
iex> Journey.values_all(execution) |> redact([:execution_id, :last_updated_at])
%{name: {:set, "Alice"}, age: :not_set, execution_id: {:set, "..."}, last_updated_at: {:set, 1234567890}}