This guide demonstrates how to process multiple LLM generation requests concurrently using ExOutlines batch processing capabilities, leveraging the BEAM's excellent concurrency model.
Table of Contents
- Why Batch Processing?
- Basic Batch Generation
- Concurrency Configuration
- Error Handling
- Performance Optimization
- Real-World Patterns
- Monitoring and Telemetry
- Best Practices
Why Batch Processing?
When you need to generate structured output for multiple prompts, processing them sequentially is inefficient:
# Sequential processing - SLOW
results = for prompt <- prompts do
ExOutlines.generate(schema, prompt: prompt, backend: HTTP, backend_opts: opts)
end
Problems with sequential processing:
- Each request waits for the previous one to complete
- Total time = sum of all request times
- CPU and network resources are underutilized
- Poor throughput for high-volume applications
Batch processing advantages:
- Multiple requests execute concurrently
- Total time ≈ longest request time (not sum)
- Better resource utilization
- Higher throughput
- Built-in error handling for individual failures
Basic Batch Generation
Using generate_batch/2
alias ExOutlines.{Spec.Schema, Backend.HTTP}
# Define schema
schema = Schema.new(%{
sentiment: %{type: {:enum, ["positive", "neutral", "negative"]}, required: true},
confidence: %{type: :number, required: true, min: 0, max: 1}
})
# Prepare tasks (list of {schema, opts} tuples)
reviews = [
"This product is amazing! Highly recommend.",
"It's okay, nothing special.",
"Terrible quality, very disappointed."
]
tasks = Enum.map(reviews, fn review ->
{schema, [
backend: HTTP,
backend_opts: [
api_key: System.get_env("OPENAI_API_KEY"),
model: "gpt-4o-mini",
messages: [
%{role: "system", content: "Analyze sentiment"},
%{role: "user", content: review}
]
]
]}
end)
# Generate concurrently
results = ExOutlines.generate_batch(tasks)
# Process results
Enum.each(results, fn
{:ok, data} ->
IO.puts("Sentiment: #{data.sentiment}, Confidence: #{data.confidence}")
{:error, reason} ->
IO.puts("Failed: #{inspect(reason)}")
end)
Return Value
generate_batch/2 returns a list of results matching the input order:
[
{:ok, %{sentiment: "positive", confidence: 0.95}},
{:ok, %{sentiment: "neutral", confidence: 0.80}},
{:error, :timeout}
]
Concurrency Configuration
Max Concurrency
Control how many requests run simultaneously:
# Default: number of CPU cores
results = ExOutlines.generate_batch(tasks)
# Limit to 5 concurrent requests
results = ExOutlines.generate_batch(tasks, max_concurrency: 5)
# High concurrency (if API allows)
results = ExOutlines.generate_batch(tasks, max_concurrency: 20)
Choosing concurrency level:
- API rate limits: Check provider's concurrent request limits
- Memory: Each concurrent request consumes memory
- Network: Consider bandwidth and connection limits
- Cost: More concurrency = faster but potentially higher costs
Timeout Configuration
Set timeout per task:
# Default: 60 seconds per task
results = ExOutlines.generate_batch(tasks, timeout: 60_000)
# Longer timeout for complex generation
results = ExOutlines.generate_batch(tasks, timeout: 120_000)
# Short timeout for simple tasks
results = ExOutlines.generate_batch(tasks, timeout: 30_000)
Ordered vs Unordered Results
# Ordered results (default) - preserves input order
results = ExOutlines.generate_batch(tasks, ordered: true)
# Unordered results - slightly faster, results arrive as completed
results = ExOutlines.generate_batch(tasks, ordered: false)
Use ordered when: Result position matters (matching input indices)
Use unordered when: Processing results independently and speed matters
Error Handling
Mixed Success and Failure
Batch processing continues even when individual tasks fail:
results = ExOutlines.generate_batch(tasks)
# Separate successes and failures
{successes, failures} = Enum.split_with(results, fn
{:ok, _} -> true
{:error, _} -> false
end)
IO.puts("Successful: #{length(successes)}/#{length(results)}")
IO.puts("Failed: #{length(failures)}/#{length(results)}")
# Process failures
Enum.each(failures, fn {:error, reason} ->
Logger.error("Task failed: #{inspect(reason)}")
end)
Retry Failed Tasks
defmodule BatchProcessor do
def process_with_retry(tasks, max_retries \\ 3) do
results = ExOutlines.generate_batch(tasks)
# Find failed tasks
failed_indices = results
|> Enum.with_index()
|> Enum.filter(fn {{status, _}, _idx} -> status == :error end)
|> Enum.map(fn {_, idx} -> idx end)
if length(failed_indices) > 0 and max_retries > 0 do
IO.puts("Retrying #{length(failed_indices)} failed tasks...")
# Retry only failed tasks
retry_tasks = Enum.map(failed_indices, fn idx -> Enum.at(tasks, idx) end)
retry_results = process_with_retry(retry_tasks, max_retries - 1)
# Merge retry results back
merge_results(results, failed_indices, retry_results)
else
results
end
end
defp merge_results(original, indices, retries) do
Enum.reduce(Enum.zip(indices, retries), original, fn {idx, result}, acc ->
List.replace_at(acc, idx, result)
end)
end
end
# Use with retry
results = BatchProcessor.process_with_retry(tasks, 2)
Timeout Handling
results = ExOutlines.generate_batch(tasks,
timeout: 30_000,
on_timeout: :kill_task # Default - kill timed-out tasks
)
# Or continue without killing
results = ExOutlines.generate_batch(tasks,
timeout: 30_000,
on_timeout: :continue # Let tasks complete in background
)
Performance Optimization
Optimal Batch Size
Don't batch everything at once. Break large workloads into chunks:
defmodule BatchOptimizer do
@chunk_size 50
@max_concurrency 10
def process_large_dataset(items, schema, backend_opts) do
items
|> Enum.chunk_every(@chunk_size)
|> Enum.flat_map(fn chunk ->
tasks = build_tasks(chunk, schema, backend_opts)
ExOutlines.generate_batch(tasks, max_concurrency: @max_concurrency)
end)
end
defp build_tasks(items, schema, backend_opts) do
Enum.map(items, fn item ->
{schema, [backend: HTTP, backend_opts: backend_opts]}
end)
end
end
Why chunk?
- Prevents memory exhaustion with thousands of tasks
- Provides progress feedback
- Easier to handle partial failures
- Better rate limit management
Progress Tracking
defmodule ProgressTracker do
def process_with_progress(items, schema, backend_opts) do
total = length(items)
chunk_size = 50
items
|> Enum.chunk_every(chunk_size)
|> Enum.with_index(1)
|> Enum.flat_map(fn {chunk, batch_num} ->
IO.write("\rProcessing batch #{batch_num}/#{div(total, chunk_size) + 1}...")
tasks = build_tasks(chunk, schema, backend_opts)
results = ExOutlines.generate_batch(tasks, max_concurrency: 10)
success_count = Enum.count(results, fn {status, _} -> status == :ok end)
IO.puts(" #{success_count}/#{length(chunk)} succeeded")
results
end)
end
defp build_tasks(items, schema, backend_opts) do
# Build task list
end
end
Caching for Repeated Inputs
defmodule CachedBatchProcessor do
def process_with_cache(items, schema, backend_opts) do
# Deduplicate items
unique_items = Enum.uniq(items)
# Process unique items
tasks = build_tasks(unique_items, schema, backend_opts)
results = ExOutlines.generate_batch(tasks)
# Build cache
cache = Enum.zip(unique_items, results) |> Map.new()
# Map results back to original items
Enum.map(items, fn item -> Map.get(cache, item) end)
end
defp build_tasks(items, schema, backend_opts) do
# Build task list
end
end
Real-World Patterns
Content Moderation at Scale
defmodule ContentModerator do
alias ExOutlines.{Spec.Schema, Backend.HTTP}
@moderation_schema Schema.new(%{
is_safe: %{type: :boolean, required: true},
category: %{
type: {:enum, ["safe", "spam", "hate_speech", "violence", "explicit"]},
required: true
},
confidence: %{type: :number, required: true, min: 0, max: 1}
})
def moderate_comments(comments, api_key) do
backend_opts = [
api_key: api_key,
model: "gpt-4o-mini",
temperature: 0.0
]
# Process in batches of 100
comments
|> Enum.chunk_every(100)
|> Enum.flat_map(fn batch ->
moderate_batch(batch, backend_opts)
end)
end
defp moderate_batch(comments, backend_opts) do
tasks = Enum.map(comments, fn comment ->
{@moderation_schema, [
backend: HTTP,
backend_opts: backend_opts ++
[messages: [
%{role: "system", content: "You are a content moderator."},
%{role: "user", content: "Moderate this: #{comment}"}
]]
]}
end)
ExOutlines.generate_batch(tasks, max_concurrency: 20)
end
end
# Usage
comments = load_comments() # Thousands of comments
results = ContentModerator.moderate_comments(comments, api_key)
# Filter unsafe content
unsafe_comments = results
|> Enum.zip(comments)
|> Enum.filter(fn
  {{:ok, moderation}, _comment} -> not moderation.is_safe
  {{:error, _reason}, _comment} -> false
end)
|> Enum.map(fn {_result, comment} -> comment end)
Product Categorization Pipeline
defmodule ProductCategorizer do
alias ExOutlines.{Spec.Schema, Backend.HTTP}
@category_schema Schema.new(%{
category: %{
type: {:enum, ["electronics", "clothing", "home", "sports", "toys"]},
required: true
},
subcategory: %{type: :string, required: true},
tags: %{
type: {:array, %{type: :string}},
required: true,
min_items: 1,
max_items: 5,
unique_items: true
}
})
def categorize_products(products, api_key) do
backend_opts = [api_key: api_key, model: "gpt-4o-mini"]
tasks = Enum.map(products, fn product ->
{@category_schema, [
backend: HTTP,
backend_opts: backend_opts ++
[messages: [
%{role: "system", content: "Categorize products."},
%{role: "user", content: product.description}
]]
]}
end)
results = ExOutlines.generate_batch(tasks, max_concurrency: 10)
# Merge results back with products
Enum.zip(products, results)
|> Enum.map(fn {product, result} ->
case result do
{:ok, categorization} ->
Map.merge(product, categorization)
{:error, _reason} ->
Map.put(product, :categorization_error, true)
end
end)
end
end
A/B Testing Different Prompts
defmodule PromptTester do
def test_prompts(test_cases, prompt_variants, schema, backend_opts) do
# Create tasks for all combinations
tasks = for test_case <- test_cases,
prompt_variant <- prompt_variants do
messages = [
%{role: "system", content: prompt_variant.system_prompt},
%{role: "user", content: test_case.input}
]
{schema, [
backend: HTTP,
backend_opts: backend_opts ++ [messages: messages],
metadata: %{
test_case: test_case.id,
prompt_variant: prompt_variant.name
}
]}
end
# Run all tests concurrently
results = ExOutlines.generate_batch(tasks, max_concurrency: 20)
# Analyze results by variant
analyze_by_variant(results, test_cases, prompt_variants)
end
defp analyze_by_variant(results, test_cases, variants) do
# Group and analyze results
# Calculate success rates, accuracy, etc.
end
end
Monitoring and Telemetry
Batch Processing Telemetry
ExOutlines emits telemetry events for batch processing:
:telemetry.attach(
"batch-processing-logger",
[:ex_outlines, :batch, :start],
fn _event, measurements, metadata, _config ->
IO.puts("Starting batch: #{metadata.total_tasks} tasks, concurrency: #{metadata.max_concurrency}")
end,
nil
)
:telemetry.attach(
"batch-processing-complete",
[:ex_outlines, :batch, :stop],
fn _event, measurements, metadata, _config ->
duration_sec = measurements.duration / 1_000_000_000
success_rate = metadata.successes / metadata.total_tasks * 100
IO.puts("""
Batch complete:
- Duration: #{Float.round(duration_sec, 2)}s
- Success rate: #{Float.round(success_rate, 1)}%
- Throughput: #{Float.round(metadata.total_tasks / duration_sec, 2)} tasks/sec
""")
end,
nil
)
Custom Metrics
defmodule BatchMetrics do
def track_batch_performance(tasks) do
start_time = System.monotonic_time()
results = ExOutlines.generate_batch(tasks, max_concurrency: 10)
end_time = System.monotonic_time()
duration_ms = System.convert_time_unit(end_time - start_time, :native, :millisecond)
metrics = %{
total_tasks: length(tasks),
successes: Enum.count(results, fn {status, _} -> status == :ok end),
failures: Enum.count(results, fn {status, _} -> status == :error end),
duration_ms: duration_ms,
throughput: length(tasks) / (duration_ms / 1000)
}
# Send to monitoring system
send_metrics(metrics)
results
end
defp send_metrics(metrics) do
# Send to StatsD, Prometheus, etc.
end
end
Best Practices
1. Respect API Rate Limits
# Check provider's rate limits
# OpenAI: 3,500 requests/min (tier 1) = ~58/sec
# Anthropic: 50 requests/min (free tier) = ~0.8/sec
# Adjust concurrency accordingly
results = ExOutlines.generate_batch(tasks,
max_concurrency: 10 # Stay under rate limit
)
2. Use Appropriate Timeouts
# Simple classification: short timeout
results = ExOutlines.generate_batch(classification_tasks, timeout: 10_000)
# Complex generation: longer timeout
results = ExOutlines.generate_batch(writing_tasks, timeout: 60_000)
# Very complex reasoning: very long timeout
results = ExOutlines.generate_batch(reasoning_tasks, timeout: 120_000)
3. Handle Partial Failures Gracefully
results = ExOutlines.generate_batch(tasks)
{successes, failures} = Enum.split_with(results, fn
{:ok, _} -> true
{:error, _} -> false
end)
if length(failures) > 0 do
Logger.warning("#{length(failures)} tasks failed, continuing with successes")
# Decide: retry, skip, or use fallback
end
# Process successes
Enum.each(successes, fn {:ok, data} ->
process_result(data)
end)
4. Chunk Large Workloads
# Process 10,000 items
large_dataset
|> Enum.chunk_every(100) # Batches of 100
|> Enum.each(fn chunk ->
tasks = build_tasks(chunk)
results = ExOutlines.generate_batch(tasks, max_concurrency: 10)
save_results(results)
end)
5. Monitor Performance
# Track key metrics
defmodule BatchMonitor do
def process_with_monitoring(tasks) do
:timer.tc(fn ->
ExOutlines.generate_batch(tasks, max_concurrency: 10)
end)
|> case do
{time_microseconds, results} ->
log_performance(time_microseconds, results)
results
end
end
defp log_performance(time_us, results) do
success_rate = Enum.count(results, fn {s, _} -> s == :ok end) / length(results)
Logger.info("""
Batch completed:
- Time: #{time_us / 1_000_000}s
- Tasks: #{length(results)}
- Success: #{Float.round(success_rate * 100, 1)}%
""")
end
end
6. Use Background Jobs for Large Batches
# Don't block HTTP requests with large batch processing
defmodule MyApp.BatchWorker do
use Oban.Worker, queue: :batch_processing
@impl Oban.Worker
def perform(%Oban.Job{args: %{"items" => items}}) do
tasks = build_tasks(items)
results = ExOutlines.generate_batch(tasks,
max_concurrency: 10,
timeout: 60_000
)
save_results(results)
:ok
end
end
# Enqueue from controller
def batch_process(conn, %{"items" => items}) do
{:ok, job} =
  %{items: items}
  |> MyApp.BatchWorker.new()
  |> Oban.insert()
json(conn, %{status: "processing", job_id: job.id})
end
7. Test with Mock Backend First
defmodule BatchProcessorTest do
use ExUnit.Case
alias ExOutlines.Backend.Mock
test "processes batch successfully" do
schema = Schema.new(%{value: %{type: :integer}})
# Create mock responses
mock = Mock.new([
{:ok, ~s({"value": 1})},
{:ok, ~s({"value": 2})},
{:ok, ~s({"value": 3})}
])
tasks = for _i <- 1..3 do
{schema, [backend: Mock, backend_opts: [mock: mock]]}
end
results = ExOutlines.generate_batch(tasks, max_concurrency: 2)
assert length(results) == 3
assert Enum.all?(results, fn {status, _} -> status == :ok end)
end
end
Performance Comparison
Sequential vs Batch Processing
# Benchmark script
defmodule BatchBenchmark do
def run do
schema = Schema.new(%{result: %{type: :integer}})
count = 20
# Sequential
sequential_time = measure(fn ->
for _i <- 1..count do
ExOutlines.generate(schema, backend: Mock, backend_opts: [mock: mock()])
end
end)
# Batch
batch_time = measure(fn ->
tasks = for _i <- 1..count, do: {schema, [backend: Mock, backend_opts: [mock: mock()]]}
ExOutlines.generate_batch(tasks, max_concurrency: 10)
end)
IO.puts("""
Results for #{count} tasks:
- Sequential: #{sequential_time}ms
- Batch (concurrency: 10): #{batch_time}ms
- Speedup: #{Float.round(sequential_time / batch_time, 2)}x
""")
end
defp measure(fun) do
{time, _result} = :timer.tc(fun)
div(time, 1000) # Convert to ms
end
defp mock do
Mock.new([{:ok, ~s({"result": 42})}])
end
end
Expected results: 5-10x speedup depending on concurrency and task complexity.
Next Steps
- Read the Performance Optimization guide for advanced tuning
- See Testing Strategies for testing batch operations
- Explore Phoenix Integration for web application patterns
- Check Telemetry documentation for monitoring setup