AgentForge.Flow (AgentForge v0.2.2)
Provides functions for processing signals through a chain of handlers. Each handler is a function that takes a signal and state, and returns a tuple with result and new state.
Signal Processing
The Flow module supports flexible signal processing patterns through various options:
Signal Strategies
Control how signals flow through your handlers:
:forward (default) - Passes signals unchanged to the next handler
:transform - Modifies signals with transform_fn before passing them to the next handler
:restart - Allows handlers to restart the flow with a new signal
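To make the difference between the three strategies concrete, here is a minimal, self-contained sketch of a runner that honours them. The `StrategySketch` module, its return shapes, and the handlers `shout` and `count_up` are hypothetical and written only for illustration; signals are modelled as plain maps with `:type` and `:data` keys rather than `AgentForge.Signal` structs, and nothing here should be read as AgentForge's actual implementation.

```elixir
defmodule StrategySketch do
  # Hypothetical runner for illustration only; not AgentForge's implementation.
  # Signals are plain maps with :type and :data keys (a stand-in assumption).

  def run(handlers, signal, state, opts \\ []) do
    config = %{
      all: handlers,
      strategy: Keyword.get(opts, :signal_strategy, :forward),
      transform_fn: Keyword.get(opts, :transform_fn, fn sig -> sig end),
      max_steps: Keyword.get(opts, :max_steps, 100)
    }

    step(handlers, signal, state, 0, config)
  end

  # No handlers left: return the last signal and state.
  defp step([], signal, state, _steps, _config), do: {:ok, signal, state}

  # Step budget exhausted: stop instead of looping forever.
  defp step(_rest, _signal, state, steps, %{max_steps: max}) when steps >= max,
    do: {:error, :max_steps_exceeded, state}

  defp step([handler | rest], signal, state, steps, config) do
    case handler.(signal, state) do
      {{:emit, emitted}, new_state} ->
        case config.strategy do
          # Pass the emitted signal unchanged to the next handler.
          :forward -> step(rest, emitted, new_state, steps + 1, config)
          # Rewrite the emitted signal first, then pass it on.
          :transform -> step(rest, config.transform_fn.(emitted), new_state, steps + 1, config)
          # Start again from the first handler with the emitted signal.
          :restart -> step(config.all, emitted, new_state, steps + 1, config)
        end

      # Any other result (for example :skip) ends this sketch's run.
      {_other, new_state} ->
        {:ok, signal, new_state}
    end
  end
end

shout = fn sig, state -> {{:emit, %{sig | data: sig.data <> "!"}}, state} end
upcase = fn sig -> %{sig | data: String.upcase(sig.data)} end
signal = %{type: :test, data: "hi"}

{:ok, forwarded, _} = StrategySketch.run([shout, shout], signal, %{})

{:ok, transformed, _} =
  StrategySketch.run([shout, shout], signal, %{}, signal_strategy: :transform, transform_fn: upcase)

# A handler that emits until a counter in state reaches 3, then skips.
count_up = fn sig, state ->
  n = Map.get(state, :n, 0)
  if n < 3, do: {{:emit, sig}, Map.put(state, :n, n + 1)}, else: {:skip, state}
end

{:ok, _sig, restarted_state} =
  StrategySketch.run([count_up], signal, %{}, signal_strategy: :restart, max_steps: 10)
```

In this sketch the restart run loops over the single handler until it stops emitting, which is why a step budget is passed alongside the `:restart` strategy.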
Skip Handling
continue_on_skip: false (default) - A handler returning :skip halts the chain
continue_on_skip: true - Processing continues to the next handler even after a skip
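The two skip modes can be pictured with a small stand-alone sketch. `SkipSketch`, `skipper`, and `marker` are hypothetical names, the boolean argument stands in for the `continue_on_skip` option, and signals are plain maps; this only illustrates the described behaviour and is not taken from AgentForge's source.

```elixir
defmodule SkipSketch do
  # Hypothetical illustration of the continue_on_skip option; not AgentForge's code.
  def run(handlers, signal, state, continue_on_skip) do
    Enum.reduce_while(handlers, {signal, state}, fn handler, {sig, st} ->
      case handler.(sig, st) do
        {:skip, new_state} when continue_on_skip ->
          # Ignore the skip and hand the same signal to the next handler.
          {:cont, {sig, new_state}}

        {:skip, new_state} ->
          # Default behaviour: the first skip halts the chain.
          {:halt, {sig, new_state}}

        {{:emit, emitted}, new_state} ->
          {:cont, {emitted, new_state}}
      end
    end)
  end
end

skipper = fn _sig, state -> {:skip, Map.put(state, :skipped, true)} end
marker = fn sig, state -> {{:emit, %{sig | type: :marked}}, Map.put(state, :marked, true)} end
signal = %{type: :test, data: "data"}

# With the default, `marker` never runs; with the flag set, it does.
{halted_signal, halted_state} = SkipSketch.run([skipper, marker], signal, %{}, false)
{continued_signal, continued_state} = SkipSketch.run([skipper, marker], signal, %{}, true)
```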
Branching
Handlers can implement conditional logic using the branch format:
{:branch, condition, true_state, false_state}
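The page does not spell out how a branch tuple is evaluated, so the following is only a sketch under stated assumptions: that `condition` is a boolean already computed by the handler, and that the flow continues with `true_state` or `false_state` accordingly. The names `size_check` and `resolve_branch` are hypothetical, and the signal is a plain map standing in for an `AgentForge.Signal`.

```elixir
# Hypothetical handler: routes on payload size by returning a branch tuple.
# Assumption: `condition` is a plain boolean computed by the handler.
size_check = fn signal, state ->
  {:branch, byte_size(signal.data) > 5, Map.put(state, :route, :long),
   Map.put(state, :route, :short)}
end

# Hypothetical resolver: picks the state that matches the condition.
resolve_branch = fn
  {:branch, true, true_state, _false_state} -> true_state
  {:branch, false, _true_state, false_state} -> false_state
end

long_state = resolve_branch.(size_check.(%{type: :text, data: "a long payload"}, %{}))
short_state = resolve_branch.(size_check.(%{type: :text, data: "hi"}, %{}))
```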
Execution Limits
max_steps - Maximum number of steps to execute (prevents infinite loops)
timeout_ms - Maximum execution time in milliseconds
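One common way to bound execution time in Elixir is to run the work in a task and give up if it does not reply in time. The sketch below shows that idiom with the standard `Task` module; `TimeoutSketch.run_with_timeout/2` is a hypothetical helper, and whether AgentForge enforces `timeout_ms` this way is not stated in this page.

```elixir
defmodule TimeoutSketch do
  # Hypothetical helper; AgentForge may enforce timeout_ms differently.
  def run_with_timeout(fun, timeout_ms) do
    task = Task.async(fun)

    # Wait up to timeout_ms; if no reply arrives, shut the task down.
    case Task.yield(task, timeout_ms) || Task.shutdown(task) do
      {:ok, result} -> {:ok, result}
      nil -> {:error, :timeout}
    end
  end
end

fast = TimeoutSketch.run_with_timeout(fn -> :done end, 1_000)

slow =
  TimeoutSketch.run_with_timeout(
    fn ->
      Process.sleep(500)
      :done
    end,
    50
  )
```

Here `fast` completes within its budget, while `slow` sleeps longer than the 50 ms it is given and is shut down.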
See the examples directory for complete demonstrations of these features.
Types
@type flow() :: (AgentForge.Signal.t(), map() -> {term(), map()}) | [(AgentForge.Signal.t(), map() -> {term(), map()})]
A flow is a handler function or a list of handler functions. Each handler takes a signal and state, and returns a tuple with result and new state.
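As a minimal sketch of the two shapes this type allows, the snippet below builds a single handler and a list of handlers from plain anonymous functions. It uses a bare map with `:type` and `:data` keys as a stand-in for an `AgentForge.Signal`, and the names `tag`, `count`, and `run_all` are hypothetical; `run_all` only illustrates threading state through a list and is not how `AgentForge.Flow` executes a flow.

```elixir
# Stand-in for an AgentForge.Signal: a plain map (assumption for illustration).
signal = %{type: :greeting, data: "hello"}

# A single handler: takes a signal and state, returns {result, new_state}.
tag = fn sig, state -> {{:emit, %{sig | type: :tagged}}, Map.put(state, :tagged, true)} end

# A second handler that counts how many signals it has seen.
count = fn _sig, state -> {:skip, Map.update(state, :seen, 1, &(&1 + 1))} end

# A flow may be one handler or a list of handlers.
single_flow = tag
list_flow = [tag, count]

# Hypothetical helper: call each handler in order, keeping only the state.
run_all = fn handlers, sig, state ->
  Enum.reduce(handlers, state, fn handler, acc ->
    {_result, new_state} = handler.(sig, acc)
    new_state
  end)
end

{result, _state} = single_flow.(signal, %{})
final_state = run_all.(list_flow, signal, %{})
```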
Functions
always_emit
Creates a handler that always emits a signal of the given type and data.
Examples
iex> handler = AgentForge.Flow.always_emit(:done, "success")
iex> {result, state} = handler.(nil, %{})
iex> match?({:emit, %{type: :done, data: "success"}}, result)
true
filter_type
Creates a handler that only processes signals of a specific type. Other signal types are skipped.
Examples
iex> inner = fn signal, state -> {AgentForge.Signal.emit(:processed, signal.data), state} end
iex> handler = AgentForge.Flow.filter_type(:test, inner)
iex> test_signal = AgentForge.Signal.new(:test, "data")
iex> {result, _} = handler.(test_signal, %{})
iex> match?({:emit, %{type: :processed}}, result)
true
iex> other_signal = AgentForge.Signal.new(:other, "data")
iex> handler.(other_signal, %{}) |> elem(0)
:skip
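The behaviour shown in these examples can be approximated with a closure that pattern-matches on the signal type. This is a hedged sketch, not the library's code: `filter_by_type` is a hypothetical name, and the signals are plain maps standing in for `AgentForge.Signal` structs.

```elixir
# Hypothetical re-creation of a type filter; the library's own code may differ.
filter_by_type = fn expected_type, inner ->
  fn
    # Matching type: delegate to the wrapped handler.
    %{type: ^expected_type} = signal, state -> inner.(signal, state)
    # Any other type: skip and leave the state untouched.
    _other_signal, state -> {:skip, state}
  end
end

inner = fn signal, state -> {{:emit, %{signal | type: :processed}}, state} end
handler = filter_by_type.(:test, inner)

{matched, _} = handler.(%{type: :test, data: "data"}, %{})
{unmatched, _} = handler.(%{type: :other, data: "data"}, %{})
```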
process
Processes a signal through a list of handlers. For backward compatibility, does not return statistics by default.
Examples
iex> handlers = [
...>   fn sig, st -> {{:emit, AgentForge.Signal.new(:echo, sig.data)}, st} end
...> ]
iex> signal = AgentForge.Signal.new(:test, "data")
iex> {:ok, result, _} = AgentForge.Flow.process(handlers, signal, %{})
iex> result.type
:echo
process_function_flow
Process a signal using a function flow.
This allows defining a workflow as a function instead of a list of handlers. The function should accept a signal and state, and return one of:
{:emit, new_signal, new_state} - Emit a new signal and continue processing
{:skip, new_state} - Skip processing this signal
{:halt, result, new_state} - Halt processing with result
{:error, reason, new_state} - Halt with an error
Options
Same as process_with_limits/4.
Example
alias AgentForge.{Flow, Signal}

# Each clause returns one of the tuples listed above.
function_flow = fn signal, state ->
  case signal.type do
    :start -> {:emit, Signal.new(:processing, signal.data), state}
    :processing -> {:halt, "Done processing", state}
    _ -> {:skip, state}
  end
end

signal = Signal.new(:start, "data")
Flow.process_function_flow(function_flow, signal, %{})
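To show how the four return shapes might drive a loop, here is a stand-alone sketch. It assumes that an emitted signal is fed back into the same function, which the page implies but does not state. `FunctionFlowSketch`, its step budget, and its `{:ok, result, state}` return shape are hypothetical, and signals are plain maps rather than `AgentForge.Signal` structs.

```elixir
defmodule FunctionFlowSketch do
  # Hypothetical driver; assumes an emitted signal is fed back into the same function.
  def run(flow, signal, state, max_steps \\ 100)

  # Step budget used up: report an error instead of looping forever.
  def run(_flow, _signal, state, 0), do: {:error, :max_steps_exceeded, state}

  def run(flow, signal, state, steps_left) do
    case flow.(signal, state) do
      {:emit, new_signal, new_state} -> run(flow, new_signal, new_state, steps_left - 1)
      {:skip, new_state} -> {:ok, nil, new_state}
      {:halt, result, new_state} -> {:ok, result, new_state}
      {:error, reason, new_state} -> {:error, reason, new_state}
    end
  end
end

function_flow = fn signal, state ->
  case signal.type do
    :start -> {:emit, %{type: :processing, data: signal.data}, state}
    :processing -> {:halt, "Done processing", state}
    _ -> {:skip, state}
  end
end

outcome = FunctionFlowSketch.run(function_flow, %{type: :start, data: "job"}, %{})
```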
process_handler
Process a single handler with a signal and state. This is the core function that executes a single handler.
Examples
iex> handler = fn sig, st -> {{:emit, AgentForge.Signal.new(:echo, sig.data)}, st} end
iex> signal = AgentForge.Signal.new(:test, "data")
iex> result = AgentForge.Flow.process_handler(handler, signal, %{})
iex> match?({_, _}, result)
true
process_with_limits
Processes a signal through a list of handlers with execution limits. Supports timeout to prevent long-running processes.
Options
:timeout_ms - Maximum time in milliseconds to process (default: 30000)
:collect_stats - Whether to collect execution statistics (default: true)
:return_stats - Whether to return statistics in the result (default: false)
:max_steps - Maximum number of steps to execute (default: 100, prevents infinite loops)
:continue_on_skip - Whether to continue processing after a skip result (default: false)
:signal_strategy - How to handle emitted signals:
  :forward (default) - Passes emitted signals unchanged to the next handler
  :transform - Transforms signals using the transform_fn before passing them to the next handler
  :restart - Allows handlers to restart the flow from the beginning with a new signal
:transform_fn - Function to transform signals when signal_strategy is :transform
Signal Strategies
Forward Strategy (Default)
# Signal passes unchanged to the next handler
Flow.process_with_limits(handlers, signal, state, signal_strategy: :forward)
Transform Strategy
# Signal is transformed before passing to the next handler
transform_fn = fn signal -> Map.put(signal, :data, String.upcase(signal.data)) end
Flow.process_with_limits(handlers, signal, state,
  signal_strategy: :transform,
  transform_fn: transform_fn
)
Restart Strategy
# When a handler emits a signal, processing restarts from the first handler
# Useful for looping through handlers until a condition is met
Flow.process_with_limits(handlers, signal, state,
  signal_strategy: :restart,
  max_steps: 10 # Always set max_steps to prevent infinite loops
)
Basic Examples
iex> handlers = [
...>   fn sig, st -> {{:emit, AgentForge.Signal.new(:echo, sig.data)}, st} end
...> ]
iex> signal = AgentForge.Signal.new(:test, "data")
iex> {:ok, result, _} = AgentForge.Flow.process_with_limits(handlers, signal, %{})
iex> result.type
:echo
With statistics:
iex> handlers = [
...>   fn sig, st -> {{:emit, AgentForge.Signal.new(:echo, sig.data)}, st} end
...> ]
iex> signal = AgentForge.Signal.new(:test, "data")
iex> {:ok, _result, _, stats} = AgentForge.Flow.process_with_limits(handlers, signal, %{}, return_stats: true)
iex> stats.steps >= 1
true
store_in_state
Creates a handler that stores the signal data in state under the given key.
Examples
iex> handler = AgentForge.Flow.store_in_state(:last_message)
iex> signal = AgentForge.Signal.new(:test, "data")
iex> {result, state} = handler.(signal, %{})
iex> result
:skip
iex> state.last_message
"data"