Gemini.APIs.Coordinator (GeminiEx v0.8.4)
Coordinates API calls across different authentication strategies and endpoints.
Provides a unified interface that routes requests to either the Gemini API or Vertex AI based on configuration, so callers use the same functions regardless of backend.
This module acts as the main entry point for all Gemini API operations, automatically handling authentication strategy selection and request routing.
Features
- Unified API for content generation across auth strategies
- Automatic auth strategy selection based on configuration
- Per-request auth strategy override capability
- Consistent error handling and response format
- Support for both streaming and non-streaming operations
- Model listing and token counting functionality
Usage
# Use default auth strategy
{:ok, response} = Coordinator.generate_content("Hello world")
# Override auth strategy for specific request
{:ok, response} = Coordinator.generate_content("Hello world", auth: :vertex_ai)
# Start streaming with specific auth
{:ok, stream_id} = Coordinator.stream_generate_content("Tell me a story", auth: :gemini)
See Gemini.options/0 in Gemini for the canonical list of options.
Summary
Functions
Submit an asynchronous batch embedding job for production-scale embedding generation.
Poll and wait for batch completion with configurable intervals.
Generate embeddings for multiple text inputs in a single batch request.
Count tokens in the given input.
Generate an embedding for the given text content.
Extract function calls from a GenerateContentResponse.
Extract text content from a GenerateContentResponse.
Generate content using the specified model and input.
Retrieve embeddings from a completed batch job.
Get the current status of an async batch embedding job.
Get information about a specific model.
Check if a response contains function calls.
List available models for the specified authentication strategy.
Stop a streaming content generation.
Stream content generation with real-time response chunks.
Get the status of a streaming content generation.
Subscribe to a streaming content generation.
Unsubscribe from a streaming content generation.
Functions
@spec async_batch_embed_contents( [String.t()] | [Gemini.Types.Request.EmbedContentRequest.t()], Gemini.options() ) :: api_result(Gemini.Types.Response.EmbedContentBatch.t())
Submit an asynchronous batch embedding job for production-scale embedding generation.
Processes large batches of embeddings at 50% of the cost of the interactive API. Returns immediately with a batch ID for polling. Suitable for embedding thousands to millions of texts for RAG systems, knowledge bases, and large-scale retrieval.
See Gemini.options/0 for available options.
Parameters
- texts_or_requests: List of strings OR list of EmbedContentRequest structs
- opts: Options including model, display_name, priority, task_type, etc.
Options
- :model: Model to use (default: auto-detected based on auth)
- :display_name: Human-readable batch name (required)
- :priority: Processing priority (default: 0, higher = more urgent)
- :task_type: Task type applied to all requests
- :output_dimensionality: Dimension for all embeddings
- :auth: Authentication strategy
Returns
- {:ok, batch} - EmbedContentBatch with :name for polling
- {:error, reason} - Failed to submit batch
Examples
# Simple batch
{:ok, batch} = Coordinator.async_batch_embed_contents(
["Text 1", "Text 2", "Text 3"],
display_name: "My Knowledge Base",
task_type: :retrieval_document
)
# Poll for completion
{:ok, updated_batch} = Coordinator.get_batch_status(batch.name)
case updated_batch.state do
:completed ->
{:ok, embeddings} = Coordinator.get_batch_embeddings(updated_batch)
IO.puts("Retrieved #{length(embeddings)} embeddings")
:processing ->
progress = updated_batch.batch_stats.successful_request_count
IO.puts("Progress: #{progress} completed")
:failed ->
IO.puts("Batch failed")
end
@spec await_batch_completion( String.t(), keyword() ) :: api_result(Gemini.Types.Response.EmbedContentBatch.t())
Poll and wait for batch completion with configurable intervals.
Convenience function that polls get_batch_status until completion or timeout. Useful for synchronous workflows or testing.
See Gemini.options/0 for available options.
Options
- :poll_interval: Milliseconds between polls (default: 5000)
- :timeout: Max wait time in milliseconds (default: 600000 = 10 min)
- :on_progress: Callback function called on each poll with the batch
Returns
- {:ok, batch} - Completed batch (succeeded or failed)
- {:error, :timeout} - Timed out waiting for completion
- {:error, reason} - Failed to poll status
Examples
{:ok, batch} = Coordinator.async_batch_embed_contents(texts,
display_name: "Batch 1"
)
{:ok, completed_batch} = Coordinator.await_batch_completion(
batch.name,
poll_interval: 10_000, # 10 seconds
timeout: 1_800_000, # 30 minutes
on_progress: fn b ->
if b.batch_stats do
progress = (b.batch_stats.successful_request_count || 0) / b.batch_stats.request_count * 100
IO.puts("Progress: #{Float.round(progress, 1)}%")
end
end
)
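For illustration, a minimal sketch of the polling loop this helper wraps, built only on get_batch_status/2 and the batch states shown above (interval and deadline arithmetic are simplified):
# Simplified polling sketch; relies on the :completed/:failed terminal states above.
defmodule BatchPoller do
  def await(batch_name, interval \\ 5_000, remaining \\ 600_000) do
    case Coordinator.get_batch_status(batch_name) do
      {:ok, %{state: state} = batch} when state in [:completed, :failed] ->
        {:ok, batch}
      {:ok, _running} when remaining <= 0 ->
        {:error, :timeout}
      {:ok, _running} ->
        Process.sleep(interval)
        await(batch_name, interval, remaining - interval)
      {:error, reason} ->
        {:error, reason}
    end
  end
end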
@spec batch_embed_contents([String.t()], Gemini.options()) :: api_result(Gemini.Types.Response.BatchEmbedContentsResponse.t())
Generate embeddings for multiple text inputs in a single batch request.
More efficient than individual requests when embedding multiple texts.
See Gemini.options/0 for available options.
Parameters
- texts: List of text strings to embed
- opts: Options including model, auth strategy, and embedding-specific parameters
Options
Same as embed_content/2, applied to all texts in the batch.
Examples
# Batch embedding
{:ok, response} = Coordinator.batch_embed_contents([
"What is AI?",
"How does machine learning work?",
"Explain neural networks"
])
{:ok, all_values} = BatchEmbedContentsResponse.get_all_values(response)
# With task type
{:ok, response} = Coordinator.batch_embed_contents(
["Doc 1 content", "Doc 2 content", "Doc 3 content"],
task_type: :retrieval_document,
output_dimensionality: 256
)
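When a corpus is too large for a single call, it can be split across several calls. A hedged sketch, assuming a hypothetical per-call limit of 100 texts (the actual limit is not documented here):
# Chunked batching sketch; 100 texts per call is an assumed, not documented, limit.
defmodule ChunkedEmbed do
  def embed_all(texts, opts \\ []) do
    texts
    |> Enum.chunk_every(100)
    |> Enum.reduce_while({:ok, []}, fn chunk, {:ok, acc} ->
      case Coordinator.batch_embed_contents(chunk, opts) do
        {:ok, response} -> {:cont, {:ok, [response | acc]}}
        {:error, reason} -> {:halt, {:error, reason}}
      end
    end)
  end
end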
@spec count_tokens( String.t() | Gemini.Types.Request.GenerateContentRequest.t(), Gemini.options() ) :: api_result(%{total_tokens: integer()})
Count tokens in the given input.
See Gemini.options/0 for available options.
Parameters
- input: String or GenerateContentRequest to count tokens for
- opts: Options including model and auth strategy
Options
- :model: Model to use for token counting (defaults to the configured default model)
- :auth: Authentication strategy (:gemini or :vertex_ai)
Examples
{:ok, count} = Coordinator.count_tokens("Hello world")
{:ok, count} = Coordinator.count_tokens("Complex text", model: "gemini-2.5-pro", auth: :vertex_ai)
@spec embed_content(String.t(), Gemini.options()) :: api_result(Gemini.Types.Response.EmbedContentResponse.t())
Generate an embedding for the given text content.
Uses the appropriate embedding model based on the detected authentication:
- Gemini API: gemini-embedding-001 (3072 dimensions, task type parameter)
- Vertex AI: embeddinggemma (768 dimensions, prompt prefix formatting)
See Gemini.options/0 for available options.
Parameters
- text: String content to embed
- opts: Options including model, auth strategy, and embedding-specific parameters
Options
- :model: Embedding model to use (default: auto-detected based on auth)
- :auth: Authentication strategy (:gemini or :vertex_ai)
- :task_type: Optional task type for optimized embeddings
  - :retrieval_query - Text is a search query
  - :retrieval_document - Text is a document being searched
  - :semantic_similarity - For semantic similarity tasks
  - :classification - For classification tasks
  - :clustering - For clustering tasks
  - :question_answering - For Q&A tasks
  - :fact_verification - For fact verification
  - :code_retrieval_query - For code retrieval
- :title: Optional title (only for the :retrieval_document task type)
- :output_dimensionality: Optional dimension reduction
API-Specific Behavior
For Gemini API (gemini-embedding-001):
- Task type is passed as the taskType parameter
- Default dimensions: 3072 (supports MRL: 768, 1536, 3072)
- Dimensions below 3072 need manual normalization
For Vertex AI (embeddinggemma):
- Task type is embedded as a prompt prefix in the text
- Default dimensions: 768 (supports MRL: 128, 256, 512, 768)
- All dimensions are pre-normalized
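Because Gemini API embeddings truncated below 3072 dimensions are not re-normalized (see above), callers may want to L2-normalize them; a minimal sketch (module and function names are illustrative):
# Illustrative L2 normalization for truncated (MRL) embedding values.
defmodule EmbeddingMath do
  def l2_normalize(values) do
    norm = values |> Enum.map(&(&1 * &1)) |> Enum.sum() |> :math.sqrt()
    Enum.map(values, &(&1 / norm))
  end
end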
Examples
# Simple embedding (auto-detects model)
{:ok, response} = Coordinator.embed_content("What is the meaning of life?")
{:ok, values} = EmbedContentResponse.get_values(response)
# With task type (works with both APIs transparently)
{:ok, response} = Coordinator.embed_content(
"This is a document about AI",
task_type: :retrieval_document,
title: "AI Overview"
)
# With explicit dimensionality
{:ok, response} = Coordinator.embed_content(
"Query text",
task_type: :retrieval_query,
output_dimensionality: 768
)
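For the retrieval task types above, a common follow-up is comparing a query vector against document vectors; a minimal cosine-similarity sketch (module and function names are illustrative):
# Illustrative cosine similarity between two embedding vectors of equal length.
defmodule Similarity do
  def cosine(a, b) do
    dot = Enum.zip(a, b) |> Enum.map(fn {x, y} -> x * y end) |> Enum.sum()
    norm = fn v -> v |> Enum.map(&(&1 * &1)) |> Enum.sum() |> :math.sqrt() end
    dot / (norm.(a) * norm.(b))
  end
end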
@spec extract_function_calls( Gemini.Types.Response.GenerateContentResponse.t() | map() ) :: [ Altar.ADM.FunctionCall.t() ]
Extract function calls from a GenerateContentResponse.
Returns a list of Altar.ADM.FunctionCall structs if the response contains
function calls, or an empty list if none are found.
Examples
{:ok, response} = Coordinator.generate_content("What's the weather?", tools: tools)
case Coordinator.extract_function_calls(response) do
[] ->
# No function calls, extract text normally
{:ok, text} = Coordinator.extract_text(response)
calls ->
# Execute function calls and continue conversation
results = Executor.execute_all(calls, registry)
end
@spec extract_text(Gemini.Types.Response.GenerateContentResponse.t()) :: {:ok, String.t()} | {:error, term()}
Extract text content from a GenerateContentResponse.
Examples
{:ok, response} = Coordinator.generate_content("Hello")
{:ok, text} = Coordinator.extract_text(response)
@spec generate_content( String.t() | [Gemini.Types.Content.t()] | Gemini.Types.Request.GenerateContentRequest.t(), Gemini.options() ) :: api_result(Gemini.Types.Response.GenerateContentResponse.t())
Generate content using the specified model and input.
See Gemini.options/0 for available options.
Parameters
- input: String prompt or GenerateContentRequest struct
- opts: Options including model, auth strategy, and generation config
Examples
# Simple text generation
{:ok, response} = Coordinator.generate_content("What is AI?")
# With specific model and auth
{:ok, response} = Coordinator.generate_content(
"Explain quantum computing",
model: Gemini.Config.get_model(:flash_lite_latest),
auth: :vertex_ai,
temperature: 0.7
)
# Using request struct
request = %GenerateContentRequest{...}
{:ok, response} = Coordinator.generate_content(request)
@spec get_batch_embeddings(Gemini.Types.Response.EmbedContentBatch.t()) :: api_result([Gemini.Types.Response.ContentEmbedding.t()])
Retrieve embeddings from a completed batch job.
Only works for batches in :completed state with inline responses.
For file-based outputs, use file download APIs.
Parameters
batch: Completed EmbedContentBatch
Returns
- {:ok, embeddings} - List of ContentEmbedding results
- {:error, reason} - Batch not complete or file-based
Examples
{:ok, batch} = Coordinator.get_batch_status("batches/abc123")
if batch.state == :completed do
{:ok, embeddings} = Coordinator.get_batch_embeddings(batch)
IO.puts("Retrieved #{length(embeddings)} embeddings")
end
@spec get_batch_status(String.t(), Gemini.options()) :: api_result(Gemini.Types.Response.EmbedContentBatch.t())
Get the current status of an async batch embedding job.
Polls the batch status to check progress, completion, or failures.
See Gemini.options/0 for available options.
Parameters
- batch_name: Batch identifier (format: "batches/{batchId}")
- opts: Optional auth and other options
Returns
- {:ok, batch} - Current batch status with stats
- {:error, reason} - Failed to retrieve status
Examples
{:ok, batch} = Coordinator.get_batch_status("batches/abc123")
IO.puts("State: #{batch.state}")
if batch.batch_stats do
completed = batch.batch_stats.successful_request_count + batch.batch_stats.failed_request_count
total = batch.batch_stats.request_count
IO.puts("Progress: #{completed}/#{total}")
end
@spec get_model(String.t(), Gemini.options()) :: api_result(map())
Get information about a specific model.
See Gemini.options/0 for available options.
Parameters
- model_name: Name of the model to retrieve
- opts: Options including auth strategy
Examples
{:ok, model} = Coordinator.get_model(Gemini.Config.get_model(:flash_lite_latest))
{:ok, model} = Coordinator.get_model("gemini-2.5-pro", auth: :vertex_ai)
@spec has_function_calls?(Gemini.Types.Response.GenerateContentResponse.t() | map()) :: boolean()
Check if a response contains function calls.
Examples
{:ok, response} = Coordinator.generate_content("Calculate 2+2", tools: tools)
if Coordinator.has_function_calls?(response) do
calls = Coordinator.extract_function_calls(response)
# Handle function calls
else
{:ok, text} = Coordinator.extract_text(response)
end
@spec list_models(Gemini.options()) :: api_result(Gemini.Types.Response.ListModelsResponse.t())
List available models for the specified authentication strategy.
See Gemini.options/0 for available options.
Parameters
opts: Options including auth strategy and pagination
Options
- :auth: Authentication strategy (:gemini or :vertex_ai)
- :page_size: Number of models per page
- :page_token: Pagination token for the next page
Examples
# List models with default auth
{:ok, models_response} = Coordinator.list_models()
# List models with specific auth strategy
{:ok, models_response} = Coordinator.list_models(auth: :vertex_ai)
# With pagination
{:ok, models_response} = Coordinator.list_models(
auth: :gemini,
page_size: 50,
page_token: "next_page_token"
)
Stop a streaming content generation.
@spec stream_generate_content( String.t() | Gemini.Types.Request.GenerateContentRequest.t(), Gemini.options() ) :: api_result(String.t())
Stream content generation with real-time response chunks.
See Gemini.options/0 for available options.
Parameters
- input: String prompt or GenerateContentRequest struct
- opts: Options including model, auth strategy, and generation config
Returns
- {:ok, stream_id}: Stream started successfully
- {:error, reason}: Failed to start stream
After starting the stream, subscribe to receive events:
{:ok, stream_id} = Coordinator.stream_generate_content("Tell me a story")
:ok = Coordinator.subscribe_stream(stream_id)
# Handle incoming messages
receive do
{:stream_event, ^stream_id, event} ->
IO.inspect(event, label: "Stream Event")
{:stream_complete, ^stream_id} ->
IO.puts("Stream completed")
{:stream_error, ^stream_id, stream_error} ->
IO.puts("Stream error: #{inspect(stream_error)}")
end
Examples
# Basic streaming
{:ok, stream_id} = Coordinator.stream_generate_content("Write a poem")
# With specific configuration
{:ok, stream_id} = Coordinator.stream_generate_content(
"Explain machine learning",
model: Gemini.Config.get_model(:flash_lite_latest),
auth: :gemini,
temperature: 0.8,
max_output_tokens: 1000
)
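Building on the message shapes shown above, a minimal sketch that accumulates stream events until completion (the internal structure of each event is not specified here, so raw events are returned; the 30_000 ms timeout is illustrative):
# Collector sketch using only the {:stream_event | :stream_complete | :stream_error, ...}
# message shapes documented above.
defmodule StreamCollector do
  def collect(stream_id, acc \\ []) do
    receive do
      {:stream_event, ^stream_id, event} -> collect(stream_id, [event | acc])
      {:stream_complete, ^stream_id} -> {:ok, Enum.reverse(acc)}
      {:stream_error, ^stream_id, reason} -> {:error, reason}
    after
      30_000 -> {:error, :timeout}
    end
  end
end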
Get the status of a streaming content generation.
Subscribe to a streaming content generation.
Parameters
- stream_id: ID of the stream to subscribe to
- subscriber_pid: Process to receive stream events (defaults to the current process)
Examples
{:ok, stream_id} = Coordinator.stream_generate_content("Hello")
:ok = Coordinator.subscribe_stream(stream_id)
# In a different process
:ok = Coordinator.subscribe_stream(stream_id, target_pid)
Unsubscribe from a streaming content generation.