Normandy.Batch.Processor (normandy v0.2.0)
Batch processing utilities for handling multiple agent requests concurrently.
Provides efficient batch processing with configurable concurrency, rate limiting, and result aggregation for AI agent operations.
Features
- Concurrent request processing with Task.async_stream
- Configurable concurrency limits
- Ordered and unordered result collection
- Error handling and partial failure support
- Progress tracking callbacks
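The core mechanism listed above, `Task.async_stream` with a concurrency limit, can be sketched in plain Elixir with no Normandy dependency; the upcasing function here is a stand-in for an agent request:

```elixir
# Standalone sketch of concurrent batch processing via Task.async_stream.
# String.upcase/1 stands in for a real agent call.
inputs = ["Hello", "How are you?", "Goodbye"]

results =
  inputs
  |> Task.async_stream(fn msg -> String.upcase(msg) end,
    max_concurrency: 5,
    ordered: true,
    timeout: 5_000
  )
  |> Enum.map(fn {:ok, result} -> result end)
```

With `ordered: true`, results come back in input order regardless of which task finishes first.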
Example
# Basic batch processing
inputs = [
  %{chat_message: "Hello"},
  %{chat_message: "How are you?"},
  %{chat_message: "Goodbye"}
]

{:ok, results} = Normandy.Batch.Processor.process_batch(
  agent,
  inputs,
  max_concurrency: 5
)
# With progress callback
{:ok, results} = Normandy.Batch.Processor.process_batch(
  agent,
  inputs,
  on_progress: fn completed, total ->
    IO.puts("Progress: #{completed}/#{total}")
  end
)
Summary
Functions
Process a batch of inputs through an agent concurrently.
Process a batch in chunks with a delay between chunks.
Process a batch and return detailed statistics.
Types
@type batch_option() ::
  {:max_concurrency, pos_integer()}
  | {:ordered, boolean()}
  | {:timeout, pos_integer()}
  | {:on_progress, progress_callback()}
  | {:on_error, error_callback()}
@type batch_options() :: [batch_option()]
@type batch_result() :: %{
  success: [term()],
  errors: [{term(), term()}],
  total: pos_integer(),
  success_count: pos_integer(),
  error_count: pos_integer()
}
@type progress_callback() :: (pos_integer(), pos_integer() -> any())
Functions
@spec process_batch(agent :: struct(), inputs :: [term()], batch_options()) :: {:ok, [term()] | batch_result()}
Process a batch of inputs through an agent concurrently.
Options
:max_concurrency - Maximum concurrent tasks (default: 10)
:ordered - Preserve input order in results (default: true)
:timeout - Timeout per task in milliseconds (default: 300000)
:on_progress - Callback function called after each completion: (completed, total -> any)
:on_error - Callback function called on each error: (input, error -> any)
Returns
{:ok, results} - List of results (or a batch_result map if unordered)
{:error, reason} - Fatal error during batch processing
Examples
# Simple batch
{:ok, results} = Processor.process_batch(agent, inputs)
# With configuration
{:ok, results} = Processor.process_batch(
  agent,
  inputs,
  max_concurrency: 5,
  on_progress: fn completed, total ->
    IO.puts("Progress: #{completed}/#{total}")
  end
)
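The partial-failure behavior behind the :on_error option can be sketched independently of the library: each task either succeeds or is rescued into an `{input, error}` tuple, matching the success/error split documented above. `risky_call` is a hypothetical stand-in for an agent request.

```elixir
# Self-contained sketch of partial-failure handling. Tasks that raise
# are rescued into {:error, {input, error}} instead of crashing the batch.
risky_call = fn
  %{chat_message: "boom"} -> raise "upstream error"
  %{chat_message: msg} -> "echo: " <> msg
end

inputs = [%{chat_message: "hi"}, %{chat_message: "boom"}]

{oks, errors} =
  inputs
  |> Task.async_stream(fn input ->
    try do
      {:ok, risky_call.(input)}
    rescue
      e -> {:error, {input, e}}
    end
  end)
  |> Enum.map(fn {:ok, tagged} -> tagged end)
  |> Enum.split_with(&match?({:ok, _}, &1))
```

Here `oks` holds `{:ok, result}` tuples and `errors` holds `{:error, {input, error}}` tuples, so one failed input does not discard the rest of the batch.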
@spec process_batch_chunked(agent :: struct(), inputs :: [term()], batch_options()) :: {:ok, [term()]}
Process a batch in chunks with a delay between chunks.
Useful for very large batches or strict rate-limiting requirements.
Options
All options from process_batch/3 plus:
:chunk_size - Number of items per chunk (default: 100)
:chunk_delay - Milliseconds to wait between chunks (default: 0)
Example
{:ok, results} = Processor.process_batch_chunked(
  agent,
  large_input_list,
  chunk_size: 50,
  chunk_delay: 1000,  # 1 second between chunks
  max_concurrency: 5
)
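A minimal sketch of the chunking pattern, assuming only the standard library: split the inputs with `Enum.chunk_every/2`, process each chunk concurrently, and sleep between chunks. Squaring stands in for an agent call.

```elixir
# Standalone sketch of chunked batch processing with a delay per chunk.
chunk_size = 2
chunk_delay = 10  # milliseconds; a real rate limit would be larger

results =
  1..5
  |> Enum.chunk_every(chunk_size)
  |> Enum.flat_map(fn chunk ->
    chunk
    |> Task.async_stream(fn n -> n * n end, max_concurrency: 2)
    |> Enum.map(fn {:ok, r} -> r end)
    # Sleep after each chunk (including, harmlessly, the last one).
    |> tap(fn _ -> Process.sleep(chunk_delay) end)
  end)
```

This trades throughput for a bounded request rate: at most `chunk_size` requests start before each pause.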
@spec process_batch_with_stats(agent :: struct(), inputs :: [term()], batch_options()) :: {:ok, batch_result()}
Process a batch and return detailed statistics.
Returns a map with separate success/error lists and counts.
Example
{:ok, stats} = Processor.process_batch_with_stats(agent, inputs)
stats
#=> %{
  success: [result1, result2],
  errors: [{input3, error}],
  total: 3,
  success_count: 2,
  error_count: 1
}