Async Batch Embeddings - Production Guide
Complete guide to production-scale embedding generation with 50% cost savings
Table of Contents
- Overview
- When to Use
- Cost Analysis
- Quick Start
- Complete Workflow
- Production Patterns
- API Reference
- Error Handling
- Performance Tuning
- Best Practices
Overview
The Async Batch Embedding API allows you to process large-scale embedding jobs asynchronously with 50% cost savings compared to the interactive embedding API. It's designed for production scenarios where you need to embed thousands to millions of texts for RAG systems, knowledge bases, and large-scale retrieval.
Key Features
- 50% Cost Reduction: Half the cost per embedding vs interactive API
- Long-Running Operations (LRO): Submit job and retrieve results later
- Progress Tracking: Real-time statistics on success, failure, and pending requests
- Priority Support: Control processing order with priority field
- Multi-auth Compatible: Works with both Gemini API and Vertex AI
- Type-safe: Complete type annotations and error handling
Architecture
Submit Batch → [PENDING] → [PROCESSING] → [COMPLETED]
                                        ↘ [FAILED]
                                        ↘ [CANCELLED]
The batch progresses through these states, so you can track progress and retrieve results once it completes.
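The state flow above can be sketched as a small classifier that separates terminal states from in-flight ones. This is only an illustration: BatchStateSketch is a hypothetical module, and the :cancelled atom is assumed from the diagram rather than confirmed against the library's types.

```elixir
defmodule BatchStateSketch do
  # States named in this guide. :cancelled is an assumption taken from the
  # diagram; check the library's EmbedContentBatch type for the exact atoms.
  @terminal [:completed, :failed, :cancelled]
  @in_flight [:pending, :processing]

  @doc "Returns true once a batch can no longer change state."
  def terminal?(state) when state in @terminal, do: true
  def terminal?(state) when state in @in_flight, do: false

  @doc "Maps a state to the action a caller would typically take next."
  def next_action(:completed), do: :fetch_embeddings
  def next_action(state) when state in [:failed, :cancelled], do: :report_failure
  def next_action(state) when state in @in_flight, do: :poll_again
end
```

A caller would keep polling while next_action/1 returns :poll_again and stop as soon as terminal?/1 returns true.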
When to Use
Use Async Batch API For:
- ✅ Large-scale indexing (1000s-millions of documents)
- ✅ RAG system setup (building knowledge base indices)
- ✅ Non-urgent embedding generation (background processing)
- ✅ Cost-sensitive workflows (50% savings adds up at scale)
- ✅ Batch data migration (moving to new embedding model)
Use Interactive API For:
- ❌ Real-time embedding (user-facing features)
- ❌ Small batches (<100 texts typically faster with interactive)
- ❌ Time-critical workflows (need immediate results)
- ❌ Interactive exploration (rapid iteration and testing)
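As a rough way to encode the criteria above, the following sketch picks an API from the number of texts and whether the caller needs an immediate result. ApiChoiceSketch and the :needs_immediate_result option are hypothetical names introduced for illustration, and the 100-text threshold is simply the figure quoted above, not a measured cutoff.

```elixir
defmodule ApiChoiceSketch do
  # Threshold taken from the "<100 texts" guidance above; tune for your workload.
  @small_batch_limit 100

  # Returns :interactive or :async_batch.
  def choose(text_count, opts \\ []) do
    cond do
      Keyword.get(opts, :needs_immediate_result, false) -> :interactive
      text_count < @small_batch_limit -> :interactive
      true -> :async_batch
    end
  end
end
```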
Cost Analysis
Cost Comparison (Relative Units)
| Documents | Interactive API | Async Batch API | Savings |
|---|---|---|---|
| 1,000 | 1,000 | 500 | 500 |
| 10,000 | 10,000 | 5,000 | 5,000 |
| 100,000 | 100,000 | 50,000 | 50,000 |
| 1,000,000 | 1,000,000 | 500,000 | 500,000 |
Break-even Analysis
For typical workflows:
- Setup time: ~2-5 minutes additional for batch workflow
- Cost savings: 50% per embedding
- Break-even: ~100-200 documents (depends on workflow)
Recommendation: Use async batch for any job >500 documents or when time is not critical.
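The table above follows directly from the 50% discount. A minimal sketch of the arithmetic, in the same relative units (CostSketch is a hypothetical helper, and unit_cost is a placeholder rather than a real price):

```elixir
defmodule CostSketch do
  # 50% discount for the async batch API, as stated in this guide.
  @batch_discount 0.5

  # unit_cost is the relative cost of one interactive embedding (1.0 by default).
  def compare(doc_count, unit_cost \\ 1.0) do
    interactive = doc_count * unit_cost
    batch = interactive * (1 - @batch_discount)
    %{interactive: interactive, batch: batch, savings: interactive - batch}
  end
end
```

For example, CostSketch.compare(10_000) reproduces the 10,000-document row: 10,000 units interactive, 5,000 units batch, 5,000 units saved.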
Quick Start
Basic Example
# 1. Submit batch
{:ok, batch} = Gemini.async_batch_embed_contents(
["Text 1", "Text 2", "Text 3"],
display_name: "My Batch",
task_type: :retrieval_document,
output_dimensionality: 768
)
# 2. Wait for completion
{:ok, completed_batch} = Gemini.await_batch_completion(batch.name)
# 3. Retrieve embeddings
{:ok, embeddings} = Gemini.get_batch_embeddings(completed_batch)
Run Demo
# Set API key
export GEMINI_API_KEY='your-key-here'
# Run comprehensive demo
mix run examples/async_batch_embedding_demo.exs
# Run production patterns demo
mix run examples/async_batch_production_demo.exs
Complete Workflow
Step 1: Submit Batch Job
{:ok, batch} = Gemini.async_batch_embed_contents(
texts,
display_name: "Knowledge Base Index - #{timestamp}",
task_type: :retrieval_document,
output_dimensionality: 768,
priority: 5 # Higher = more urgent
)
# Save batch.name for later retrieval
batch_id = batch.name
# => "batches/abc123def456..."
Key Points:
- display_name is required; use descriptive names for tracking
- task_type optimizes embeddings for specific use cases
- output_dimensionality defaults to the model default (typically 3072)
- priority controls processing order (default: 0)
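Because a missing display_name is only reported at submission time, it can help to check options up front. The sketch below is one possible pre-flight check, not part of the library: SubmitOptsSketch and its error atoms are hypothetical, and the 128-3072 range is the one listed in the API Reference section of this guide.

```elixir
defmodule SubmitOptsSketch do
  # Validates options before calling Gemini.async_batch_embed_contents/2.
  def validate(opts) do
    with :ok <- check_display_name(Keyword.get(opts, :display_name)),
         :ok <- check_dimensions(Keyword.get(opts, :output_dimensionality)) do
      # Make the documented default priority explicit.
      {:ok, Keyword.put_new(opts, :priority, 0)}
    end
  end

  defp check_display_name(name) when is_binary(name) and name != "", do: :ok
  defp check_display_name(_), do: {:error, :display_name_required}

  # nil means "use the model default".
  defp check_dimensions(nil), do: :ok
  defp check_dimensions(d) when is_integer(d) and d in 128..3072, do: :ok
  defp check_dimensions(_), do: {:error, :invalid_output_dimensionality}
end
```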
Step 2: Poll for Status
Option A: Active Polling with Progress
{:ok, completed_batch} = Gemini.await_batch_completion(
batch_id,
poll_interval: 10_000, # Poll every 10 seconds
timeout: 1_800_000, # 30 minute timeout
on_progress: fn updated_batch ->
stats = updated_batch.batch_stats
progress = EmbedContentBatchStats.progress_percentage(stats)
IO.puts("Progress: #{Float.round(progress, 1)}%")
end
)
Option B: Manual Status Check
{:ok, status} = Gemini.get_batch_status(batch_id)
case status.state do
:completed ->
# Batch is done, retrieve embeddings
{:ok, embeddings} = Gemini.get_batch_embeddings(status)
:processing ->
# Still working, check again later
if status.batch_stats do
progress = EmbedContentBatchStats.progress_percentage(status.batch_stats)
IO.puts("Still processing: #{progress}%")
end
:failed ->
# Batch failed, check stats for details
IO.puts("Batch failed")
:pending ->
# Batch queued, not yet started
IO.puts("Waiting to start...")
end
Step 3: Retrieve Embeddings
{:ok, completed_batch} = Gemini.get_batch_status(batch_id)
case completed_batch.state do
:completed ->
{:ok, embeddings} = Gemini.get_batch_embeddings(completed_batch)
# IMPORTANT: Normalize if not using 3072 dimensions
normalized_embeddings = Enum.map(embeddings, &ContentEmbedding.normalize/1)
# Now safe to use for similarity calculations
similarity = ContentEmbedding.cosine_similarity(
Enum.at(normalized_embeddings, 0),
Enum.at(normalized_embeddings, 1)
)
_ ->
IO.puts("Batch not yet completed")
end
Production Patterns
Pattern 1: Non-blocking Submission
Best for: Web applications, user-facing workflows
defmodule MyApp.EmbeddingService do
def index_documents_async(documents, user_id) do
# 1. Submit batch
{:ok, batch} = Gemini.async_batch_embed_contents(
documents,
display_name: "User #{user_id} - #{DateTime.utc_now()}"
)
# 2. Store batch ID in database
{:ok, job} = MyApp.Repo.insert(%EmbeddingJob{
batch_id: batch.name,
user_id: user_id,
status: "pending",
document_count: length(documents)
})
# 3. Return immediately
{:ok, job}
end
end
Pattern 2: Background Worker
Best for: Scheduled jobs, cron tasks
defmodule MyApp.EmbeddingWorker do
use Oban.Worker, queue: :embeddings
@impl Oban.Worker
def perform(%Oban.Job{args: %{"batch_id" => batch_id}}) do
case Gemini.get_batch_status(batch_id) do
{:ok, %{state: :completed} = batch} ->
# Process completed batch
{:ok, embeddings} = Gemini.get_batch_embeddings(batch)
store_embeddings(embeddings)
:ok
{:ok, %{state: state}} when state in [:pending, :processing] ->
# Reschedule to check later
{:snooze, 60} # Check again in 60 seconds
{:ok, %{state: :failed}} ->
# Handle failure
notify_failure(batch_id)
{:error, :batch_failed}
{:error, reason} ->
{:error, reason}
end
end
end
Pattern 3: Real-time Progress Dashboard
Best for: Admin interfaces, monitoring
defmodule MyAppWeb.BatchLive do
  use Phoenix.LiveView
  def mount(%{"batch_id" => batch_id}, _session, socket) do
    # Poll every 5 seconds once the socket is connected
    if connected?(socket), do: :timer.send_interval(5000, self(), :update)
    {:ok, assign(socket, batch_id: batch_id, batch: nil)}
  end
  def handle_info(:update, socket) do
    case Gemini.get_batch_status(socket.assigns.batch_id) do
      {:ok, batch} ->
        {:noreply, assign(socket, batch: batch)}
      {:error, _} ->
        # Keep the last known state on a failed poll
        {:noreply, socket}
    end
  end
  # @batch is nil until the first poll returns, so guard before reading it
  def render(assigns) do
    ~H"""
    <div>
      <%= if @batch do %>
        <h2>Batch Status: <%= @batch.state %></h2>
        <%= if @batch.batch_stats do %>
          <div>Progress: <%= EmbedContentBatchStats.progress_percentage(@batch.batch_stats) %>%</div>
          <div>Success: <%= @batch.batch_stats.successful_request_count %></div>
          <div>Failed: <%= @batch.batch_stats.failed_request_count %></div>
        <% end %>
      <% else %>
        <h2>Loading batch status...</h2>
      <% end %>
    </div>
    """
  end
end
API Reference
async_batch_embed_contents/2
Submit an async batch embedding job.
@spec async_batch_embed_contents([String.t()], keyword()) ::
{:ok, EmbedContentBatch.t()} | {:error, term()}
Parameters:
- texts: List of strings to embed
- opts: Keyword list of options
Options:
- :display_name (required) - Human-readable batch name
- :model - Model to use (default: "gemini-embedding-001")
- :task_type - Optimization hint (:retrieval_document, :retrieval_query, etc.)
- :output_dimensionality - Output dimensions (128-3072)
- :priority - Processing priority (default: 0, higher = more urgent)
- :auth - Auth strategy (:gemini or :vertex_ai)
Returns:
- {:ok, batch} with batch.name for polling
- {:error, reason} if submission fails
Example:
{:ok, batch} = Gemini.async_batch_embed_contents(
["text1", "text2"],
display_name: "My Batch",
task_type: :retrieval_document,
output_dimensionality: 768,
priority: 10
)
get_batch_status/2
Check the status of a batch job.
@spec get_batch_status(String.t(), keyword()) ::
{:ok, EmbedContentBatch.t()} | {:error, term()}
Parameters:
- batch_id: Batch identifier (format: "batches/{batchId}")
- opts: Options (primarily :auth)
Returns:
- {:ok, batch} with current state and stats
- {:error, reason} if status check fails
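The stats on the returned batch drive the progress and success figures used throughout this guide. As a rough illustration of the arithmetic only (this is not the library's implementation), the sketch below works on plain maps. The successful_request_count and failed_request_count keys appear in this guide; request_count as the total is an assumed field name, and BatchStatsSketch is a hypothetical module.

```elixir
defmodule BatchStatsSketch do
  # Share of requests that have finished (succeeded or failed), as a percentage.
  def progress_percentage(%{request_count: 0}), do: 0.0

  def progress_percentage(%{request_count: total} = stats) do
    done = stats.successful_request_count + stats.failed_request_count
    done * 100 / total
  end

  # Share of finished requests that succeeded, as a percentage.
  def success_rate(stats) do
    case stats.successful_request_count + stats.failed_request_count do
      0 -> 0.0
      done -> stats.successful_request_count * 100 / done
    end
  end
end
```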
Example:
{:ok, batch} = Gemini.get_batch_status("batches/abc123")
IO.puts("State: #{batch.state}")
IO.puts("Progress: #{EmbedContentBatchStats.progress_percentage(batch.batch_stats)}%")
get_batch_embeddings/1
Retrieve embeddings from a completed batch.
@spec get_batch_embeddings(EmbedContentBatch.t()) ::
{:ok, [ContentEmbedding.t()]} | {:error, term()}
Parameters:
- batch: Completed EmbedContentBatch struct
Returns:
- {:ok, embeddings} - List of ContentEmbedding structs
- {:error, reason} if the batch is not complete or its output is file-based
Example:
{:ok, batch} = Gemini.get_batch_status(batch_id)
if batch.state == :completed do
{:ok, embeddings} = Gemini.get_batch_embeddings(batch)
IO.puts("Retrieved #{length(embeddings)} embeddings")
end
await_batch_completion/2
Convenience function to poll until completion.
@spec await_batch_completion(String.t(), keyword()) ::
{:ok, EmbedContentBatch.t()} | {:error, term()}
Parameters:
- batch_id: Batch identifier
- opts: Polling options
Options:
- :poll_interval - Milliseconds between polls (default: 5000)
- :timeout - Max wait time in milliseconds (default: 600000 = 10min)
- :on_progress - Callback function called on each poll
- :auth - Auth strategy
Returns:
- {:ok, batch} when complete
- {:error, :timeout} if timeout exceeded
- {:error, reason} for other errors
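To make the polling semantics concrete, here is a minimal sketch of a poll-until-terminal loop with the same option names and defaults listed above. It is not the library's implementation: PollSketch is a hypothetical module, the status fetcher is injected as a zero-arity function so the loop can run without network access, and treating :failed and :cancelled as terminal is an assumption.

```elixir
defmodule PollSketch do
  @terminal [:completed, :failed, :cancelled]

  # fetch_status is any zero-arity function returning
  # {:ok, %{state: atom}} or {:error, reason}.
  def await(fetch_status, opts \\ []) do
    interval = Keyword.get(opts, :poll_interval, 5_000)
    timeout = Keyword.get(opts, :timeout, 600_000)
    on_progress = Keyword.get(opts, :on_progress, fn _batch -> :ok end)
    deadline = System.monotonic_time(:millisecond) + timeout
    loop(fetch_status, interval, deadline, on_progress)
  end

  defp loop(fetch_status, interval, deadline, on_progress) do
    case fetch_status.() do
      {:ok, %{state: state} = batch} when state in @terminal ->
        # The caller still inspects batch.state to tell success from failure.
        {:ok, batch}

      {:ok, batch} ->
        on_progress.(batch)

        # Give up if the next poll would land past the deadline.
        if System.monotonic_time(:millisecond) + interval > deadline do
          {:error, :timeout}
        else
          Process.sleep(interval)
          loop(fetch_status, interval, deadline, on_progress)
        end

      {:error, reason} ->
        {:error, reason}
    end
  end
end
```

Against the real API, the fetcher would be something like fn -> Gemini.get_batch_status(batch_id) end. Using a monotonic clock for the deadline keeps the timeout correct even if the system clock is adjusted during a long wait.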
Example:
{:ok, batch} = Gemini.await_batch_completion(
batch_id,
poll_interval: 10_000,
timeout: 30 * 60 * 1000, # 30 minutes
on_progress: fn b ->
progress = EmbedContentBatchStats.progress_percentage(b.batch_stats)
IO.puts("Progress: #{progress}%")
end
)
Error Handling
Common Errors
1. Argument Error
{:error, %ArgumentError{message: "display_name is required..."}}
Solution: Always provide the display_name option:
Gemini.async_batch_embed_contents(texts, display_name: "My Batch")
2. Batch Not Complete
{:error, "Batch not yet completed (current state: processing)"}
Solution: Check state before retrieving embeddings:
case batch.state do
:completed -> Gemini.get_batch_embeddings(batch)
_ -> {:error, :not_ready}
end
3. Timeout
{:error, :timeout}
Solution: Increase timeout or poll asynchronously:
Gemini.await_batch_completion(batch_id, timeout: 30 * 60 * 1000)
4. Failed Requests in Batch
Some requests may fail while others succeed. Check stats:
if batch.batch_stats.failed_request_count > 0 do
# Get failed request details
failed = InlinedEmbedContentResponses.failed_responses(batch.output.inlined_responses)
# Retry failed requests
retry_texts = Enum.map(failed, fn {idx, _error} -> Enum.at(original_texts, idx) end)
{:ok, retry_batch} = Gemini.async_batch_embed_contents(retry_texts, ...)
end
Retry Strategy
defmodule MyApp.EmbeddingRetry do
  def submit_with_retry(texts, opts, max_retries \\ 3, attempt \\ 0) do
    case Gemini.async_batch_embed_contents(texts, opts) do
      {:ok, batch} ->
        {:ok, batch}
      {:error, _reason} when attempt < max_retries ->
        # Exponential backoff: 1s, 2s, 4s, ... (Integer.pow/2 requires Elixir 1.12+)
        Process.sleep(1000 * Integer.pow(2, attempt))
        submit_with_retry(texts, opts, max_retries, attempt + 1)
      {:error, reason} ->
        # Retries exhausted, surface the last error
        {:error, reason}
    end
  end
end
Performance Tuning
Optimal Batch Sizes
| Batch Size | Recommended Poll Interval | Typical Completion Time |
|---|---|---|
| 10-100 | 2-5 seconds | 30s - 2min |
| 100-1,000 | 5-10 seconds | 2-10min |
| 1,000-10,000 | 10-30 seconds | 10-30min |
| 10,000+ | 30-60 seconds | 30min - 2hr |
Dimension Selection
Trade-off between storage and quality:
| Dimensions | Storage (relative to 3072d) | MTEB Score | Use Case |
|---|---|---|---|
| 128 | 4.2% | 67.04 | Extreme efficiency |
| 256 | 8.3% | 67.75 | High efficiency |
| 768 | 25% | 67.99 | Recommended |
| 1536 | 50% | 68.17 | High quality |
| 3072 | 100% | 68.17 | Maximum quality |
Recommendation: Use 768d for best balance (75% storage savings, <0.3% quality loss).
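The storage figures are simple ratios against the 3072-dimension default, and reduced-dimension vectors are the ones this guide says to normalize before comparing. The sketch below shows both calculations on plain lists of floats. DimensionSketch is a hypothetical module, 4 bytes per value assumes float32 storage, and the library's own ContentEmbedding.normalize/1 should be preferred in real code.

```elixir
defmodule DimensionSketch do
  @full_dims 3072
  # Assumes each value is stored as a 32-bit float.
  @bytes_per_float 4

  # Fraction of full-size storage needed at the given dimensionality.
  def storage_fraction(dims), do: dims / @full_dims

  # Raw bytes needed for one vector, ignoring index overhead.
  def bytes_per_vector(dims), do: dims * @bytes_per_float

  # L2-normalize a list of floats so the vector has unit length.
  def normalize(values) do
    norm = values |> Enum.reduce(0.0, fn v, acc -> acc + v * v end) |> :math.sqrt()
    if norm == 0.0, do: values, else: Enum.map(values, &(&1 / norm))
  end

  # For unit-length vectors, cosine similarity reduces to the dot product.
  def dot(a, b) do
    a |> Enum.zip(b) |> Enum.reduce(0.0, fn {x, y}, acc -> acc + x * y end)
  end
end
```

Here storage_fraction(768) is 0.25, which is where the 75% storage saving in the recommendation comes from.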
Polling Strategy
# Calculate adaptive poll interval based on batch size
def calculate_poll_interval(batch_size) do
cond do
batch_size < 100 -> 2_000 # 2 seconds
batch_size < 1000 -> 5_000 # 5 seconds
batch_size < 10_000 -> 10_000 # 10 seconds
true -> 30_000 # 30 seconds
end
end
# Calculate timeout based on batch size
def calculate_timeout(batch_size) do
# Estimate: ~1 second per document + 2 minute buffer
(batch_size * 1000) + (2 * 60 * 1000)
end
Best Practices
1. Always Normalize Non-3072d Embeddings
# ❌ WRONG - Similarity will be incorrect
similarity = ContentEmbedding.cosine_similarity(embedding1, embedding2)
# ✅ CORRECT - Normalize first
normalized1 = ContentEmbedding.normalize(embedding1)
normalized2 = ContentEmbedding.normalize(embedding2)
similarity = ContentEmbedding.cosine_similarity(normalized1, normalized2)
2. Use Descriptive Batch Names
# ❌ WRONG - Hard to track
display_name: "Batch 1"
# ✅ CORRECT - Descriptive and timestamped
display_name: "Product Catalog Index - #{DateTime.utc_now() |> DateTime.to_unix()}"
3. Store Batch IDs in Database
# Create tracking record
{:ok, batch} = Gemini.async_batch_embed_contents(texts, display_name: name)
{:ok, _job} = Repo.insert(%EmbeddingJob{
batch_id: batch.name,
status: to_string(batch.state),
created_at: DateTime.utc_now()
})
4. Monitor Batch Statistics
def monitor_batch(batch_id) do
{:ok, batch} = Gemini.get_batch_status(batch_id)
stats = batch.batch_stats
success_rate = EmbedContentBatchStats.success_rate(stats)
# Alert if success rate drops below threshold
if success_rate < 95.0 do
notify_ops_team("Batch #{batch_id} has #{success_rate}% success rate")
end
end
5. Implement Exponential Backoff
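def poll_with_backoff(batch_id, attempt \\ 1, max_attempts \\ 10) do
  case Gemini.get_batch_status(batch_id) do
    {:ok, %{state: :completed} = batch} ->
      {:ok, batch}
    {:ok, %{state: :failed}} ->
      # Stop polling once the batch has failed
      {:error, :batch_failed}
    {:ok, _batch} when attempt < max_attempts ->
      # Exponential backoff: 2^attempt * 1000ms. Process.sleep/1 needs an
      # integer, so use Integer.pow/2 (Elixir 1.12+) rather than :math.pow/2
      Process.sleep(Integer.pow(2, attempt) * 1000)
      poll_with_backoff(batch_id, attempt + 1, max_attempts)
    {:ok, _batch} ->
      # Still not finished after max_attempts polls
      {:error, :max_attempts_exceeded}
    {:error, reason} ->
      {:error, reason}
  end
end
6. Use Task Types for Better Quality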
# For indexing documents
Gemini.async_batch_embed_contents(
documents,
task_type: :retrieval_document,
display_name: "Document Index"
)
# For embedding queries
Gemini.embed_content(
query,
task_type: :retrieval_query
)
7. Batch Size Optimization
# Split large datasets into manageable batches
def process_large_dataset(texts, batch_size \\ 10_000) do
texts
|> Enum.chunk_every(batch_size)
|> Enum.map(fn chunk ->
{:ok, batch} = Gemini.async_batch_embed_contents(
chunk,
display_name: "Chunk #{System.unique_integer([:positive])}"
)
batch.name
end)
end
Summary
The Async Batch Embedding API is your go-to solution for production-scale embedding generation:
- 50% cost savings for large-scale indexing
- Non-blocking workflow for better user experience
- Progress tracking for monitoring and alerting
- Production-ready with comprehensive error handling
Start with the demos, adapt the patterns to your workflow, and scale to millions of embeddings efficiently!
Related Resources
- Live Demos: examples/async_batch_embedding_demo.exs
- Production Patterns: examples/async_batch_production_demo.exs
- API Specification: oldDocs/docs/spec/GEMINI-API-07-EMBEDDINGS_20251014.md
- Sync Embeddings Guide: examples/EMBEDDINGS.md