Normandy.Batch.Processor (normandy v0.2.0)

Batch processing utilities for handling multiple agent requests concurrently.

Provides efficient batch processing with configurable concurrency, rate limiting, and result aggregation for AI agent operations.

Features

  • Concurrent request processing with Task.async_stream
  • Configurable concurrency limits
  • Ordered and unordered result collection
  • Error handling and partial failure support
  • Progress tracking callbacks

Example

# Basic batch processing
inputs = [
  %{chat_message: "Hello"},
  %{chat_message: "How are you?"},
  %{chat_message: "Goodbye"}
]

{:ok, results} = Normandy.Batch.Processor.process_batch(
  agent,
  inputs,
  max_concurrency: 5
)

# With progress callback
{:ok, results} = Normandy.Batch.Processor.process_batch(
  agent,
  inputs,
  on_progress: fn completed, total ->
    IO.puts("Progress: #{completed}/#{total}")
  end
)

Summary

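Functions

process_batch(agent, inputs, opts \\ [])

Process a batch of inputs through an agent concurrently.

process_batch_chunked(agent, inputs, opts \\ [])

Process a batch in chunks with a delay between chunks.

process_batch_with_stats(agent, inputs, opts \\ [])

Process a batch and return detailed statistics.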

Types

batch_option()

@type batch_option() ::
  {:max_concurrency, pos_integer()}
  | {:ordered, boolean()}
  | {:timeout, pos_integer()}
  | {:on_progress, progress_callback()}
  | {:on_error, error_callback()}

batch_options()

@type batch_options() :: [batch_option()]

batch_result()

@type batch_result() :: %{
  success: [term()],
  errors: [{term(), term()}],
  total: pos_integer(),
  success_count: pos_integer(),
  error_count: pos_integer()
}
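
To make the shape of batch_result() concrete, here is a minimal sketch of how such a map could be assembled from per-input outcomes. The module name BatchResultSketch and the {input, outcome} pair format are assumptions made for illustration; they are not part of Normandy's API.

```elixir
defmodule BatchResultSketch do
  # `outcomes` is a hypothetical list of {input, {:ok, value} | {:error, reason}} pairs.
  def summarize(outcomes) do
    # Separate successful pairs from failed ones, keeping their relative order.
    {oks, errs} =
      Enum.split_with(outcomes, fn {_input, outcome} -> match?({:ok, _}, outcome) end)

    success = Enum.map(oks, fn {_input, {:ok, value}} -> value end)
    errors = Enum.map(errs, fn {input, {:error, reason}} -> {input, reason} end)

    %{
      success: success,
      errors: errors,
      total: length(outcomes),
      success_count: length(success),
      error_count: length(errors)
    }
  end
end
```

Each entry in errors pairs the failing input with its error, which matches the [{term(), term()}] type above.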

error_callback()

@type error_callback() :: (term(), term() -> any())

progress_callback()

@type progress_callback() :: (pos_integer(), pos_integer() -> any())

Functions

process_batch(agent, inputs, opts \\ [])

@spec process_batch(agent :: struct(), inputs :: [term()], batch_options()) ::
  {:ok, [term()] | batch_result()}

Process a batch of inputs through an agent concurrently.

Options

  • :max_concurrency - Maximum concurrent tasks (default: 10)
  • :ordered - Preserve input order in results (default: true)
  • :timeout - Timeout per task in milliseconds (default: 300000, i.e. 5 minutes)
  • :on_progress - Callback function called after each completion: (completed, total -> any)
  • :on_error - Callback function called on each error: (input, error -> any)
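
The options above map naturally onto Elixir's Task.async_stream/3, which the feature list names as the underlying mechanism. The following is a rough sketch of that mapping, not Normandy's actual implementation: BatchSketch is a hypothetical module, and a plain one-argument function stands in for the agent call.

```elixir
defmodule BatchSketch do
  # Hypothetical stand-in for the library's internals; `fun` replaces the agent call.
  def run(inputs, fun, opts \\ []) do
    max_concurrency = Keyword.get(opts, :max_concurrency, 10)
    ordered = Keyword.get(opts, :ordered, true)
    timeout = Keyword.get(opts, :timeout, 300_000)
    on_progress = Keyword.get(opts, :on_progress, fn _completed, _total -> :ok end)
    total = length(inputs)

    inputs
    |> Task.async_stream(fun,
      max_concurrency: max_concurrency,
      ordered: ordered,
      timeout: timeout,
      # Yield {:exit, :timeout} for a slow task instead of exiting the caller.
      on_timeout: :kill_task
    )
    # Number the outcomes lazily so the callback fires as each one is emitted.
    |> Stream.with_index(1)
    |> Enum.map(fn {outcome, completed} ->
      on_progress.(completed, total)
      outcome
    end)
  end
end

results = BatchSketch.run([1, 2, 3], fn n -> n * 2 end, max_concurrency: 2)
IO.inspect(results)
#=> [ok: 2, ok: 4, ok: 6]
```

In this sketch the progress callback runs in the calling process, so it can safely print or update local state.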

Returns

  • {:ok, results} - A list of results, or a batch_result() map when :ordered is false
  • {:error, reason} - Fatal error during batch processing
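
Because the success value can take two shapes, callers may want to pattern match on all three documented cases. A small sketch, where ReturnSketch and describe/1 are hypothetical names used only for illustration:

```elixir
defmodule ReturnSketch do
  # Plain list of results.
  def describe({:ok, results}) when is_list(results) do
    "#{length(results)} results"
  end

  # batch_result() map; only the two counters are needed here.
  def describe({:ok, %{success_count: ok, error_count: failed}}) do
    "#{ok} succeeded, #{failed} failed"
  end

  # Fatal error for the whole batch.
  def describe({:error, reason}) do
    "batch failed: #{inspect(reason)}"
  end
end
```

The value returned by process_batch/3 could be passed straight to ReturnSketch.describe/1.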

Examples

# Simple batch (assumes `alias Normandy.Batch.Processor`)
{:ok, results} = Processor.process_batch(agent, inputs)

# With configuration
{:ok, results} = Processor.process_batch(
  agent,
  inputs,
  max_concurrency: 5,
  on_progress: fn completed, total ->
    IO.puts("Progress: #{completed}/#{total}")
  end
)

process_batch_chunked(agent, inputs, opts \\ [])

@spec process_batch_chunked(agent :: struct(), inputs :: [term()], batch_options()) ::
  {:ok, [term()]}

Process a batch in chunks with a delay between chunks.

Useful for very large batches or strict rate-limiting requirements.

Options

All options from process_batch/3 plus:

  • :chunk_size - Number of items per chunk (default: 100)
  • :chunk_delay - Milliseconds to wait between chunks (default: 0)
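
The chunking behaviour described by these two options can be approximated with Enum.chunk_every/2 and Process.sleep/1. This is a sketch under assumptions: ChunkSketch is a hypothetical module, process_chunk stands in for one process_batch/3 call per chunk, and the delay is applied only between chunks. Whether the library also sleeps after the final chunk is not stated in the source.

```elixir
defmodule ChunkSketch do
  # `process_chunk` is a hypothetical function that takes a list and returns a list.
  def run(inputs, process_chunk, opts \\ []) do
    chunk_size = Keyword.get(opts, :chunk_size, 100)
    chunk_delay = Keyword.get(opts, :chunk_delay, 0)

    inputs
    |> Enum.chunk_every(chunk_size)
    |> Enum.with_index()
    |> Enum.flat_map(fn {chunk, index} ->
      # Sleep before every chunk except the first one.
      if index > 0 and chunk_delay > 0, do: Process.sleep(chunk_delay)
      process_chunk.(chunk)
    end)
  end
end
```

With 5 inputs and chunk_size: 2, this sketch processes three chunks of sizes 2, 2 and 1 and pauses twice.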

Example

{:ok, results} = Processor.process_batch_chunked(
  agent,
  large_input_list,
  chunk_size: 50,
  chunk_delay: 1000,  # 1 second between chunks
  max_concurrency: 5
)

process_batch_with_stats(agent, inputs, opts \\ [])

@spec process_batch_with_stats(agent :: struct(), inputs :: [term()], batch_options()) ::
  {:ok, batch_result()}

Process a batch and return detailed statistics.

Returns a map with separate success/error lists and counts.

Example

{:ok, result} = Processor.process_batch_with_stats(agent, inputs)
result
#=> %{
#=>   success: [result1, result2],
#=>   errors: [{input3, error}],
#=>   total: 3,
#=>   success_count: 2,
#=>   error_count: 1
#=> }
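
Once a statistics map is available, two common follow-ups are collecting the failed inputs for a retry and computing a success rate. The helpers below are a hypothetical sketch (StatsSketch is not part of Normandy) and rely only on the batch_result() fields documented above.

```elixir
defmodule StatsSketch do
  # Inputs that failed, in the order they appear in `errors`.
  def failed_inputs(%{errors: errors}) do
    Enum.map(errors, fn {input, _error} -> input end)
  end

  # Fraction of successful items; 0.0 for an empty batch to avoid dividing by zero.
  def success_rate(%{total: 0}), do: 0.0
  def success_rate(%{success_count: ok, total: total}), do: ok / total
end
```

The list returned by StatsSketch.failed_inputs/1 could be fed back into process_batch/3 as a simple retry pass.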