Normandy.Coordination.Reactive (normandy v0.2.0)

View Source

Reactive patterns for event-driven multi-agent coordination.

Provides high-level patterns for concurrent agent execution with different completion strategies: race (first to finish), all (wait for all), and some (wait for N successes).

Examples

# Race: Return first successful result
{:ok, result} = Reactive.race([agent1, agent2, agent3], input, timeout: 5000)

# All: Wait for all agents to complete
{:ok, results} = Reactive.all([agent1, agent2, agent3], input, timeout: 10000)

# Some: Wait for N successful results
{:ok, results} = Reactive.some([agent1, agent2, agent3], input, count: 2)

# With agent processes
{:ok, result} = Reactive.race([pid1, pid2], input)

Use Cases

  • Race: Get fastest response for latency-sensitive operations
  • All: Ensemble methods, need all perspectives
  • Some: Quorum-based decisions, need majority agreement

Summary

Functions

Waits for all agents to complete and returns all results.

Executes an agent and applies a transformation if successful.

Races multiple agents and returns the first successful result.

Waits for N successful results from agents.

Executes an agent and conditionally executes another based on the result.

Types

agent_or_pid()

@type agent_or_pid() :: struct() | pid()

result()

@type result() :: {:ok, term()} | {:error, term()}

Functions

all(agents, input, opts \\ [])

@spec all([agent_or_pid()], term(), keyword()) ::
  {:ok, %{required(String.t()) => result()}} | result()

Waits for all agents to complete and returns all results.

Executes all agents concurrently and waits for all to finish, collecting both successes and failures.

Options

  • :timeout - Maximum time to wait in ms (default: 60_000)
  • :max_concurrency - Maximum concurrent agents (default: length of agents)
  • :on_complete - Callback fn agent_id, result -> any end
  • :fail_fast - Stop on first failure (default: false)

Examples

# Get all results
{:ok, results} = Reactive.all([agent1, agent2, agent3], input)
#=> {:ok, %{
       "agent_0" => {:ok, result1},
       "agent_1" => {:ok, result2},
       "agent_2" => {:error, reason}
     }}

# With concurrency limit
{:ok, results} = Reactive.all(many_agents, input, max_concurrency: 5)

# Fail fast on first error
{:error, reason} = Reactive.all(agents, input, fail_fast: true)

Returns

  • {:ok, %{agent_id => result}} - Map of all results
  • {:error, reason} - If fail_fast is true and an agent fails
  • {:error, :timeout} - If timeout is reached

map(agent, input, transform_fn)

@spec map(agent_or_pid(), term(), (result() -> result())) :: result()

Executes an agent and applies a transformation if successful.

Useful for chaining operations based on agent responses.

Examples

result = Reactive.map(agent, input, fn
  {:ok, %{confidence: c}} when c > 0.8 ->
    {:ok, :high_confidence}

  {:ok, %{confidence: c}} when c < 0.5 ->
    {:ok, :low_confidence}

  {:ok, _} ->
    {:ok, :medium_confidence}

  error ->
    error
end)

race(agents, input, opts \\ [])

@spec race([agent_or_pid()], term(), keyword()) :: result()

Races multiple agents and returns the first successful result.

Starts all agents concurrently and returns as soon as any agent succeeds. Other agents continue running in the background but their results are ignored.

Options

  • :timeout - Maximum time to wait in ms (default: 60_000)
  • :on_complete - Callback fn agent_id, result -> any end

Examples

# With agent structs
{:ok, fastest_result} = Reactive.race(
  [research_agent, search_agent, cached_agent],
  query,
  timeout: 5000
)

# With agent processes
{:ok, result} = Reactive.race([pid1, pid2, pid3], input)

# With callback
Reactive.race(agents, input,
  on_complete: fn agent_id, result ->
    Logger.info("Agent #{agent_id} completed: #{inspect(result)}")
  end
)

Returns

  • {:ok, result} - First successful result
  • {:error, :all_failed} - All agents failed
  • {:error, :timeout} - Timeout reached before any success

some(agents, input, opts)

@spec some([agent_or_pid()], term(), keyword()) ::
  {:ok, %{required(String.t()) => term()}} | result()

Waits for N successful results from agents.

Executes agents concurrently until the specified number of successful results is reached. Useful for quorum-based decisions.

Options

  • :count - Number of successful results needed (required)
  • :timeout - Maximum time to wait in ms (default: 60_000)
  • :on_complete - Callback fn agent_id, result -> any end

Examples

# Wait for 2 out of 3 agents to succeed (quorum)
{:ok, results} = Reactive.some(
  [agent1, agent2, agent3],
  input,
  count: 2,
  timeout: 10000
)
#=> {:ok, %{"agent_0" => result1, "agent_2" => result3}}

# Majority vote
agents = [voter1, voter2, voter3, voter4, voter5]
{:ok, votes} = Reactive.some(agents, question, count: 3)

Returns

  • {:ok, %{agent_id => result}} - Map of N successful results
  • {:error, :insufficient_successes} - Not enough agents succeeded
  • {:error, :timeout} - Timeout reached before N successes

when_result(agent, input, list)

(macro)

Executes an agent and conditionally executes another based on the result.

Examples

Reactive.when_result(agent1, input) do
  {:ok, %{needs_review: true}} ->
    AgentProcess.run(review_agent, "Please review")

  {:ok, %{confidence: c}} when c < 0.5 ->
    AgentProcess.run(fallback_agent, "Use fallback")

  {:ok, result} ->
    {:ok, result}

  error ->
    error
end