Gemini.APIs.Coordinator (GeminiEx v0.8.4)

Coordinates API calls across different authentication strategies and endpoints.

Provides a unified interface that routes requests to either the Gemini API or Vertex AI based on configuration, so callers use the same API regardless of backend.

This module acts as the main entry point for all Gemini API operations, automatically handling authentication strategy selection and request routing.

Features

  • Unified API for content generation across auth strategies
  • Automatic auth strategy selection based on configuration
  • Per-request auth strategy override capability
  • Consistent error handling and response format
  • Support for both streaming and non-streaming operations
  • Model listing and token counting functionality

Usage

# Use default auth strategy
{:ok, response} = Coordinator.generate_content("Hello world")

# Override auth strategy for specific request
{:ok, response} = Coordinator.generate_content("Hello world", auth: :vertex_ai)

# Start streaming with specific auth
{:ok, stream_id} = Coordinator.stream_generate_content("Tell me a story", auth: :gemini)

See Gemini.options/0 for the canonical list of options.
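
Every call returns the api_result/1 shape ({:ok, value} | {:error, term()}; see Types below), so results can be handled uniformly regardless of auth strategy. A minimal sketch:

# Uniform result handling across auth strategies
case Coordinator.generate_content("Hello world", auth: :vertex_ai) do
  {:ok, response} ->
    {:ok, text} = Coordinator.extract_text(response)
    IO.puts(text)

  {:error, reason} ->
    IO.inspect(reason, label: "Request failed")
end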

Summary

Functions

  • async_batch_embed_contents(texts_or_requests, opts \\ []) - Submit an asynchronous batch embedding job for production-scale embedding generation.
  • await_batch_completion(batch_name, opts \\ []) - Poll and wait for batch completion with configurable intervals.
  • batch_embed_contents(texts, opts \\ []) - Generate embeddings for multiple text inputs in a single batch request.
  • count_tokens(input, opts \\ []) - Count tokens in the given input.
  • embed_content(text, opts \\ []) - Generate an embedding for the given text content.
  • extract_function_calls(response) - Extract function calls from a GenerateContentResponse.
  • extract_text(arg1) - Extract text content from a GenerateContentResponse.
  • generate_content(input, opts \\ []) - Generate content using the specified model and input.
  • get_batch_embeddings(batch) - Retrieve embeddings from a completed batch job.
  • get_batch_status(batch_name, opts \\ []) - Get the current status of an async batch embedding job.
  • get_model(model_name, opts \\ []) - Get information about a specific model.
  • has_function_calls?(response) - Check if a response contains function calls.
  • list_models(opts \\ []) - List available models for the specified authentication strategy.
  • stop_stream(stream_id) - Stop a streaming content generation.
  • stream_generate_content(input, opts \\ []) - Stream content generation with real-time response chunks.
  • stream_status(stream_id) - Get the status of a streaming content generation.
  • subscribe_stream(stream_id, subscriber_pid \\ self()) - Subscribe to a streaming content generation.
  • unsubscribe_stream(stream_id, subscriber_pid \\ self()) - Unsubscribe from a streaming content generation.

Types

api_result(t)

@type api_result(t) :: {:ok, t} | {:error, term()}

auth_strategy()

@type auth_strategy() :: :gemini | :vertex_ai

request_opts()

@type request_opts() :: keyword()

Functions

async_batch_embed_contents(texts_or_requests, opts \\ [])

Submit an asynchronous batch embedding job for production-scale embedding generation.

Processes large batches of embeddings at 50% of the cost of the interactive API. Returns immediately with a batch ID for polling. Suitable for embedding thousands to millions of texts for RAG systems, knowledge bases, and large-scale retrieval.

See Gemini.options/0 for available options.

Parameters

  • texts_or_requests: List of strings OR list of EmbedContentRequest structs
  • opts: Options including model, display_name, priority, task_type, etc.

Options

  • :model: Model to use (default: auto-detected based on auth)
  • :display_name: Human-readable batch name (required)
  • :priority: Processing priority (default: 0, higher = more urgent)
  • :task_type: Task type applied to all requests
  • :output_dimensionality: Dimension for all embeddings
  • :auth: Authentication strategy

Returns

  • {:ok, batch} - EmbedContentBatch with :name for polling
  • {:error, reason} - Failed to submit batch

Examples

# Simple batch
{:ok, batch} = Coordinator.async_batch_embed_contents(
  ["Text 1", "Text 2", "Text 3"],
  display_name: "My Knowledge Base",
  task_type: :retrieval_document
)

# Poll for completion
{:ok, updated_batch} = Coordinator.get_batch_status(batch.name)

case updated_batch.state do
  :completed ->
    {:ok, embeddings} = Coordinator.get_batch_embeddings(updated_batch)
    IO.puts("Retrieved #{length(embeddings)} embeddings")
  :processing ->
    progress = updated_batch.batch_stats.successful_request_count
    IO.puts("Progress: #{progress} completed")
  :failed ->
    IO.puts("Batch failed")
end

await_batch_completion(batch_name, opts \\ [])

@spec await_batch_completion(
  String.t(),
  keyword()
) :: api_result(Gemini.Types.Response.EmbedContentBatch.t())

Poll and wait for batch completion with configurable intervals.

Convenience function that polls get_batch_status until completion or timeout. Useful for synchronous workflows or testing.

See Gemini.options/0 for available options.

Options

  • :poll_interval: Milliseconds between polls (default: 5000)
  • :timeout: Max wait time in milliseconds (default: 600000 = 10 min)
  • :on_progress: Callback function called on each poll with batch

Returns

  • {:ok, batch} - Completed batch (succeeded or failed)
  • {:error, :timeout} - Timed out waiting for completion
  • {:error, reason} - Failed to poll status

Examples

{:ok, batch} = Coordinator.async_batch_embed_contents(texts,
  display_name: "Batch 1"
)

{:ok, completed_batch} = Coordinator.await_batch_completion(
  batch.name,
  poll_interval: 10_000,  # 10 seconds
  timeout: 1_800_000,     # 30 minutes
  on_progress: fn b ->
    if b.batch_stats do
      progress = (b.batch_stats.successful_request_count || 0) / b.batch_stats.request_count * 100
      IO.puts("Progress: #{Float.round(progress, 1)}%")
    end
  end
)

batch_embed_contents(texts, opts \\ [])

Generate embeddings for multiple text inputs in a single batch request.

More efficient than individual requests when embedding multiple texts.

See Gemini.options/0 for available options.

Parameters

  • texts: List of text strings to embed
  • opts: Options including model, auth strategy, and embedding-specific parameters

Options

Same as embed_content/2, applied to all texts in the batch.

Examples

# Batch embedding
{:ok, response} = Coordinator.batch_embed_contents([
  "What is AI?",
  "How does machine learning work?",
  "Explain neural networks"
])

{:ok, all_values} = BatchEmbedContentsResponse.get_all_values(response)

# With task type
{:ok, response} = Coordinator.batch_embed_contents(
  ["Doc 1 content", "Doc 2 content", "Doc 3 content"],
  task_type: :retrieval_document,
  output_dimensionality: 256
)

count_tokens(input, opts \\ [])

@spec count_tokens(
  String.t() | Gemini.Types.Request.GenerateContentRequest.t(),
  Gemini.options()
) ::
  api_result(%{total_tokens: integer()})

Count tokens in the given input.

See Gemini.options/0 for available options.

Parameters

  • input: String or GenerateContentRequest to count tokens for
  • opts: Options including model and auth strategy

Options

  • :model: Model to use for token counting (defaults to configured default model)
  • :auth: Authentication strategy (:gemini or :vertex_ai)

Examples

{:ok, count} = Coordinator.count_tokens("Hello world")
{:ok, count} = Coordinator.count_tokens("Complex text", model: "gemini-2.5-pro", auth: :vertex_ai)

embed_content(text, opts \\ [])

Generate an embedding for the given text content.

Uses the appropriate embedding model based on detected authentication:

  • Gemini API: gemini-embedding-001 (3072 dimensions, task type parameter)
  • Vertex AI: embeddinggemma (768 dimensions, prompt prefix formatting)

See Gemini.options/0 for available options.

Parameters

  • text: String content to embed
  • opts: Options including model, auth strategy, and embedding-specific parameters

Options

  • :model: Embedding model to use (default: auto-detected based on auth)
  • :auth: Authentication strategy (:gemini or :vertex_ai)
  • :task_type: Optional task type for optimized embeddings
    • :retrieval_query - Text is a search query
    • :retrieval_document - Text is a document being searched
    • :semantic_similarity - For semantic similarity tasks
    • :classification - For classification tasks
    • :clustering - For clustering tasks
    • :question_answering - For Q&A tasks
    • :fact_verification - For fact verification
    • :code_retrieval_query - For code retrieval
  • :title: Optional title (only for :retrieval_document task type)
  • :output_dimensionality: Optional dimension reduction

API-Specific Behavior

For Gemini API (gemini-embedding-001):

  • Task type is passed as taskType parameter
  • Default dimensions: 3072 (supports MRL: 768, 1536, 3072)
  • Dimensions below 3072 require manual normalization (see the sketch below)

For Vertex AI (embeddinggemma):

  • Task type is embedded as prompt prefix in the text
  • Default dimensions: 768 (supports MRL: 128, 256, 512, 768)
  • All dimensions are pre-normalized
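
Because reduced-dimension gemini-embedding-001 vectors are not pre-normalized, they need manual L2 normalization before cosine comparisons. A hypothetical helper, not part of this library:

# Hypothetical helper, not part of this library: L2-normalize an embedding
# returned at a reduced dimensionality from gemini-embedding-001.
defmodule MyApp.EmbeddingUtils do
  def l2_normalize(values) do
    norm = :math.sqrt(Enum.reduce(values, 0.0, fn v, acc -> acc + v * v end))
    Enum.map(values, &(&1 / norm))
  end
end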

Examples

# Simple embedding (auto-detects model)
{:ok, response} = Coordinator.embed_content("What is the meaning of life?")
{:ok, values} = EmbedContentResponse.get_values(response)

# With task type (works with both APIs transparently)
{:ok, response} = Coordinator.embed_content(
  "This is a document about AI",
  task_type: :retrieval_document,
  title: "AI Overview"
)

# With explicit dimensionality
{:ok, response} = Coordinator.embed_content(
  "Query text",
  task_type: :retrieval_query,
  output_dimensionality: 768
)

extract_function_calls(response)

@spec extract_function_calls(
  Gemini.Types.Response.GenerateContentResponse.t()
  | map()
) :: [
  Altar.ADM.FunctionCall.t()
]

Extract function calls from a GenerateContentResponse.

Returns a list of Altar.ADM.FunctionCall structs if the response contains function calls, or an empty list if none are found.

Examples

{:ok, response} = Coordinator.generate_content("What's the weather?", tools: tools)

case Coordinator.extract_function_calls(response) do
  [] ->
    # No function calls, extract text normally
    {:ok, text} = Coordinator.extract_text(response)

  calls ->
    # Execute function calls and continue conversation
    results = Executor.execute_all(calls, registry)
end

extract_text(arg1)

@spec extract_text(Gemini.Types.Response.GenerateContentResponse.t()) ::
  {:ok, String.t()} | {:error, term()}

Extract text content from a GenerateContentResponse.

Examples

{:ok, response} = Coordinator.generate_content("Hello")
{:ok, text} = Coordinator.extract_text(response)

generate_content(input, opts \\ [])

Generate content using the specified model and input.

See Gemini.options/0 for available options.

Parameters

  • input: String prompt or GenerateContentRequest struct
  • opts: Options including model, auth strategy, and generation config

Examples

# Simple text generation
{:ok, response} = Coordinator.generate_content("What is AI?")

# With specific model and auth
{:ok, response} = Coordinator.generate_content(
  "Explain quantum computing",
  model: Gemini.Config.get_model(:flash_lite_latest),
  auth: :vertex_ai,
  temperature: 0.7
)

# Using request struct
request = %GenerateContentRequest{...}
{:ok, response} = Coordinator.generate_content(request)

get_batch_embeddings(batch)

Retrieve embeddings from a completed batch job.

Only works for batches in :completed state with inline responses. For file-based outputs, use file download APIs.

Parameters

  • batch: Completed EmbedContentBatch

Returns

  • {:ok, embeddings} - List of ContentEmbedding results
  • {:error, reason} - Batch not complete or file-based

Examples

{:ok, batch} = Coordinator.get_batch_status("batches/abc123")

if batch.state == :completed do
  {:ok, embeddings} = Coordinator.get_batch_embeddings(batch)
  IO.puts("Retrieved #{length(embeddings)} embeddings")
end

get_batch_status(batch_name, opts \\ [])

Get the current status of an async batch embedding job.

Polls the batch status to check progress, completion, or failures.

See Gemini.options/0 for available options.

Parameters

  • batch_name: Batch identifier (format: "batches/{batchId}")
  • opts: Optional auth and other options

Returns

  • {:ok, batch} - Current batch status with stats
  • {:error, reason} - Failed to retrieve status

Examples

{:ok, batch} = Coordinator.get_batch_status("batches/abc123")

IO.puts("State: #{batch.state}")

if batch.batch_stats do
  completed = batch.batch_stats.successful_request_count + batch.batch_stats.failed_request_count
  total = batch.batch_stats.request_count
  IO.puts("Progress: #{completed}/#{total}")
end

get_model(model_name, opts \\ [])

@spec get_model(String.t(), Gemini.options()) :: api_result(map())

Get information about a specific model.

See Gemini.options/0 for available options.

Parameters

  • model_name: Name of the model to retrieve
  • opts: Options including auth strategy

Examples

{:ok, model} = Coordinator.get_model(Gemini.Config.get_model(:flash_lite_latest))
{:ok, model} = Coordinator.get_model("gemini-2.5-pro", auth: :vertex_ai)

has_function_calls?(response)

@spec has_function_calls?(Gemini.Types.Response.GenerateContentResponse.t() | map()) ::
  boolean()

Check if a response contains function calls.

Examples

{:ok, response} = Coordinator.generate_content("Calculate 2+2", tools: tools)

if Coordinator.has_function_calls?(response) do
  calls = Coordinator.extract_function_calls(response)
  # Handle function calls
else
  {:ok, text} = Coordinator.extract_text(response)
end

list_models(opts \\ [])

List available models for the specified authentication strategy.

See Gemini.options/0 for available options.

Parameters

  • opts: Options including auth strategy and pagination

Options

  • :auth: Authentication strategy (:gemini or :vertex_ai)
  • :page_size: Number of models per page
  • :page_token: Pagination token for next page

Examples

# List models with default auth
{:ok, models_response} = Coordinator.list_models()

# List models with specific auth strategy
{:ok, models_response} = Coordinator.list_models(auth: :vertex_ai)

# With pagination
{:ok, models_response} = Coordinator.list_models(
  auth: :gemini,
  page_size: 50,
  page_token: "next_page_token"
)
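
A sketch of walking every page of results. The :models and :next_page_token field names here are assumptions; check the actual response struct in this library:

# Sketch: accumulate models across pages (field names are assumptions)
defmodule MyApp.ModelCatalog do
  def list_all(token \\ nil, acc \\ []) do
    opts = if token, do: [page_size: 50, page_token: token], else: [page_size: 50]

    case Coordinator.list_models(opts) do
      {:ok, %{models: models, next_page_token: nil}} ->
        {:ok, acc ++ models}

      {:ok, %{models: models, next_page_token: next}} ->
        list_all(next, acc ++ models)

      {:error, reason} ->
        {:error, reason}
    end
  end
end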

stop_stream(stream_id)

@spec stop_stream(String.t()) :: :ok | {:error, term()}

Stop a streaming content generation.

stream_generate_content(input, opts \\ [])

Stream content generation with real-time response chunks.

See Gemini.options/0 for available options.

Parameters

  • input: String prompt or GenerateContentRequest struct
  • opts: Options including model, auth strategy, and generation config

Returns

  • {:ok, stream_id} - Stream started successfully
  • {:error, reason} - Failed to start stream

After starting the stream, subscribe to receive events:

{:ok, stream_id} = Coordinator.stream_generate_content("Tell me a story")
:ok = Coordinator.subscribe_stream(stream_id)

# Handle incoming messages
receive do
  {:stream_event, ^stream_id, event} ->
    IO.inspect(event, label: "Stream Event")
  {:stream_complete, ^stream_id} ->
    IO.puts("Stream completed")
  {:stream_error, ^stream_id, stream_error} ->
    IO.puts("Stream error: #{inspect(stream_error)}")
end
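
A minimal sketch of a loop that collects events until the stream finishes (the event payload is left opaque here):

# Sketch: recurse on receive until the stream completes or errors
defmodule MyApp.StreamCollector do
  def collect(stream_id, acc \\ []) do
    receive do
      {:stream_event, ^stream_id, event} ->
        collect(stream_id, [event | acc])

      {:stream_complete, ^stream_id} ->
        {:ok, Enum.reverse(acc)}

      {:stream_error, ^stream_id, error} ->
        {:error, error}
    end
  end
end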

Examples

# Basic streaming
{:ok, stream_id} = Coordinator.stream_generate_content("Write a poem")

# With specific configuration
{:ok, stream_id} = Coordinator.stream_generate_content(
  "Explain machine learning",
  model: Gemini.Config.get_model(:flash_lite_latest),
  auth: :gemini,
  temperature: 0.8,
  max_output_tokens: 1000
)

stream_status(stream_id)

@spec stream_status(String.t()) :: {:ok, atom()} | {:error, term()}

Get the status of a streaming content generation.

subscribe_stream(stream_id, subscriber_pid \\ self())

@spec subscribe_stream(String.t(), pid()) :: :ok | {:error, term()}

Subscribe to a streaming content generation.

Parameters

  • stream_id: ID of the stream to subscribe to
  • subscriber_pid: Process to receive stream events (defaults to current process)

Examples

{:ok, stream_id} = Coordinator.stream_generate_content("Hello")
:ok = Coordinator.subscribe_stream(stream_id)

# In a different process
:ok = Coordinator.subscribe_stream(stream_id, target_pid)

unsubscribe_stream(stream_id, subscriber_pid \\ self())

@spec unsubscribe_stream(String.t(), pid()) :: :ok | {:error, term()}

Unsubscribe from a streaming content generation.