Journey.Node (Journey v0.10.36)
This module contains functions for creating nodes in a graph. Nodes in a graph can be of several types:
- input/1 – a node that takes input from the user.
- compute/4 – a node that computes a value based on its upstream nodes.
- mutate/4 – a node that mutates the value of another node.
- historian/3 – a node that tracks the history of changes to another node.
- schedule_once/3 – a node that, once unblocked, in its turn, unblocks others, on a schedule.
- schedule_recurring/3 – a node that, once unblocked, in its turn, unblocks others, on a schedule, time after time.
Summary
Functions
Creates a graph node that archives the execution once it is unblocked.
Creates a self-computing node.
EXPERIMENTAL: Creates a history-tracking node that maintains a chronological log of changes to one or more nodes.
Creates a graph input node. The value of an input node is set with Journey.set/3. The name of the node must be an atom.
Creates a graph node that mutates the value of another node.
Creates a graph node that declares its readiness at a specific time, once.
Creates a graph node that declares its readiness at a specific time, time after time.
Functions
Creates a graph node that archives the execution once it is unblocked.
Examples:
iex> import Journey.Node
iex> graph = Journey.new_graph(
...> "`archive()` doctest graph (a useless machine that archives itself immediately;)",
...> "v1.0.0",
...> [
...> input(:name),
...> archive(:archive, [:name])
...> ]
...> )
iex> execution = graph |> Journey.start_execution()
iex> execution.archived_at == nil
true
iex> execution = Journey.set(execution, :name, "Mario")
iex> {:ok, _, _} = Journey.get(execution, :archive, wait: :any)
iex> Journey.load(execution)
nil
iex> execution = Journey.load(execution, include_archived: true)
iex> execution.archived_at == nil
false
Creates a self-computing node.
name is an atom uniquely identifying the node in this graph.
gated_by defines when this node becomes eligible to compute.
Accepts either:
- A list of atom node names, e.g. [:a, :b], indicating the node becomes unblocked when all of the listed nodes have a value.
- A keyword list with conditions, e.g. [a: fn node -> node.node_value > 10 end], for conditional dependencies.
- A mixed list combining atoms and keyword conditions, e.g. [:a, :b, c: fn node -> node.node_value > 5 end].
- A structured condition (see unblocked_when/1) allowing for logical operators (:and, :or) and custom value predicates (e.g. unblocked_when({:and, [{:a, &provided?/1}, {:b, &provided?/1}]})).
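To make the list forms above concrete, here is a minimal sketch of how a mixed gated_by list could be evaluated against a map of value nodes. The module GatingSketch, the function unblocked?/2, and the rule that a missing node or a nil node_value means "no value yet" are assumptions made for illustration. This is not Journey's implementation.

```elixir
defmodule GatingSketch do
  # Hypothetical helper (not part of Journey): returns true when every
  # condition in a mixed `gated_by` list is satisfied.
  def unblocked?(gated_by, value_nodes) do
    Enum.all?(gated_by, fn
      # A bare atom: the named node only needs to have a value.
      name when is_atom(name) ->
        has_value?(Map.get(value_nodes, name))

      # A `{name, predicate}` pair (keyword entry): the node must have a
      # value and the predicate must accept the node.
      {name, predicate} when is_atom(name) and is_function(predicate, 1) ->
        node = Map.get(value_nodes, name)
        has_value?(node) && predicate.(node)
    end)
  end

  # Assumption: a missing node or a nil node_value counts as "no value yet".
  defp has_value?(nil), do: false
  defp has_value?(%{node_value: nil}), do: false
  defp has_value?(%{node_value: _}), do: true
end

gated_by = [:a, :b, c: fn node -> node.node_value > 5 end]

# :c is set, but its predicate rejects the value 3.
blocked = %{a: %{node_value: 1}, b: %{node_value: 2}, c: %{node_value: 3}}
IO.inspect(GatingSketch.unblocked?(gated_by, blocked))

# All three conditions hold.
ready = %{a: %{node_value: 1}, b: %{node_value: 2}, c: %{node_value: 10}}
IO.inspect(GatingSketch.unblocked?(gated_by, ready))
```

In Elixir, keyword entries must come last in a list literal, which is why the mixed form puts the bare atoms first.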
f_compute is the function that computes the value of the node, once the upstream dependencies are satisfied.
The function can accept either one or two arguments:
- Arity 1: fn values_map -> ... end - Receives a map of upstream node names to their values
- Arity 2: fn values_map, value_nodes_map -> ... end - Additionally receives value node data from upstream nodes
The function must return a tuple: {:ok, value} or {:error, reason}.
The value_nodes_map (when using arity-2) contains detailed information for each upstream dependency, keyed by node name.
Each entry is a map with the following fields:
- :node_value - The current value of the node
- :metadata - Metadata set via Journey.set/3
- :revision - The revision number when this value was set
- :set_time - Unix timestamp when the value was set
This is useful for accessing contextual information like author IDs, timestamps, revision tracking, or data provenance. The function is called when the upstream nodes are set, and the value is set to the result of the function.
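As an illustration of the fields listed above, the following sketch calls an arity-2 function directly with hand-built maps, so it runs without Journey. The function name describe_price, the :price node, and the "source" metadata key are hypothetical; only the four entry fields come from the description above.

```elixir
# Arity-2 function in the shape of `f_compute`. It reads the revision,
# set time, and metadata of its upstream :price node.
describe_price = fn %{price: price}, value_nodes_map ->
  %{revision: revision, set_time: set_time, metadata: metadata} = value_nodes_map.price
  # Metadata may be absent, so fall back to an empty map.
  source = (metadata || %{})["source"] || "unknown"
  {:ok, "#{price} (rev #{revision}, set at #{set_time}, source: #{source})"}
end

# Hand-built stand-ins for the two arguments the function would receive.
values_map = %{price: 42}

value_nodes_map = %{
  price: %{
    node_value: 42,
    metadata: %{"source" => "import"},
    revision: 3,
    set_time: 1_700_000_000
  }
}

{:ok, description} = describe_price.(values_map, value_nodes_map)
IO.puts(description)
# prints "42 (rev 3, set at 1700000000, source: import)"
```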
Note that return values are JSON-serialized for storage. If the returned value or reason contains atoms
(e.g., {:ok, :pending} or {:ok, %{status: :active}}), those atoms will be converted to
strings when retrieved via get_value/3.
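The practical consequence is that code reading a computed value should match on strings rather than atoms. The sketch below simulates the round trip with a hand-written converter; AtomRoundTripSketch is a hypothetical stand-in, not Journey's serializer, and it assumes that atom map keys are converted the same way as atom values.

```elixir
defmodule AtomRoundTripSketch do
  # Hypothetical stand-in for a JSON round trip: atoms come back as strings,
  # while true, false, and nil keep their JSON meaning.
  def round_trip(value) when is_boolean(value) or is_nil(value), do: value
  def round_trip(value) when is_atom(value), do: Atom.to_string(value)
  def round_trip(value) when is_list(value), do: Enum.map(value, &round_trip/1)

  def round_trip(value) when is_map(value) do
    Map.new(value, fn {key, val} -> {key_to_string(key), round_trip(val)} end)
  end

  def round_trip(value), do: value

  defp key_to_string(key) when is_atom(key), do: Atom.to_string(key)
  defp key_to_string(key), do: key
end

# What a compute function returned inside {:ok, ...}.
computed = %{status: :active}

# What a later read would see under the assumptions above.
retrieved = AtomRoundTripSketch.round_trip(computed)

message =
  case retrieved do
    %{"status" => "active"} -> "matched on strings"
    %{status: :active} -> "matched on atoms"
  end

IO.puts(message)
# prints "matched on strings"
```

Returning strings from f_compute in the first place avoids the mismatch between what was returned and what is read back.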
In the case of a failure, the function is automatically retried, up to max_retries times.
If the function fails after max_retries attempts, the node is marked as failed.
If the function does not return within abandon_after_seconds, it is considered abandoned, and it will be retried (up to max_retries times).
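A function written with retries in mind reports a failure by returning {:error, reason} instead of a partial value. The sketch below shows that shape in isolation; FailureSketch, its in-memory @rates table, and the :currency node are hypothetical, and the reason is a string so that it reads the same after persistence.

```elixir
defmodule FailureSketch do
  # Hypothetical in-memory lookup table; a real function might call an
  # external service here.
  @rates %{"EUR" => 0.92}

  # Shaped like an arity-1 `f_compute`: {:ok, value} on success,
  # {:error, reason} when the lookup cannot be completed.
  def compute_rate(%{currency: currency}) do
    case Map.fetch(@rates, currency) do
      {:ok, rate} -> {:ok, rate}
      :error -> {:error, "no rate available for #{currency}"}
    end
  end
end

IO.inspect(FailureSketch.compute_rate(%{currency: "EUR"}))
IO.inspect(FailureSketch.compute_rate(%{currency: "XYZ"}))
```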
Examples:
iex> import Journey.Node
iex> graph = Journey.new_graph(
...> "`compute()` doctest graph (pig-latinize-ish a name)",
...> "v1.0.0",
...> [
...> input(:name),
...> compute(
...> :pig_latin_ish_name,
...> [:name],
...> fn %{name: name} ->
...> {:ok, "#{name}-ay"}
...> end,
...> max_retries: 4, # Optional (default: 3)
...> abandon_after_seconds: 60, # Optional (default: 60)
...> f_on_save: fn _execution_id, _params ->
...> # Optional callback to be called when the value is saved.
...> # This is useful for notifying other systems (e.g. a LiveView
...> # via PubSub.notify()) – that the value has been saved.
...> :ok
...> end
...> )
...> ]
...> )
iex> execution = graph |> Journey.start_execution() |> Journey.set(:name, "Alice")
iex> {:ok, "Alice-ay", 3} = execution |> Journey.get(:pig_latin_ish_name, wait: :any)
iex> execution |> Journey.values() |> redact([:execution_id, :last_updated_at])
%{name: "Alice", pig_latin_ish_name: "Alice-ay", execution_id: "...", last_updated_at: 1_234_567_890}
Keyword List Syntax for Conditional Dependencies
iex> import Journey.Node
iex> graph = Journey.new_graph(
...> "threshold alert example",
...> "v1.0.0",
...> [
...> input(:temperature),
...> # Using keyword list syntax for conditional dependency
...> compute(
...> :high_temp_alert,
...> [temperature: fn node -> node.node_value > 30 end],
...> fn %{temperature: temp} ->
...> {:ok, "High temperature alert: #{temp}°C"}
...> end
...> )
...> ]
...> )
iex> execution = graph |> Journey.start_execution()
iex> execution = Journey.set(execution, :temperature, 25)
iex> Journey.get(execution, :high_temp_alert)
{:error, :not_set}
iex> execution = Journey.set(execution, :temperature, 35)
iex> {:ok, "High temperature alert: 35°C", 4} = Journey.get(execution, :high_temp_alert, wait: :any)
Using Value Node Data in Compute Functions
Value node data can be accessed by defining an arity-2 compute function. This is useful for accessing contextual information like author IDs, timestamps, revisions, or data provenance from upstream nodes.
iex> import Journey.Node
iex> graph = Journey.new_graph(
...> "compute with value node data example",
...> "v1.0.0",
...> [
...> input(:title),
...> compute(
...> :title_with_author,
...> [:title],
...> # Arity-2 function receives value node data from dependencies
...> fn %{title: title}, value_nodes_map ->
...> author = get_in(value_nodes_map, [:title, :metadata, "author_id"]) || "unknown"
...> {:ok, "#{title} by #{author}"}
...> end
...> )
...> ]
...> )
iex> execution = graph |> Journey.start_execution()
iex> execution = Journey.set(execution, :title, "Hello", metadata: %{"author_id" => "user123"})
iex> {:ok, result, _} = Journey.get(execution, :title_with_author, wait: :any)
iex> result
"Hello by user123"
Return Values
The f_compute function must return {:ok, value} or {:error, reason}. Note that atoms
in the returned value and reason will be converted to strings when persisted.
EXPERIMENTAL: Creates a history-tracking node that maintains a chronological log of changes to one or more nodes.
name is an atom uniquely identifying this history node.
gated_by defines which nodes to track. Accepts the same formats as compute/4:
- A single-item list like [:node_name] to track one node
- A list like [:a, :b] to track multiple nodes (all must be set)
- Complex conditions using unblocked_when/1 (e.g., unblocked_when({:or, [{:a, &provided?/1}, {:b, &provided?/1}]}))
The historian will track changes to ALL nodes in the dependency tree and record only those that have changed since the last recording.
Options
- :max_entries (optional) - Maximum number of history entries to keep (FIFO). Defaults to 1000. Set to nil for unlimited history.
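One way to picture the :max_entries behavior is a newest-first list that is truncated after each new entry. The sketch below is an assumption about the observable behavior only; HistoryCapSketch and record/3 are hypothetical and do not describe how Journey stores history.

```elixir
defmodule HistoryCapSketch do
  # Prepend the new entry (newest first). With a nil limit, keep everything.
  def record(history, entry, nil), do: [entry | history]

  # With an integer limit, keep only the newest `max_entries` entries.
  def record(history, entry, max_entries) when is_integer(max_entries) and max_entries > 0 do
    Enum.take([entry | history], max_entries)
  end
end

history =
  Enum.reduce(["pending", "active", "completed"], [], fn value, acc ->
    HistoryCapSketch.record(acc, %{"value" => value}, 2)
  end)

IO.inspect(Enum.map(history, fn entry -> entry["value"] end))
# prints ["completed", "active"]
```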
History Format
The history is returned as a list of entries in newest-first order (most recent changes at index 0). Each entry is a map containing:
- "value" - The value of the changed node
- "node" - The name of the node (as string)
- "timestamp" - Unix timestamp when recorded
- "metadata" - Metadata from the node (if any)
- "revision" - Revision number of the node when recorded
Examples:
iex> import Journey.Node
iex> graph = Journey.new_graph(
...> "`historian()` doctest graph (tracks content changes)",
...> "v1.0.0",
...> [
...> input(:content),
...> historian(:content_history, [:content])
...> ]
...> )
iex> execution = graph |> Journey.start_execution()
iex> execution = Journey.set(execution, :content, "First version")
iex> {:ok, history1, _} = Journey.get(execution, :content_history, wait: :any)
iex> length(history1)
1
iex> [%{"value" => "First version", "node" => "content", "timestamp" => _ts}] = history1
iex> execution = Journey.set(execution, :content, "Second version")
iex> {:ok, history2, _} = Journey.get(execution, :content_history, wait: :newer)
iex> length(history2)
2
iex> [%{"value" => "Second version", "node" => "content", "timestamp" => _ts}, _] = history2
With custom max_entries limit:
iex> import Journey.Node
iex> graph = Journey.new_graph(
...> "historian with max_entries",
...> "v1.0.0",
...> [
...> input(:status),
...> historian(:status_history, [:status], max_entries: 2)
...> ]
...> )
iex> execution = graph |> Journey.start_execution()
iex> execution = Journey.set(execution, :status, "pending")
iex> {:ok, history, rev1} = Journey.get(execution, :status_history, wait: :any)
iex> Enum.map(history, fn entry -> entry["value"] end)
["pending"]
iex> execution = Journey.set(execution, :status, "active")
iex> {:ok, history, rev2} = Journey.get(execution, :status_history, wait: {:newer_than, rev1})
iex> Enum.map(history, fn entry -> entry["value"] end)
["active", "pending"]
iex> execution = Journey.set(execution, :status, "completed")
iex> {:ok, history, _rev} = Journey.get(execution, :status_history, wait: {:newer_than, rev2})
iex> # Since status_history is limited to `max_entries: 2`, we'll only see the 2 latest values (newest first).
iex> Enum.map(history, fn entry -> entry["value"] end)
["completed", "active"]
With unlimited history:
iex> import Journey.Node
iex> graph = Journey.new_graph(
...> "historian with unlimited history",
...> "v1.0.0",
...> [
...> input(:audit_event),
...> # Explicitly opt-in to unlimited history for audit trail
...> historian(:audit_log, [:audit_event], max_entries: nil)
...> ]
...> )
iex> execution = graph |> Journey.start_execution()
iex> execution = Journey.set(execution, :audit_event, "login")
iex> {:ok, history, _} = Journey.get(execution, :audit_log, wait: :any)
iex> length(history)
1
iex> [%{"value" => "login", "node" => "audit_event", "timestamp" => _ts}] = history
Tracking multiple nodes with :or condition:
iex> import Journey.Node
iex> import Journey.Node.Conditions
iex> import Journey.Node.UpstreamDependencies
iex> graph = Journey.new_graph(
...> "historian multi-node example",
...> "v1.0.0",
...> [
...> input(:email),
...> input(:phone),
...> # Track changes to either email or phone
...> historian(
...> :contact_history,
...> unblocked_when({
...> :or,
...> [{:email, &provided?/1}, {:phone, &provided?/1}]
...> })
...> )
...> ]
...> )
iex> execution = graph |> Journey.start_execution()
iex> execution = Journey.set(execution, :email, "user@example.com")
iex> {:ok, history1, _} = Journey.get(execution, :contact_history, wait: :any)
iex> length(history1)
1
iex> [%{"value" => "user@example.com", "node" => "email"}] = history1
iex> execution = Journey.set(execution, :phone, "555-1234")
iex> {:ok, history2, _} = Journey.get(execution, :contact_history, wait: :newer)
iex> length(history2)
2
iex> # Newest first: phone, then email
iex> [%{"value" => "555-1234", "node" => "phone"}, %{"value" => "user@example.com", "node" => "email"}] = history2
Creates a graph input node. The value of an input node is set with Journey.set/3. The name of the node must be an atom.
Examples:
iex> import Journey.Node
iex> graph = Journey.new_graph(
...> "`input()` doctest graph (just a few input nodes)",
...> "v1.0.0",
...> [
...> input(:first_name),
...> input(:last_name),
...> input(:zip_code)
...> ]
...> )
iex> execution = graph |> Journey.start_execution() |> Journey.set(:first_name, "Mario")
iex> Journey.values(execution) |> redact([:execution_id, :last_updated_at])
%{first_name: "Mario", execution_id: "...", last_updated_at: 1_234_567_890}
Creates a graph node that mutates the value of another node.
Examples:
iex> import Journey.Node
iex> graph = Journey.new_graph(
...> "`mutate()` doctest graph (a useless machine;)",
...> "v1.0.0",
...> [
...> input(:name),
...> mutate(
...> :remove_pii,
...> [:name],
...> fn %{name: _name} ->
...> # Return the new value for the "name" node.
...> {:ok, "redacted"}
...> end,
...> mutates: :name # The name of an existing node whose value will be mutated.
...> )
...> ]
...> )
iex> execution =
...> graph
...> |> Journey.start_execution()
...> |> Journey.set(:name, "Mario")
iex> {:ok, "updated :name", 3} = execution |> Journey.get(:remove_pii, wait: :any)
iex> execution |> Journey.values() |> redact([:execution_id, :last_updated_at])
%{name: "redacted", remove_pii: "updated :name", execution_id: "...", last_updated_at: 1_234_567_890}
Options
- :update_revision_on_change (default: false) - When true, updating the mutated node's value also increments its revision, triggering downstream nodes to recompute. When false, mutations update the value without triggering recomputation. Setting this to true for a node that mutates an upstream dependency will raise a validation error to prevent cycles.
External Polling Pattern
A common use case is polling external data sources and triggering downstream recomputations:
iex> import Journey.Node
iex> graph = Journey.new_graph(
...> "external polling example",
...> "v1.0.0",
...> [
...> schedule_recurring(:poll_schedule, [], fn _ -> {:ok, System.system_time(:second) + 2} end),
...> input(:cached_data),
...> mutate(:update_from_api, [:poll_schedule],
...> fn %{cached_data: current} -> {:ok, (current || 0) + 1} end,
...> mutates: :cached_data,
...> update_revision_on_change: true
...> ),
...> compute(:processed_data, [:cached_data],
...> fn %{cached_data: data} -> {:ok, data * 2} end
...> )
...> ]
...> )
iex> execution = graph |> Journey.start_execution() |> Journey.set(:cached_data, 5)
iex> background_sweeps_task = Journey.Scheduler.Background.Periodic.start_background_sweeps_in_test(execution.id)
iex> {:ok, "updated :cached_data", _} = Journey.get(execution, :update_from_api, wait: :any)
iex> {:ok, 12, _} = Journey.get(execution, :processed_data, wait: :any)
iex> Journey.Scheduler.Background.Periodic.stop_background_sweeps_in_test(background_sweeps_task)
Return Values
The f_compute function must return {:ok, value} or {:error, reason}. Note that atoms
in the returned value and reason will be converted to strings when persisted.
Creates a graph node that declares its readiness at a specific time, once.
Once this node is unblocked, it will be executed to set the time at which it will unblock its downstream dependencies.
Examples:
iex> import Journey.Node
iex> graph = Journey.new_graph(
...> "`schedule_once()` doctest graph (it reminds you to take a nap in a couple of seconds;)",
...> "v1.0.0",
...> [
...> input(:name),
...> schedule_once(
...> :schedule_a_nap,
...> [:name],
...> fn %{name: _name} ->
...> # This function is to return the time (in epoch seconds) at which
...> # its downstream dependencies should be unblocked.
...> in_two_seconds = System.system_time(:second) + 2
...> {:ok, in_two_seconds}
...> end
...> ),
...> compute(
...> :nap_time,
...> [:name, :schedule_a_nap],
...> fn %{name: name, schedule_a_nap: _time_to_take_a_nap} ->
...> {:ok, "It's time to take a nap, #{name}!"}
...> end
...> )
...> ]
...> )
iex> execution =
...> graph
...> |> Journey.start_execution()
...> |> Journey.set(:name, "Mario")
iex> execution |> Journey.values() |> Map.get(:name)
"Mario"
iex> # This is only needed in a test, to simulate the background processing that happens in non-tests automatically.
iex> background_sweeps_task = Journey.Scheduler.Background.Periodic.start_background_sweeps_in_test(execution.id)
iex> {:ok, "It's time to take a nap, Mario!", _} = execution |> Journey.get(:nap_time, wait: :any)
iex> Journey.Scheduler.Background.Periodic.stop_background_sweeps_in_test(background_sweeps_task)
Return Values
The f_compute function must return {:ok, value} or {:error, reason}. Note that atoms
in the returned value and reason will be converted to strings when persisted.
Returning {:ok, 0} indicates that the schedule should not trigger downstream computations.
This might be useful for conditional scheduling - when the function determines that
scheduling is not needed based on the current state.
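The decision between a real timestamp and 0 can be kept in a small pure function, which makes it easy to test separately from the graph. In the sketch below, ReminderScheduleSketch, the :due_at node, and the "one day before the due time" rule are all hypothetical; only the {:ok, epoch_seconds} and {:ok, 0} return shapes come from the text above.

```elixir
defmodule ReminderScheduleSketch do
  @one_day_in_seconds 86_400

  # Pure decision logic: remind one day before `due_at`, or return 0
  # when that moment is not in the future. `now` is passed in explicitly.
  def reminder_time(due_at, now) do
    candidate = due_at - @one_day_in_seconds
    if candidate > now, do: candidate, else: 0
  end

  # Shaped like an arity-1 `f_compute` for schedule_once: wraps the
  # decision in {:ok, ...} using the current time in epoch seconds.
  def f_compute(%{due_at: due_at}) do
    {:ok, reminder_time(due_at, System.system_time(:second))}
  end
end

# Due in a bit over two days: a reminder time is produced.
IO.inspect(ReminderScheduleSketch.reminder_time(200_000, 100_000))

# Due in less than a day: nothing is scheduled.
IO.inspect(ReminderScheduleSketch.reminder_time(150_000, 100_000))
```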
Conditional Scheduling Example
iex> import Journey.Node
iex> graph = Journey.new_graph(
...> "conditional scheduling example",
...> "v1.0.0",
...> [
...> input(:user_name),
...> input(:wants_reminder),
...> schedule_once(
...> :schedule_reminder,
...> [:user_name, :wants_reminder],
...> fn %{wants_reminder: wants_reminder} ->
...> if wants_reminder do
...> # Schedule for 2 seconds from now
...> {:ok, System.system_time(:second) + 2}
...> else
...> # Don't schedule - user opted out
...> {:ok, 0}
...> end
...> end
...> ),
...> compute(
...> :send_reminder,
...> [:user_name, :schedule_reminder],
...> fn %{user_name: name} ->
...> {:ok, "Reminder for #{name}"}
...> end
...> )
...> ]
...> )
iex> execution = graph |> Journey.start_execution()
iex> background_sweeps_task = Journey.Scheduler.Background.Periodic.start_background_sweeps_in_test(execution.id)
iex> # User doesn't want reminders - schedule returns 0
iex> execution = Journey.set(execution, :user_name, "Mario")
iex> execution = Journey.set(execution, :wants_reminder, false)
iex> {:ok, 0, _} = Journey.get(execution, :schedule_reminder, wait: :any)
iex> # Reminder is never sent because schedule returned 0
iex> :timer.sleep(3_000)
iex> Journey.get(execution, :send_reminder)
{:error, :not_set}
iex> Journey.Scheduler.Background.Periodic.stop_background_sweeps_in_test(background_sweeps_task)
Creates a graph node that declares its readiness at a specific time, time after time.
Once this node is unblocked, it will be repeatedly computed, to set the time at which it will unblock its downstream dependencies.
This is useful for triggering recurring tasks, such as sending reminders or notifications.
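For recurring tasks it is often convenient to align each run to a fixed interval rather than to "now plus N seconds". The sketch below computes the next interval boundary in epoch seconds; ScheduleSketch, next_tick/2, and every_hour/1 are hypothetical names, and the hourly interval is only an example.

```elixir
defmodule ScheduleSketch do
  # Next multiple of `interval_seconds` strictly after `now` (epoch seconds).
  def next_tick(now, interval_seconds) when is_integer(now) and interval_seconds > 0 do
    (div(now, interval_seconds) + 1) * interval_seconds
  end

  # Shaped like an arity-1 `f_compute` for schedule_recurring: returns the
  # next top of the hour as the time to unblock downstream nodes.
  def every_hour(_values) do
    {:ok, next_tick(System.system_time(:second), 3_600)}
  end
end

IO.inspect(ScheduleSketch.next_tick(7_201, 3_600))
# prints 10800
```

Passing `now` into next_tick/2 keeps the arithmetic deterministic, while every_hour/1 is the only place that reads the clock.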
Examples:
iex> import Journey.Node
iex> graph = Journey.new_graph(
...> "`schedule_recurring()` doctest graph (it issues 'reminders' every few seconds)",
...> "v1.0.0",
...> [
...> input(:name),
...> schedule_recurring(
...> :schedule_a_reminder,
...> [:name],
...> fn _ ->
...> soon = System.system_time(:second) + 2
...> {:ok, soon}
...> end
...> ),
...> compute(
...> :send_a_reminder,
...> [:name, :schedule_a_reminder],
...> fn %{name: name} = v ->
...> reminder_count = Map.get(v, :send_a_reminder, 0) + 1
...> IO.puts("[#{System.system_time(:second)}] #{name}, here is your scheduled reminder # #{reminder_count}.")
...> {:ok, reminder_count}
...> end
...> )
...> ]
...> )
iex> execution =
...> graph
...> |> Journey.start_execution()
...> |> Journey.set(:name, "Mario")
iex> execution |> Journey.values() |> Map.get(:name)
"Mario"
iex> # This is only needed in a test, to simulate the background processing that happens in non-tests automatically.
iex> background_sweeps_task = Journey.Scheduler.Background.Periodic.start_background_sweeps_in_test(execution.id)
iex> # Wait for initial reminders
iex> {:ok, count1, _} = Journey.get(execution, :send_a_reminder, wait: :any)
iex> count1 >= 1
true
iex> # Wait for more reminders to verify recurring behavior
iex> execution = Journey.load(execution)
iex> {:ok, count2, _} = Journey.get(execution, :send_a_reminder, wait: :newer)
iex> count2 > count1
true
iex> execution = Journey.load(execution)
iex> {:ok, count3, _} = Journey.get(execution, :send_a_reminder, wait: :newer)
iex> count3 > count2
true
iex> Journey.Scheduler.Background.Periodic.stop_background_sweeps_in_test(background_sweeps_task)
Return Values
The f_compute function must return {:ok, value} or {:error, reason}. Note that atoms
in the returned value and reason will be converted to strings when persisted.
Returning {:ok, 0} indicates that the schedule should not trigger downstream computations.