Async Batch Embeddings - Production Guide

Complete guide to production-scale embedding generation with 50% cost savings

Overview

The Async Batch Embedding API allows you to process large-scale embedding jobs asynchronously with 50% cost savings compared to the interactive embedding API. It's designed for production scenarios where you need to embed thousands to millions of texts for RAG systems, knowledge bases, and large-scale retrieval.

Key Features

  • 50% Cost Reduction: Half the cost per embedding vs interactive API
  • Long-Running Operations (LRO): Submit job and retrieve results later
  • Progress Tracking: Real-time statistics on success, failure, and pending requests
  • Priority Support: Control processing order with priority field
  • Multi-auth Compatible: Works with both Gemini API and Vertex AI
  • Type-safe: Complete type annotations and error handling

Architecture

Submit Batch → [PENDING] → [PROCESSING] → [COMPLETED]
                                        → [FAILED]
                                        → [CANCELLED]

The batch progresses through states, allowing you to track progress and retrieve results when complete.
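
As a rough illustration of the lifecycle above, the states can be split into "keep polling" and "stop" groups. This is only a sketch: the module name is hypothetical, and the `:cancelled` atom is inferred from the diagram rather than confirmed against the library.

```elixir
defmodule BatchStateSketch do
  # Terminal states: the batch will not change state again.
  @terminal [:completed, :failed, :cancelled]
  # In-flight states: the caller should keep polling.
  @in_flight [:pending, :processing]

  def terminal?(state) when state in @terminal, do: true
  def terminal?(state) when state in @in_flight, do: false

  # What a caller would typically do next for each state.
  def next_action(:completed), do: :fetch_embeddings
  def next_action(:failed), do: :inspect_stats
  def next_action(:cancelled), do: :stop
  def next_action(state) when state in @in_flight, do: :poll_again
end

IO.inspect(Enum.map([:pending, :processing, :completed], &BatchStateSketch.terminal?/1))
```

Keeping this decision in one place avoids repeating state checks across pollers, workers, and dashboards.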


When to Use

Use Async Batch API For:

  • Large-scale indexing (thousands to millions of documents)
  • RAG system setup (building knowledge base indices)
  • Non-urgent embedding generation (background processing)
  • Cost-sensitive workflows (50% savings adds up at scale)
  • Batch data migration (moving to a new embedding model)

Use Interactive API For:

  • Real-time embedding (user-facing features)
  • Small batches (<100 texts are typically faster with the interactive API)
  • Time-critical workflows (need immediate results)
  • Interactive exploration (rapid iteration and testing)


Cost Analysis

Cost Comparison (Relative Units)

Documents    Interactive API    Async Batch API    Savings
1,000        1,000              500                500
10,000       10,000             5,000              5,000
100,000      100,000            50,000             50,000
1,000,000    1,000,000          500,000            500,000

Break-even Analysis

For typical workflows:

  • Setup time: ~2-5 minutes additional for batch workflow
  • Cost savings: 50% per embedding
  • Break-even: ~100-200 documents (depends on workflow)

Recommendation: Use async batch for any job >500 documents or when time is not critical.
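
The relative-unit figures above follow from a flat 50% discount. A minimal sketch of that arithmetic, assuming one relative unit per text on the interactive API (the module name is hypothetical and no real pricing is implied):

```elixir
defmodule CostSketch do
  # Relative units: 1 unit per text on the interactive API, half that on async batch.
  @interactive_unit 1.0
  @batch_discount 0.5

  def compare(document_count) when is_integer(document_count) and document_count >= 0 do
    interactive = document_count * @interactive_unit
    batch = interactive * @batch_discount
    %{interactive: interactive, batch: batch, savings: interactive - batch}
  end
end

for n <- [1_000, 10_000, 100_000, 1_000_000] do
  %{interactive: i, batch: b, savings: s} = CostSketch.compare(n)
  IO.puts("#{n} docs: interactive=#{round(i)} batch=#{round(b)} savings=#{round(s)}")
end
```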


Quick Start

Basic Example

# 1. Submit batch
{:ok, batch} = Gemini.async_batch_embed_contents(
  ["Text 1", "Text 2", "Text 3"],
  display_name: "My Batch",
  task_type: :retrieval_document,
  output_dimensionality: 768
)

# 2. Wait for completion
{:ok, completed_batch} = Gemini.await_batch_completion(batch.name)

# 3. Retrieve embeddings
{:ok, embeddings} = Gemini.get_batch_embeddings(completed_batch)

Run Demo

# Set API key
export GEMINI_API_KEY='your-key-here'

# Run comprehensive demo
mix run examples/async_batch_embedding_demo.exs

# Run production patterns demo
mix run examples/async_batch_production_demo.exs

Complete Workflow

Step 1: Submit Batch Job

{:ok, batch} = Gemini.async_batch_embed_contents(
  texts,
  display_name: "Knowledge Base Index - #{timestamp}",
  task_type: :retrieval_document,
  output_dimensionality: 768,
  priority: 5  # Higher = more urgent
)

# Save batch.name for later retrieval
batch_id = batch.name
# => "batches/abc123def456..."

Key Points:

  • display_name is required - use descriptive names for tracking
  • task_type optimizes embeddings for specific use cases
  • output_dimensionality defaults to model default (typically 3072)
  • priority controls processing order (default: 0)

Step 2: Poll for Status

Option A: Active Polling with Progress

{:ok, completed_batch} = Gemini.await_batch_completion(
  batch_id,
  poll_interval: 10_000,  # Poll every 10 seconds
  timeout: 1_800_000,     # 30 minute timeout
  on_progress: fn updated_batch ->
    stats = updated_batch.batch_stats

    # batch_stats may be nil until processing starts
    if stats do
      progress = EmbedContentBatchStats.progress_percentage(stats)
      IO.puts("Progress: #{Float.round(progress, 1)}%")
    end
  end
)

Option B: Manual Status Check

{:ok, status} = Gemini.get_batch_status(batch_id)

case status.state do
  :completed ->
    # Batch is done, retrieve embeddings
    {:ok, embeddings} = Gemini.get_batch_embeddings(status)

  :processing ->
    # Still working, check again later
    if status.batch_stats do
      progress = EmbedContentBatchStats.progress_percentage(status.batch_stats)
      IO.puts("Still processing: #{progress}%")
    end

  :failed ->
    # Batch failed, check stats for details
    IO.puts("Batch failed")

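  :pending ->
    # Batch queued, not yet started
    IO.puts("Waiting to start...")

  other ->
    # Any remaining state (for example a cancelled batch)
    IO.puts("Unhandled batch state: #{inspect(other)}")
end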

Step 3: Retrieve Embeddings

{:ok, completed_batch} = Gemini.get_batch_status(batch_id)

case completed_batch.state do
  :completed ->
    {:ok, embeddings} = Gemini.get_batch_embeddings(completed_batch)

    # IMPORTANT: Normalize if not using 3072 dimensions
    normalized_embeddings = Enum.map(embeddings, &ContentEmbedding.normalize/1)

    # Now safe to use for similarity calculations
    similarity = ContentEmbedding.cosine_similarity(
      Enum.at(normalized_embeddings, 0),
      Enum.at(normalized_embeddings, 1)
    )

  _ ->
    IO.puts("Batch not yet completed")
end
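
To make the normalization step concrete, here is a minimal sketch on plain lists of floats. It does not use the library's ContentEmbedding struct; the module and function names are hypothetical and only illustrate the math (scale to unit length, after which the dot product equals cosine similarity).

```elixir
defmodule VectorSketch do
  # Euclidean (L2) norm of a vector given as a list of numbers.
  def norm(values) do
    :math.sqrt(Enum.reduce(values, 0.0, fn v, acc -> acc + v * v end))
  end

  # Scale a vector to unit length; a zero vector is returned unchanged.
  def normalize(values) do
    case norm(values) do
      n when n == 0.0 -> values
      n -> Enum.map(values, &(&1 / n))
    end
  end

  # Dot product; for unit-length vectors this equals cosine similarity.
  def dot(a, b) when length(a) == length(b) do
    a |> Enum.zip(b) |> Enum.reduce(0.0, fn {x, y}, acc -> acc + x * y end)
  end
end

a = VectorSketch.normalize([3.0, 4.0])
b = VectorSketch.normalize([4.0, 3.0])
IO.inspect(VectorSketch.norm(a))    # length of the normalized vector
IO.inspect(VectorSketch.dot(a, b))  # cosine similarity of the two inputs
```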

Production Patterns

Pattern 1: Non-blocking Submission

Best for: Web applications, user-facing workflows

defmodule MyApp.EmbeddingService do
  def index_documents_async(documents, user_id) do
    # 1. Submit batch
    {:ok, batch} = Gemini.async_batch_embed_contents(
      documents,
      display_name: "User #{user_id} - #{DateTime.utc_now()}"
    )

    # 2. Store batch ID in database
    {:ok, job} = MyApp.Repo.insert(%EmbeddingJob{
      batch_id: batch.name,
      user_id: user_id,
      status: "pending",
      document_count: length(documents)
    })

    # 3. Return immediately
    {:ok, job}
  end
end

Pattern 2: Background Worker

Best for: Scheduled jobs, cron tasks

defmodule MyApp.EmbeddingWorker do
  use Oban.Worker, queue: :embeddings

  @impl Oban.Worker
  def perform(%Oban.Job{args: %{"batch_id" => batch_id}}) do
    case Gemini.get_batch_status(batch_id) do
      {:ok, %{state: :completed} = batch} ->
        # Process completed batch
        {:ok, embeddings} = Gemini.get_batch_embeddings(batch)
        store_embeddings(embeddings)
        :ok

      {:ok, %{state: state}} when state in [:pending, :processing] ->
        # Reschedule to check later
        {:snooze, 60}  # Check again in 60 seconds

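      {:ok, %{state: :failed}} ->
        # Handle failure
        notify_failure(batch_id)
        {:error, :batch_failed}

      {:ok, %{state: state}} ->
        # Any other state (e.g. cancelled): stop the job without retrying
        {:cancel, {:unexpected_state, state}}

      {:error, reason} ->
        {:error, reason}
    end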
  end
end

Pattern 3: Real-time Progress Dashboard

Best for: Admin interfaces, monitoring

defmodule MyAppWeb.BatchLive do
  use Phoenix.LiveView

  def mount(%{"batch_id" => batch_id}, _session, socket) do
    # Poll every 5 seconds
    if connected?(socket), do: :timer.send_interval(5000, self(), :update)

    {:ok, assign(socket, batch_id: batch_id, batch: nil)}
  end

  def handle_info(:update, socket) do
    case Gemini.get_batch_status(socket.assigns.batch_id) do
      {:ok, batch} ->
        {:noreply, assign(socket, batch: batch)}
      {:error, _} ->
        {:noreply, socket}
    end
  end

  def render(assigns) do
    # @batch is nil until the first :update message arrives
    ~H"""
    <div>
      <%= if @batch do %>
        <h2>Batch Status: <%= @batch.state %></h2>
        <%= if @batch.batch_stats do %>
          <div>Progress: <%= EmbedContentBatchStats.progress_percentage(@batch.batch_stats) %>%</div>
          <div>Success: <%= @batch.batch_stats.successful_request_count %></div>
          <div>Failed: <%= @batch.batch_stats.failed_request_count %></div>
        <% end %>
      <% else %>
        <h2>Loading batch status...</h2>
      <% end %>
    </div>
    """
  end
end

API Reference

async_batch_embed_contents/2

Submit an async batch embedding job.

@spec async_batch_embed_contents([String.t()], keyword()) ::
  {:ok, EmbedContentBatch.t()} | {:error, term()}

Parameters:

  • texts: List of strings to embed
  • opts: Keyword list of options

Options:

  • :display_name (required) - Human-readable batch name
  • :model - Model to use (default: "gemini-embedding-001")
  • :task_type - Optimization hint (:retrieval_document, :retrieval_query, etc.)
  • :output_dimensionality - Output dimensions (128-3072)
  • :priority - Processing priority (default: 0, higher = more urgent)
  • :auth - Auth strategy (:gemini or :vertex_ai)

Returns:

  • {:ok, batch} with batch.name for polling
  • {:error, reason} if submission fails

Example:

{:ok, batch} = Gemini.async_batch_embed_contents(
  ["text1", "text2"],
  display_name: "My Batch",
  task_type: :retrieval_document,
  output_dimensionality: 768,
  priority: 10
)

get_batch_status/2

Check the status of a batch job.

@spec get_batch_status(String.t(), keyword()) ::
  {:ok, EmbedContentBatch.t()} | {:error, term()}

Parameters:

  • batch_id: Batch identifier (format: "batches/{batchId}")
  • opts: Options (primarily :auth)

Returns:

  • {:ok, batch} with current state and stats
  • {:error, reason} if status check fails

Example:

{:ok, batch} = Gemini.get_batch_status("batches/abc123")

IO.puts("State: #{batch.state}")
IO.puts("Progress: #{EmbedContentBatchStats.progress_percentage(batch.batch_stats)}%")

get_batch_embeddings/1

Retrieve embeddings from a completed batch.

@spec get_batch_embeddings(EmbedContentBatch.t()) ::
  {:ok, [ContentEmbedding.t()]} | {:error, term()}

Parameters:

  • batch: Completed EmbedContentBatch struct

Returns:

  • {:ok, embeddings} - List of ContentEmbedding structs
  • {:error, reason} if the batch is not complete or its results are file-based rather than inlined

Example:

{:ok, batch} = Gemini.get_batch_status(batch_id)

if batch.state == :completed do
  {:ok, embeddings} = Gemini.get_batch_embeddings(batch)
  IO.puts("Retrieved #{length(embeddings)} embeddings")
end

await_batch_completion/2

Convenience function to poll until completion.

@spec await_batch_completion(String.t(), keyword()) ::
  {:ok, EmbedContentBatch.t()} | {:error, term()}

Parameters:

  • batch_id: Batch identifier
  • opts: Polling options

Options:

  • :poll_interval - Milliseconds between polls (default: 5000)
  • :timeout - Max wait time in milliseconds (default: 600000 = 10min)
  • :on_progress - Callback function called on each poll
  • :auth - Auth strategy

Returns:

  • {:ok, batch} when complete
  • {:error, :timeout} if timeout exceeded
  • {:error, reason} for other errors

Example:

{:ok, batch} = Gemini.await_batch_completion(
  batch_id,
  poll_interval: 10_000,
  timeout: 30 * 60 * 1000,  # 30 minutes
  on_progress: fn b ->
    progress = EmbedContentBatchStats.progress_percentage(b.batch_stats)
    IO.puts("Progress: #{progress}%")
  end
)
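
For intuition, a polling helper with these options could look roughly like the sketch below. This is not the library's implementation: the module is hypothetical, the status source is passed in as a function so the example runs without network access, and treating `:failed` and `:cancelled` as errors is an assumption.

```elixir
defmodule PollSketch do
  # Polls `fetch_status` (a zero-arity function returning {:ok, map} or
  # {:error, reason}) until a terminal state is reached or the timeout elapses.
  def await(fetch_status, opts \\ []) do
    poll_interval = Keyword.get(opts, :poll_interval, 5_000)
    timeout = Keyword.get(opts, :timeout, 600_000)
    on_progress = Keyword.get(opts, :on_progress, fn _batch -> :ok end)
    deadline = System.monotonic_time(:millisecond) + timeout
    loop(fetch_status, poll_interval, on_progress, deadline)
  end

  defp loop(fetch_status, poll_interval, on_progress, deadline) do
    case fetch_status.() do
      {:ok, %{state: :completed} = batch} ->
        {:ok, batch}

      {:ok, %{state: state} = batch} when state in [:failed, :cancelled] ->
        {:error, {state, batch}}

      {:ok, batch} ->
        on_progress.(batch)

        # Give up if the next poll would land past the deadline.
        if System.monotonic_time(:millisecond) + poll_interval > deadline do
          {:error, :timeout}
        else
          Process.sleep(poll_interval)
          loop(fetch_status, poll_interval, on_progress, deadline)
        end

      {:error, reason} ->
        {:error, reason}
    end
  end
end

# Stub that reports :processing twice, then :completed.
{:ok, counter} = Agent.start_link(fn -> 0 end)

stub = fn ->
  n = Agent.get_and_update(counter, fn n -> {n, n + 1} end)
  {:ok, %{state: if(n < 2, do: :processing, else: :completed)}}
end

IO.inspect(PollSketch.await(stub, poll_interval: 10, timeout: 1_000))
```

Using a monotonic clock for the deadline keeps the timeout correct even if the system clock is adjusted while waiting.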

Error Handling

Common Errors

1. Argument Error

{:error, %ArgumentError{message: "display_name is required..."}}

Solution: Always provide display_name option:

Gemini.async_batch_embed_contents(texts, display_name: "My Batch")
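
One way to catch this before submitting is to validate options locally. The sketch below is a hypothetical helper, not part of the library; it checks only the required display_name and the 128-3072 range listed for output_dimensionality in the API reference.

```elixir
defmodule OptsSketch do
  # Returns :ok or {:error, message} without contacting the API.
  def validate(opts) do
    with :ok <- check_display_name(Keyword.get(opts, :display_name)),
         :ok <- check_dimensions(Keyword.get(opts, :output_dimensionality)) do
      :ok
    end
  end

  defp check_display_name(name) when is_binary(name) and byte_size(name) > 0, do: :ok
  defp check_display_name(_), do: {:error, "display_name is required"}

  # nil means "use the model default".
  defp check_dimensions(nil), do: :ok
  defp check_dimensions(d) when is_integer(d) and d in 128..3072, do: :ok

  defp check_dimensions(d) do
    {:error, "output_dimensionality must be in 128..3072, got: #{inspect(d)}"}
  end
end

IO.inspect(OptsSketch.validate(display_name: "My Batch", output_dimensionality: 768))
IO.inspect(OptsSketch.validate(task_type: :retrieval_document))
```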

2. Batch Not Complete

{:error, "Batch not yet completed (current state: processing)"}

Solution: Check state before retrieving embeddings:

case batch.state do
  :completed -> Gemini.get_batch_embeddings(batch)
  _ -> {:error, :not_ready}
end

3. Timeout

{:error, :timeout}

Solution: Increase timeout or poll asynchronously:

Gemini.await_batch_completion(batch_id, timeout: 30 * 60 * 1000)

4. Failed Requests in Batch

Some requests may fail while others succeed. Check stats:

if batch.batch_stats.failed_request_count > 0 do
  # Get failed request details
  failed = InlinedEmbedContentResponses.failed_responses(batch.output.inlined_responses)

  # Retry failed requests
  retry_texts = Enum.map(failed, fn {idx, _error} -> Enum.at(original_texts, idx) end)
  {:ok, retry_batch} = Gemini.async_batch_embed_contents(retry_texts, ...)
end
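
The index-to-text mapping in the retry step can be tried in isolation. The sketch below assumes, as the snippet above does, that failures arrive as `{zero_based_index, error}` tuples; the module name and the error atoms are made up for illustration.

```elixir
defmodule RetrySketch do
  # `failed` is assumed to be a list of {zero_based_index, error} tuples.
  def texts_to_retry(original_texts, failed) do
    # Index the originals once so each lookup avoids a linear Enum.at/2 scan.
    by_index =
      original_texts
      |> Enum.with_index()
      |> Map.new(fn {text, idx} -> {idx, text} end)

    # Skip indices that fall outside the original list instead of yielding nil.
    for {idx, _error} <- failed, Map.has_key?(by_index, idx), do: Map.fetch!(by_index, idx)
  end
end

original_texts = ["alpha", "beta", "gamma", "delta"]
failed = [{1, :rate_limited}, {3, :internal_error}]
IO.inspect(RetrySketch.texts_to_retry(original_texts, failed))
```

Building the lookup map once matters for large batches, where repeated Enum.at/2 calls on a list would be quadratic.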

Retry Strategy

defmodule MyApp.EmbeddingRetry do
  @base_delay_ms 1_000

  def submit_with_retry(texts, opts, max_retries \\ 3, attempt \\ 0) do
    case Gemini.async_batch_embed_contents(texts, opts) do
      {:ok, batch} ->
        {:ok, batch}

      {:error, _reason} when attempt < max_retries ->
        # Exponential backoff: 1s, 2s, 4s, ...
        Process.sleep(@base_delay_ms * Integer.pow(2, attempt))
        submit_with_retry(texts, opts, max_retries, attempt + 1)

      {:error, reason} ->
        # Retries exhausted: return the last error
        {:error, reason}
    end
  end
end

Performance Tuning

Optimal Batch Sizes

Batch Size      Recommended Poll Interval    Typical Completion Time
10-100          2-5 seconds                  30s - 2min
100-1,000       5-10 seconds                 2-10min
1,000-10,000    10-30 seconds                10-30min
10,000+         30-60 seconds                30min - 2hr

Dimension Selection

Trade-off between storage and quality:

Dimensions    Storage (vs. 3072d)    MTEB Score    Use Case
128           4.2%                   67.04         Extreme efficiency
256           8.3%                   67.75         High efficiency
768           25%                    67.99         Recommended
1536          50%                    68.17         High quality
3072          100%                   68.17         Maximum quality

Recommendation: Use 768d for best balance (75% storage savings, <0.3% quality loss).
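
The storage side of this trade-off is simple arithmetic. The sketch below assumes vectors are stored as 32-bit floats (4 bytes per dimension) and ignores index and metadata overhead; both the assumption and the module name are illustrative.

```elixir
defmodule StorageSketch do
  # Assumption: each dimension is stored as a 32-bit float.
  @bytes_per_float 4

  # Raw vector storage in bytes, ignoring index and metadata overhead.
  def bytes(vector_count, dimensions), do: vector_count * dimensions * @bytes_per_float

  # Storage relative to full 3072-dimensional vectors, as a percentage.
  def percent_of_full(dimensions), do: Float.round(dimensions / 3072 * 100, 1)
end

for dims <- [128, 256, 768, 1536, 3072] do
  gib = StorageSketch.bytes(1_000_000, dims) / (1024 * 1024 * 1024)
  pct = StorageSketch.percent_of_full(dims)
  IO.puts("#{dims}d: #{Float.round(gib, 2)} GiB per 1M vectors (#{pct}% of 3072d)")
end
```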

Polling Strategy

# Calculate adaptive poll interval based on batch size
def calculate_poll_interval(batch_size) do
  cond do
    batch_size < 100 -> 2_000      # 2 seconds
    batch_size < 1000 -> 5_000     # 5 seconds
    batch_size < 10_000 -> 10_000  # 10 seconds
    true -> 30_000                  # 30 seconds
  end
end

# Calculate timeout based on batch size
def calculate_timeout(batch_size) do
  # Estimate: ~1 second per document + 2 minute buffer
  (batch_size * 1000) + (2 * 60 * 1000)
end
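
Note that the linear timeout estimate grows quickly: 50,000 texts gives roughly 14 hours. One possible refinement, shown here as a hypothetical sketch, is to clamp the estimate to an upper bound such as the 2 hour figure from the batch size table.

```elixir
defmodule TimeoutSketch do
  @two_minutes 2 * 60 * 1000

  # Same estimate as above: ~1 second per document plus a 2 minute buffer.
  def estimate(batch_size), do: batch_size * 1000 + @two_minutes

  # Hypothetical refinement: never wait longer than `max_ms` (default 2 hours).
  def capped(batch_size, max_ms \\ 2 * 60 * 60 * 1000) do
    min(estimate(batch_size), max_ms)
  end
end

for size <- [50, 500, 5_000, 50_000] do
  minutes = Float.round(TimeoutSketch.capped(size) / 60_000, 1)
  IO.puts("#{size} texts: timeout #{minutes} min")
end
```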

Best Practices

1. Always Normalize Non-3072d Embeddings

# ❌ WRONG - Similarity will be incorrect
similarity = ContentEmbedding.cosine_similarity(embedding1, embedding2)

# ✅ CORRECT - Normalize first
normalized1 = ContentEmbedding.normalize(embedding1)
normalized2 = ContentEmbedding.normalize(embedding2)
similarity = ContentEmbedding.cosine_similarity(normalized1, normalized2)

2. Use Descriptive Batch Names

# ❌ WRONG - Hard to track
display_name: "Batch 1"

# ✅ CORRECT - Descriptive and timestamped
display_name: "Product Catalog Index - #{DateTime.utc_now() |> DateTime.to_unix()}"

3. Store Batch IDs in Database

# Create tracking record
{:ok, batch} = Gemini.async_batch_embed_contents(texts, display_name: name)

{:ok, _job} = Repo.insert(%EmbeddingJob{
  batch_id: batch.name,
  status: to_string(batch.state),
  created_at: DateTime.utc_now()
})

4. Monitor Batch Statistics

def monitor_batch(batch_id) do
  {:ok, batch} = Gemini.get_batch_status(batch_id)

  stats = batch.batch_stats
  success_rate = EmbedContentBatchStats.success_rate(stats)

  # Alert if success rate drops below threshold
  if success_rate < 95.0 do
    notify_ops_team("Batch #{batch_id} has #{success_rate}% success rate")
  end
end
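
The two percentages used for monitoring can be reasoned about with plain counters. The sketch below works on an ordinary map: `successful_request_count` and `failed_request_count` appear elsewhere in this guide, while `pending_request_count` and the formulas themselves are assumptions about how such helpers might be defined, not the library's code.

```elixir
defmodule StatsSketch do
  # Percentage of requests that have finished, whether they succeeded or failed.
  def progress_percentage(%{
        successful_request_count: ok,
        failed_request_count: failed,
        pending_request_count: pending
      }) do
    percent(ok + failed, ok + failed + pending)
  end

  # Percentage of finished requests that succeeded.
  def success_rate(%{successful_request_count: ok, failed_request_count: failed}) do
    percent(ok, ok + failed)
  end

  # Avoid dividing by zero when nothing has been counted yet.
  defp percent(_part, 0), do: 0.0
  defp percent(part, total), do: Float.round(part / total * 100, 1)
end

stats = %{successful_request_count: 940, failed_request_count: 10, pending_request_count: 50}
IO.inspect(StatsSketch.progress_percentage(stats))
IO.inspect(StatsSketch.success_rate(stats))
```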

5. Implement Exponential Backoff

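def poll_with_backoff(batch_id, attempt \\ 1, max_attempts \\ 10) do
  case Gemini.get_batch_status(batch_id) do
    {:ok, %{state: :completed} = batch} ->
      {:ok, batch}

    {:ok, %{state: :failed} = batch} ->
      # Terminal state: stop polling
      {:error, {:batch_failed, batch}}

    {:ok, _batch} when attempt < max_attempts ->
      # Exponential backoff: 2^attempt * 1000ms
      # (integer math, because :timer.sleep/1 rejects floats)
      :timer.sleep(Integer.pow(2, attempt) * 1000)
      poll_with_backoff(batch_id, attempt + 1, max_attempts)

    {:ok, _batch} ->
      # Still not finished after the last attempt
      {:error, :max_attempts_exceeded}

    {:error, reason} ->
      {:error, reason}
  end
end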
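
With a 2^attempt schedule and 10 attempts, the final wait alone is 512 seconds and the waits sum to about 17 minutes. A common variation is to cap each delay. The sketch below is a hypothetical helper that only computes the schedule; the 60 second cap is an arbitrary example value.

```elixir
defmodule BackoffSketch do
  # Delay before retry number `attempt` (1-based): base * 2^attempt, capped at `max_ms`.
  def delay_ms(attempt, base_ms \\ 1_000, max_ms \\ 60_000) when attempt >= 1 do
    min(base_ms * Integer.pow(2, attempt), max_ms)
  end

  # Delays for attempts 1..max_attempts using the default base and cap.
  def schedule(max_attempts), do: Enum.map(1..max_attempts, &delay_ms/1)
end

IO.inspect(BackoffSketch.schedule(8))
# => [2000, 4000, 8000, 16000, 32000, 60000, 60000, 60000]
```

Integer.pow/2 requires Elixir 1.12 or later; on older versions, `trunc(:math.pow(2, attempt))` gives the same integers for small exponents.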

6. Use Task Types for Better Quality

# For indexing documents
Gemini.async_batch_embed_contents(
  documents,
  task_type: :retrieval_document,
  display_name: "Document Index"
)

# For embedding queries
Gemini.embed_content(
  query,
  task_type: :retrieval_query
)

7. Batch Size Optimization

# Split large datasets into manageable batches
def process_large_dataset(texts, batch_size \\ 10_000) do
  texts
  |> Enum.chunk_every(batch_size)
  |> Enum.map(fn chunk ->
    {:ok, batch} = Gemini.async_batch_embed_contents(
      chunk,
      display_name: "Chunk #{System.unique_integer([:positive])}"
    )
    batch.name
  end)
end
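
When a dataset is split this way, it can help to encode each chunk's position in its display name so partial failures are easy to trace. A minimal sketch of the chunking and naming step only (no API calls; the module name and label format are hypothetical):

```elixir
defmodule ChunkSketch do
  # Splits `texts` into chunks and pairs each chunk with a display name
  # that records its position, e.g. "Catalog Index - part 2/3".
  def named_chunks(texts, label, batch_size \\ 10_000) do
    chunks = Enum.chunk_every(texts, batch_size)
    total = length(chunks)

    chunks
    |> Enum.with_index(1)
    |> Enum.map(fn {chunk, position} ->
      {"#{label} - part #{position}/#{total}", chunk}
    end)
  end
end

texts = Enum.map(1..25, &"doc #{&1}")

for {name, chunk} <- ChunkSketch.named_chunks(texts, "Catalog Index", 10) do
  IO.puts("#{name}: #{length(chunk)} texts")
end
```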

Summary

The Async Batch Embedding API is your go-to solution for production-scale embedding generation:

  • 50% cost savings for large-scale indexing
  • Non-blocking workflow for better user experience
  • Progress tracking for monitoring and alerting
  • Production-ready with comprehensive error handling

Start with the demos, adapt the patterns to your workflow, and scale to millions of embeddings efficiently!

  • Live Demos: examples/async_batch_embedding_demo.exs
  • Production Patterns: examples/async_batch_production_demo.exs
  • API Specification: oldDocs/docs/spec/GEMINI-API-07-EMBEDDINGS_20251014.md
  • Sync Embeddings Guide: examples/EMBEDDINGS.md