AgentForge.Flow (AgentForge v0.2.2)

Provides functions for processing signals through a chain of handlers. Each handler is a function that takes a signal and the current state, and returns a {result, new_state} tuple.

Signal Processing

The Flow module supports flexible signal processing patterns through various options:

Signal Strategies

Control how signals flow through your handlers:

  • :forward (default) - Passes signals unchanged to next handler
  • :transform - Modifies signals before passing to next handler using transform_fn
  • :restart - Allows handlers to restart the flow with a new signal

Skip Handling

  • continue_on_skip: false (default) - A handler returning :skip will halt the chain
  • continue_on_skip: true - Processing continues to next handler even after a skip
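
The effect of continue_on_skip can be pictured with a small stand-alone model. This is only a sketch under stated assumptions: SkipSketch is a hypothetical module, signals are plain maps, and the real implementation in AgentForge.Flow may differ in its details and return shape.

```elixir
defmodule SkipSketch do
  # Hypothetical model, not AgentForge's implementation: runs handlers in
  # order and decides what to do when one of them returns :skip.
  # Simplification: the same signal is passed to every handler.
  def run(handlers, signal, state, opts \\ []) do
    continue? = Keyword.get(opts, :continue_on_skip, false)

    Enum.reduce_while(handlers, {:skip, state}, fn handler, {_last, st} ->
      case handler.(signal, st) do
        # A skip with continue_on_skip: true moves on to the next handler
        {:skip, new_st} when continue? -> {:cont, {:skip, new_st}}
        # A skip with the default setting halts the chain
        {:skip, new_st} -> {:halt, {:skip, new_st}}
        # Any other result keeps the chain going
        {result, new_st} -> {:cont, {result, new_st}}
      end
    end)
  end
end

skipper = fn _signal, state -> {:skip, Map.put(state, :skipped, true)} end
marker = fn _signal, state -> {:skip, Map.put(state, :reached_second, true)} end
signal = %{type: :test, data: "data"}

# Default: the second handler never runs
{_, halted} = SkipSketch.run([skipper, marker], signal, %{})

# continue_on_skip: true: the second handler runs as well
{_, continued} = SkipSketch.run([skipper, marker], signal, %{}, continue_on_skip: true)
```

In this model the only difference between the two runs is whether the second handler gets a chance to update the state.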

Branching

Handlers can implement conditional logic using the branch format: {:branch, condition, true_state, false_state}
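
As a rough illustration of the branch format, the sketch below builds such a tuple and picks one of its two states. It assumes that condition is an already-evaluated boolean and that the matching state is the one carried forward. BranchSketch.resolve/1 is a hypothetical helper, not part of AgentForge, and how the library itself consumes the tuple is not shown here.

```elixir
defmodule BranchSketch do
  # Hypothetical helper, not part of AgentForge: given a branch tuple, returns
  # the state that corresponds to the condition.
  # Assumption: `condition` is a boolean that has already been evaluated.
  def resolve({:branch, condition, true_state, false_state}) when is_boolean(condition) do
    if condition, do: true_state, else: false_state
  end
end

state = %{threshold: 10}
signal = %{type: :reading, data: 42}

# The tuple carries the condition plus one candidate state per outcome
branch =
  {:branch, signal.data > state.threshold, Map.put(state, :level, :high),
   Map.put(state, :level, :low)}

chosen = BranchSketch.resolve(branch)
```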

Execution Limits

  • max_steps - Maximum number of steps to execute (prevents infinite loops)
  • timeout_ms - Maximum execution time in milliseconds
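
One common way to enforce a limit like timeout_ms in Elixir is to run the work in a task and give up when it does not reply in time. The sketch below shows that general technique only. TimeoutSketch is a hypothetical module, and nothing here is taken from how AgentForge.Flow implements its own timeout.

```elixir
defmodule TimeoutSketch do
  # Hypothetical helper: runs `fun` in a separate task and gives up after
  # `timeout_ms` milliseconds.
  def run_with_timeout(fun, timeout_ms) do
    task = Task.async(fun)

    # Task.yield/2 returns nil on timeout; Task.shutdown/2 then stops the task
    # and returns a reply only if one arrived in the meantime.
    case Task.yield(task, timeout_ms) || Task.shutdown(task, :brutal_kill) do
      {:ok, result} -> {:ok, result}
      {:exit, reason} -> {:error, reason}
      nil -> {:error, :timeout}
    end
  end
end

fast = TimeoutSketch.run_with_timeout(fn -> :done end, 1_000)

slow =
  TimeoutSketch.run_with_timeout(
    fn ->
      Process.sleep(500)
      :late
    end,
    50
  )
```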

See the examples directory for complete demonstrations of these features.

Summary

Types

flow()
A flow is a handler function or a list of handler functions. Each handler takes a signal and state, and returns a tuple with result and new state.

Functions

always_emit(type, data)
Creates a handler that always emits a signal of the given type and data.

filter_type(type, handler)
Creates a handler that only processes signals of a specific type. Other signal types are skipped.

get_last_execution_stats()

process(handlers, signal, state)
Processes a signal through a list of handlers. For backward compatibility, does not return statistics by default.

process_function_flow(flow_fn, signal, state, opts \\ [])
Processes a signal using a function flow.

process_handler(handler, signal, state)
Processes a single handler with a signal and state. This is the core function that executes a single handler.

process_with_limits(handlers, signal, state, opts \\ [])
Processes a signal through a list of handlers with execution limits. Supports timeout to prevent long-running processes.

store_in_state(key)
Creates a handler that stores the signal data in state under the given key.

Types

flow()

@type flow() ::
  (AgentForge.Signal.t(), map() -> {term(), map()})
  | [(AgentForge.Signal.t(), map() -> {term(), map()})]

A flow is a handler function or a list of handler functions. Each handler takes a signal and state, and returns a tuple with result and new state.
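
Because flow() admits two shapes, code that accepts a flow typically has to handle both. The sketch below shows one possible way to do that. FlowShapeSketch.normalize/1 is a hypothetical helper and signals are represented as plain maps; neither is part of AgentForge.

```elixir
defmodule FlowShapeSketch do
  # Hypothetical helper: accepts either shape of flow() and always returns a
  # list of handlers.
  def normalize(flow) when is_function(flow, 2), do: [flow]
  def normalize(flow) when is_list(flow), do: flow
end

# A handler: takes a signal and state, returns {result, new_state}
count = fn _signal, state -> {:skip, Map.update(state, :seen, 1, &(&1 + 1))} end

single = FlowShapeSketch.normalize(count)
many = FlowShapeSketch.normalize([count, count])
```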

Functions

always_emit(type, data)

Creates a handler that always emits a signal of the given type and data.

Examples

iex> handler = AgentForge.Flow.always_emit(:done, "success")
iex> {result, state} = handler.(nil, %{})
iex> match?({:emit, %{type: :done, data: "success"}}, result)
true

filter_type(type, handler)

Creates a handler that only processes signals of a specific type. Other signal types are skipped.

Examples

iex> inner = fn signal, state -> {AgentForge.Signal.emit(:processed, signal.data), state} end
iex> handler = AgentForge.Flow.filter_type(:test, inner)
iex> test_signal = AgentForge.Signal.new(:test, "data")
iex> {result, _} = handler.(test_signal, %{})
iex> match?({:emit, %{type: :processed}}, result)
true
iex> other_signal = AgentForge.Signal.new(:other, "data")
iex> handler.(other_signal, %{}) |> elem(0)
:skip
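
The behaviour in the example above can be approximated with a short stand-alone closure. This is a sketch, not the library's source. FilterSketch is a hypothetical module, signals are plain maps with :type and :data keys, and returning the state unchanged on a skip is an assumption.

```elixir
defmodule FilterSketch do
  # Hypothetical stand-in for filter_type/2, written against plain maps.
  def filter_type(type, handler) do
    fn signal, state ->
      if signal.type == type do
        # Matching type: delegate to the wrapped handler
        handler.(signal, state)
      else
        # Any other type: skip (assumed to leave the state untouched)
        {:skip, state}
      end
    end
  end
end

inner = fn signal, state -> {{:emit, %{type: :processed, data: signal.data}}, state} end
handler = FilterSketch.filter_type(:test, inner)

{matched, _} = handler.(%{type: :test, data: "data"}, %{})
{other, _} = handler.(%{type: :other, data: "data"}, %{})
```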

get_last_execution_stats()

process(handlers, signal, state)

Processes a signal through a list of handlers. For backward compatibility, does not return statistics by default.

Examples

iex> handlers = [
...>   fn sig, st -> {{:emit, AgentForge.Signal.new(:echo, sig.data)}, st} end
...> ]
iex> signal = AgentForge.Signal.new(:test, "data")
iex> {:ok, result, _} = AgentForge.Flow.process(handlers, signal, %{})
iex> result.type
:echo

process_function_flow(flow_fn, signal, state, opts \\ [])

Processes a signal using a function flow.

This allows defining a workflow as a function instead of a list of handlers. The function should accept a signal and state, and return one of:

  • {:emit, new_signal, new_state} - Emit a new signal and continue processing
  • {:skip, new_state} - Skip processing this signal
  • {:halt, result, new_state} - Halt processing with result
  • {:error, reason, new_state} - Halt with an error

Options

Same as process_with_limits/4.

Example

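alias AgentForge.{Flow, Signal}

function_flow = fn signal, state ->
  case signal.type do
    # Emit a follow-up signal and keep processing
    :start -> {:emit, Signal.new(:processing, signal.data), state}
    # Stop with a final result
    :processing -> {:halt, "Done processing", state}
    # Skip every other signal type
    _ -> {:skip, state}
  end
end

signal = Signal.new(:start, "data")
state = %{}

Flow.process_function_flow(function_flow, signal, state)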

process_handler(handler, signal, state)

Processes a single handler with a signal and state. This is the core function that executes a single handler.

Examples

iex> handler = fn sig, st -> {{:emit, AgentForge.Signal.new(:echo, sig.data)}, st} end
iex> signal = AgentForge.Signal.new(:test, "data")
iex> result = AgentForge.Flow.process_handler(handler, signal, %{})
iex> match?({_, _}, result)
true

process_with_limits(handlers, signal, state, opts \\ [])

Processes a signal through a list of handlers with execution limits. Supports a timeout to guard against long-running processing.

Options

  • :timeout_ms - Maximum time in milliseconds to process (default: 30000)
  • :collect_stats - Whether to collect execution statistics (default: true)
  • :return_stats - Whether to return statistics in the result (default: false)
  • :max_steps - Maximum number of steps to execute (default: 100, prevents infinite loops)
  • :continue_on_skip - Whether to continue processing after a skip result (default: false)
  • :signal_strategy - How to handle emitted signals:
    • :forward (default) - Passes emitted signals unchanged to the next handler
    • :transform - Transforms signals using the transform_fn before passing to next handler
    • :restart - Allows handlers to restart the flow from the beginning with a new signal
  • :transform_fn - Function to transform signals when signal_strategy is :transform

Signal Strategies

Forward Strategy (Default)

# Signal passes unchanged to the next handler
Flow.process_with_limits(handlers, signal, state, signal_strategy: :forward)

Transform Strategy

# Signal is transformed before passing to the next handler
transform_fn = fn signal -> Map.put(signal, :data, String.upcase(signal.data)) end
Flow.process_with_limits(handlers, signal, state,
  signal_strategy: :transform,
  transform_fn: transform_fn
)

Restart Strategy

# When a handler emits a signal, processing restarts from the first handler
# Useful for looping through handlers until a condition is met
Flow.process_with_limits(handlers, signal, state,
  signal_strategy: :restart,
  max_steps: 10  # Always set max_steps to prevent infinite loops
)
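
To make the interaction between restarting and max_steps concrete, here is a small stand-alone model of a restart loop. It is a sketch under stated assumptions: RestartSketch is a hypothetical module, signals are plain maps, each handler call counts as one step, and the {:error, :max_steps_reached, state} return value is invented for illustration. The real return shapes of process_with_limits/4 may differ.

```elixir
defmodule RestartSketch do
  # Hypothetical model of a restart loop; not AgentForge's implementation.
  # An {:emit, signal} result restarts from the first handler with the emitted
  # signal. Any other result moves on to the next handler.
  def run(handlers, signal, state, max_steps) do
    loop(handlers, handlers, signal, state, 0, max_steps)
  end

  # All handlers ran without a restart: finished
  defp loop(_all, [], signal, state, _steps, _max_steps), do: {:ok, signal, state}

  # Step budget exhausted: stop instead of looping forever
  defp loop(_all, _rest, _signal, state, steps, max_steps) when steps >= max_steps,
    do: {:error, :max_steps_reached, state}

  defp loop(all, [handler | rest], signal, state, steps, max_steps) do
    case handler.(signal, state) do
      {{:emit, new_signal}, new_state} ->
        loop(all, all, new_signal, new_state, steps + 1, max_steps)

      {_other, new_state} ->
        loop(all, rest, signal, new_state, steps + 1, max_steps)
    end
  end
end

# Emits an incremented counter until it reaches 3, then stops emitting
count_up = fn signal, state ->
  if signal.data < 3 do
    {{:emit, %{signal | data: signal.data + 1}}, state}
  else
    {:skip, state}
  end
end

# A handler that always emits would loop forever without the step limit
always_emit = fn signal, state -> {{:emit, signal}, state} end

finished = RestartSketch.run([count_up], %{type: :count, data: 0}, %{}, 10)
runaway = RestartSketch.run([always_emit], %{type: :count, data: 0}, %{}, 10)
```

In this model the first run ends on its own once the counter reaches 3, while the second is cut off by the step limit.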

Basic Examples

iex> handlers = [
...>   fn sig, st -> {{:emit, AgentForge.Signal.new(:echo, sig.data)}, st} end
...> ]
iex> signal = AgentForge.Signal.new(:test, "data")
iex> {:ok, result, _} = AgentForge.Flow.process_with_limits(handlers, signal, %{})
iex> result.type
:echo

With statistics:

iex> handlers = [
...>   fn sig, st -> {{:emit, AgentForge.Signal.new(:echo, sig.data)}, st} end
...> ]
iex> signal = AgentForge.Signal.new(:test, "data")
iex> {:ok, _result, _, stats} = AgentForge.Flow.process_with_limits(handlers, signal, %{}, return_stats: true)
iex> stats.steps >= 1
true

store_in_state(key)

Creates a handler that stores the signal data in state under the given key.
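
A handler with this behaviour can be pictured as a closure over the key. The sketch below is a hypothetical equivalent written against plain maps, not the library's source. The :skip result is modelled on the documented example for this function.

```elixir
defmodule StoreSketch do
  # Hypothetical stand-in for store_in_state/1: returns a handler that copies
  # the signal's data into the state under `key` and emits nothing.
  def store_in_state(key) do
    fn signal, state -> {:skip, Map.put(state, key, signal.data)} end
  end
end

handler = StoreSketch.store_in_state(:last_message)
{result, state} = handler.(%{type: :test, data: "data"}, %{})
```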

Examples

iex> handler = AgentForge.Flow.store_in_state(:last_message)
iex> signal = AgentForge.Signal.new(:test, "data")
iex> {result, state} = handler.(signal, %{})
iex> result
:skip
iex> state.last_message
"data"