Extending AgentForge Primitives
View SourceThis guide explains how to create custom primitives for AgentForge.
Primitive Structure
A primitive is a function that returns a handler function with the signature:
(signal, state) -> {result, new_state}
Creating Custom Primitives
Basic Template
def my_primitive(options) do
fn signal, state ->
# Process signal and state
{{:emit, new_signal}, new_state}
end
end
Result Types
Your primitive should return one of these result types:
{{:emit, signal}, new_state} # Emit single signal
{{:emit_many, signals}, new_state} # Emit multiple signals
{{:halt, result}, new_state} # Stop processing
{{:wait, reason}, new_state} # Pause processing
{{:error, reason}, state} # Error condition
{:skip, new_state} # Skip further processing
Example: Rate Limiter Primitive
Here's a complete example of a custom primitive that implements rate limiting:
defmodule MyPrimitives do
alias AgentForge.{Signal}
@doc """
Creates a rate limiter primitive that controls signal processing rate.
## Options
* `:limit` - Maximum number of signals per interval
* `:interval` - Time interval in milliseconds
## Examples
iex> limiter = rate_limit(limit: 2, interval: 1000)
iex> signal = Signal.new(:test, "data")
iex> {{:emit, result}, state} = limiter.(signal, %{})
"""
def rate_limit(opts) do
limit = Keyword.get(opts, :limit, 10)
interval = Keyword.get(opts, :interval, 1000)
fn signal, state ->
current_time = System.monotonic_time(:millisecond)
window_start = Map.get(state, :window_start, current_time)
count = Map.get(state, :count, 0)
cond do
# New window
current_time - window_start > interval ->
new_state = %{
window_start: current_time,
count: 1
}
{{:emit, signal}, new_state}
# Within limit
count < limit ->
new_state = %{
window_start: window_start,
count: count + 1
}
{{:emit, signal}, new_state}
# Rate limit exceeded
true ->
wait_time = window_start + interval - current_time
{{:wait, "Rate limit exceeded, retry in #{wait_time}ms"}, state}
end
end
end
end
Testing Custom Primitives
Follow these testing patterns:
defmodule MyPrimitivesTest do
use ExUnit.Case
alias AgentForge.Signal
describe "rate_limit/1" do
test "allows signals within limit" do
limiter = MyPrimitives.rate_limit(limit: 2, interval: 1000)
signal = Signal.new(:test, "data")
state = %{}
# First signal
{{:emit, result1}, state1} = limiter.(signal, state)
assert result1.type == :test
# Second signal
{{:emit, result2}, _state2} = limiter.(signal, state1)
assert result2.type == :test
end
test "rate limits excessive signals" do
limiter = MyPrimitives.rate_limit(limit: 1, interval: 1000)
signal = Signal.new(:test, "data")
state = %{}
{{:emit, _}, state1} = limiter.(signal, state)
{{:wait, reason}, _} = limiter.(signal, state1)
assert reason =~ "Rate limit exceeded"
end
end
end
Best Practices
Documentation
- Use
@moduledoc
and@doc
with examples - Document all options
- Include usage patterns
- Use
Options Handling
- Use
Keyword.get/3
for optional parameters - Validate options early
- Provide sensible defaults
- Use
State Management
- Keep state immutable
- Document state structure
- Clean up state when needed
Error Handling
- Use clear error messages
- Return
{:error, reason}
for recoverable errors - Raise only for programming errors
Testing
- Test happy paths
- Test error conditions
- Test state transitions
- Test edge cases
Integration
To use your custom primitives:
defmodule MyWorkflow do
alias AgentForge.{Flow, Signal}
alias MyPrimitives
def run do
# Create rate-limited workflow
workflow = [
MyPrimitives.rate_limit(limit: 10, interval: 1000),
&process_data/2
]
# Execute
signal = Signal.new(:start, "data")
Flow.process(workflow, signal, %{})
end
defp process_data(signal, state) do
# ... processing logic ...
end
end
Advanced Patterns
Composing Primitives
def rate_limited_transform(transform_fn, rate_opts) do
fn signal, state ->
limiter = rate_limit(rate_opts)
transformer = transform(transform_fn)
case limiter.(signal, state) do
{{:emit, limited_signal}, new_state} ->
transformer.(limited_signal, new_state)
other -> other
end
end
end
Stateful Primitives
def with_cache(ttl_ms) do
fn signal, state ->
cache = Map.get(state, :cache, %{})
key = "#{signal.type}:#{inspect(signal.data)}"
current_time = System.monotonic_time(:millisecond)
case cache do
%{^key => {value, expires_at}} when current_time < expires_at ->
{{:emit, Signal.new(:cached, value)}, state}
_ ->
# Process and cache
new_cache = Map.put(cache, key, {
signal.data,
current_time + ttl_ms
})
{{:emit, signal}, Map.put(state, :cache, new_cache)}
end
end
end
Next Steps
- Review core primitives source code for patterns
- Contribute primitives back to the community
- Share your implementations in discussions