Normandy.Coordination.ParallelOrchestrator (normandy v0.2.0)

View Source

Orchestrates parallel execution of multiple agents.

The Parallel Orchestrator runs multiple agents concurrently, each working on the same or different inputs. Results are collected and can be aggregated using a custom function.

Example

# Define agents
agents = [
  %{id: "researcher_1", agent: research_agent_1, input: query1},
  %{id: "researcher_2", agent: research_agent_2, input: query2},
  %{id: "researcher_3", agent: research_agent_3, input: query3}
]

# Execute in parallel
{:ok, results} = ParallelOrchestrator.execute(
  agents,
  max_concurrency: 3,
  aggregate: &combine_research_results/1
)

Summary

Functions

Executes agents in parallel.

Executes agents with the same input in parallel.

Executes agents and collects results as they complete.

Types

agent_spec()

@type agent_spec() :: %{
  :id => String.t(),
  :agent => struct(),
  :input => term(),
  optional(:transform) => (term() -> term())
}

execution_result()

@type execution_result() :: %{
  success: boolean(),
  results: %{required(String.t()) => term()},
  errors: %{required(String.t()) => term()},
  context: Normandy.Coordination.SharedContext.t(),
  aggregated: term() | nil
}

Functions

execute(agents, input_or_opts \\ [])

@spec execute([agent_spec()] | [struct()], keyword() | map()) ::
  {:ok, execution_result()} | {:ok, list()}

Executes agents in parallel.

Can be called in two ways:

  1. With agent_specs and options: execute(agent_specs, opts)
  2. With agents list and input: execute(agents, input) - returns list of results

Options

  • :shared_context - SharedContext to use (default: new context)
  • :max_concurrency - Maximum concurrent agents (default: 10)
  • :timeout - Timeout per agent in ms (default: 300_000)
  • :aggregate - Function to aggregate results: ([{id, result}] -> term())
  • :on_agent_complete - Callback: (agent_id, result -> any)
  • :ordered - Return results in spec order (default: false)

Example

{:ok, result} = ParallelOrchestrator.execute(
  agents,
  max_concurrency: 5,
  aggregate: fn results ->
    Enum.map(results, fn {_id, r} -> r end)
    |> Enum.join("\n")
  end
)

execute_same_input(agents, input, opts \\ [])

@spec execute_same_input([%{id: String.t(), agent: struct()}], term(), keyword()) ::
  {:ok, execution_result()}

Executes agents with the same input in parallel.

All agents receive the same input and work on it concurrently.

Example

# Multiple agents analyze the same data
{:ok, results} = ParallelOrchestrator.execute_same_input(
  agents,
  input_data,
  max_concurrency: 5
)

execute_stream(agent_specs, opts \\ [])

@spec execute_stream(
  [agent_spec()],
  keyword()
) :: Enumerable.t()

Executes agents and collects results as they complete.

Returns a stream of results as agents finish.

Example

stream = ParallelOrchestrator.execute_stream(agents)

stream
|> Stream.each(fn {:ok, {agent_id, result}} ->
  IO.puts("Agent #{agent_id}: #{inspect(result)}")
end)
|> Stream.run()