Unified streaming manager that supports multiple authentication strategies.
This is the canonical streaming manager for the Gemini client, supporting concurrent usage of both Gemini API and Vertex AI authentication strategies within the same application.
Features:
- HTTP streaming with real-time event delivery
- Multi-authentication strategy support via MultiAuthCoordinator
- Per-stream authentication strategy selection
- Concurrent usage of multiple auth strategies
- Resource management and cleanup
Types
@type auth_strategy() :: :gemini | :vertex_ai
@type manager_state() :: %{
  streams: %{required(stream_id()) => stream_state()},
  stream_counter: non_neg_integer(),
  max_streams: pos_integer(),
  default_timeout: pos_integer()
}
@type stream_id() :: String.t()
@type stream_state() :: %{
  stream_id: stream_id(),
  stream_pid: pid() | nil,
  model: String.t(),
  request_body: map(),
  status: :starting | :active | :completed | :error | :stopped,
  error: term() | nil,
  started_at: DateTime.t(),
  subscribers: [subscriber_ref()],
  events_count: non_neg_integer(),
  last_event_at: DateTime.t() | nil,
  config: keyword(),
  auth_strategy: auth_strategy(),
  release_fn: nil | (atom(), map() | nil -> :ok)
}
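To make the shape of stream_state() concrete, the following is a minimal sketch that builds a map with every documented key and updates its event bookkeeping. The module StreamStateSketch and its functions are hypothetical and exist only for illustration. The assumption that a recorded event moves a stream to :active is not stated in this reference.

```elixir
defmodule StreamStateSketch do
  # Hypothetical helpers for illustration only; the library's own
  # construction logic is not shown in this reference.

  # Builds a map containing every key listed in stream_state().
  def new(stream_id, model, request_body, auth_strategy, config \\ [])
      when auth_strategy in [:gemini, :vertex_ai] do
    %{
      stream_id: stream_id,
      stream_pid: nil,
      model: model,
      request_body: request_body,
      status: :starting,
      error: nil,
      started_at: DateTime.utc_now(),
      subscribers: [],
      events_count: 0,
      last_event_at: nil,
      config: config,
      auth_strategy: auth_strategy,
      release_fn: nil
    }
  end

  # Assumption: receiving an event marks the stream :active.
  # Increments events_count and stamps last_event_at.
  def record_event(state) do
    %{
      state
      | status: :active,
        events_count: state.events_count + 1,
        last_event_at: DateTime.utc_now()
    }
  end
end
```

Calling `StreamStateSketch.new("stream-1", "some-model", %{contents: []}, :gemini)` yields a state with status :starting and events_count 0; passing it through `record_event/1` bumps the counter and sets last_event_at.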
Functions
Returns a specification to start this module under a supervisor.
See Supervisor.
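As a rough illustration of how a child specification is consumed, the sketch below starts a stand-in GenServer under a supervisor. DemoManager is a hypothetical module, not the library's manager, and the default values for max_streams and default_timeout are assumptions chosen for the example.

```elixir
defmodule DemoManager do
  # Hypothetical stand-in for the streaming manager. `use GenServer`
  # generates a default child_spec/1 that a supervisor can call.
  use GenServer

  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @impl true
  def init(opts) do
    # Keys mirror manager_state(); the default values are assumed.
    state = %{
      streams: %{},
      stream_counter: 0,
      max_streams: Keyword.get(opts, :max_streams, 100),
      default_timeout: Keyword.get(opts, :default_timeout, 30_000)
    }

    {:ok, state}
  end
end

# The {module, arg} tuple makes the supervisor call DemoManager.child_spec(arg).
children = [{DemoManager, max_streams: 10}]
{:ok, _supervisor} = Supervisor.start_link(children, strategy: :one_for_one)
```

With a real application, the same `{Module, opts}` tuple would normally sit in the children list of the application's own supervision tree.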
@spec get_stats() :: map()
Get manager statistics.
Get stream information.
@spec list_streams() :: [stream_id()]
List all active streams.
@spec start_link(keyword()) :: GenServer.on_start()
Start the unified streaming manager.
@spec start_stream(String.t(), map(), keyword()) :: {:ok, stream_id()} | {:error, term()}
@spec start_stream(term(), keyword(), pid()) :: {:ok, stream_id()} | {:error, term()}
Start a new stream.
API Variants
New API: start_stream(model, request_body, opts)
- model: The model to use for generation
- request_body: The request body for content generation
- opts: Options including auth strategy and other config
Legacy API: start_stream(contents, opts, subscriber_pid)
- contents: Content to stream (string or list of Content structs)
- opts: Generation options (model, generation_config, etc.)
- subscriber_pid: Process to receive stream events
Options
- :auth: Authentication strategy (:gemini or :vertex_ai)
- :timeout: Request timeout in milliseconds
- Other options passed to the streaming request
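Because both variants share the arity-3 name, they have to be told apart by argument shape. The sketch below shows one way guards could separate the two documented call forms and read the :auth and :timeout options. StreamArgsSketch is a hypothetical helper, not the library's implementation, and the default values for :auth and :timeout are assumptions, since this reference does not state them.

```elixir
defmodule StreamArgsSketch do
  # Hypothetical helper, not part of the library: classifies a
  # start_stream/3 call by argument shape, mirroring the two @spec clauses.

  # Assumed defaults; the reference does not document them.
  @default_auth :gemini
  @default_timeout 30_000

  # New API: (model, request_body, opts)
  def classify(model, request_body, opts)
      when is_binary(model) and is_map(request_body) and is_list(opts) do
    {:new_api,
     %{
       model: model,
       request_body: request_body,
       auth: Keyword.get(opts, :auth, @default_auth),
       timeout: Keyword.get(opts, :timeout, @default_timeout)
     }}
  end

  # Legacy API: (contents, opts, subscriber_pid)
  def classify(contents, opts, subscriber_pid)
      when is_list(opts) and is_pid(subscriber_pid) do
    {:legacy_api, %{contents: contents, opts: opts, subscriber: subscriber_pid}}
  end

  # Anything else matches neither documented form.
  def classify(_first, _second, _third), do: {:error, :invalid_arguments}
end
```

The clauses cannot overlap: the new form requires a map in second position and a list in third, while the legacy form requires a list in second position and a pid in third.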
Examples
# New API with Gemini auth
{:ok, stream_id} = UnifiedManager.start_stream(
  Gemini.Config.get_model(:flash_lite_latest),
  %{contents: [%{parts: [%{text: "Hello"}]}]},
  auth: :gemini
)
# Legacy API
{:ok, stream_id} = UnifiedManager.start_stream("Hello", [model: Gemini.Config.get_model(:flash_lite_latest)], self())
Stop a stream.
Get the status of a stream.
Subscribe to a stream to receive events.
Subscribe to stream events.
Unsubscribe from a stream.