Gemini.Streaming.UnifiedManager (GeminiEx v0.10.0)

Unified streaming manager that supports multiple authentication strategies.

This is the canonical streaming manager for the Gemini client, supporting concurrent usage of both Gemini API and Vertex AI authentication strategies within the same application.

Features:

  • HTTP streaming with real-time event delivery
  • Multi-authentication strategy support via MultiAuthCoordinator
  • Per-stream authentication strategy selection
  • Concurrent usage of multiple auth strategies
  • Resource management and cleanup

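Summary

Functions

  • child_spec(init_arg): Returns a specification to start this module under a supervisor.
  • get_stats(): Get manager statistics.
  • get_stream_info(stream_id): Get stream information.
  • list_streams(): List all active streams.
  • start_link(opts \\ []): Start the unified streaming manager.
  • start_stream(model, request_body, opts \\ []): Start a new stream.
  • stop_stream(stream_id): Stop a stream.
  • stream_status(stream_id): Get the status of a stream.
  • subscribe(stream_id, subscriber_pid \\ self()): Subscribe to a stream to receive events.
  • subscribe_stream(stream_id, subscriber_pid \\ self()): Subscribe to stream events.
  • unsubscribe(stream_id, subscriber_pid \\ self()): Unsubscribe from a stream.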
Types

auth_strategy()

@type auth_strategy() :: :gemini | :vertex_ai

manager_state()

@type manager_state() :: %{
  streams: %{required(stream_id()) => stream_state()},
  stream_counter: non_neg_integer(),
  max_streams: pos_integer(),
  default_timeout: pos_integer()
}

stream_id()

@type stream_id() :: String.t()

stream_state()

@type stream_state() :: %{
  stream_id: stream_id(),
  stream_pid: pid() | nil,
  model: String.t(),
  request_body: map(),
  status: :starting | :active | :completed | :error | :stopped,
  error: term() | nil,
  started_at: DateTime.t(),
  subscribers: [subscriber_ref()],
  events_count: non_neg_integer(),
  last_event_at: DateTime.t() | nil,
  config: keyword(),
  auth_strategy: auth_strategy(),
  release_fn: nil | (atom(), map() | nil -> :ok)
}

subscriber_ref()

@type subscriber_ref() :: {pid(), reference()}

Functions

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

get_stats()

@spec get_stats() :: map()

Get manager statistics.

get_stream_info(stream_id)

@spec get_stream_info(stream_id()) :: {:ok, map()} | {:error, term()}

Get stream information.

list_streams()

@spec list_streams() :: [stream_id()]

List all active streams.
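
As an illustration of how list_streams/0 and get_stream_info/1 might be combined, the sketch below builds a snapshot of every active stream. The module name StreamSnapshot and the injectable list_fun and info_fun parameters are hypothetical; they default to the two functions documented here. The shape of the info map is not specified in this reference, so the sketch passes it through untouched.

```elixir
defmodule StreamSnapshot do
  @moduledoc false

  # Hypothetical helper: returns %{stream_id => info | {:error, reason}}.
  # The function arguments exist only so the helper can be exercised
  # without a running manager; by default they point at the documented API.
  def take(
        list_fun \\ &Gemini.Streaming.UnifiedManager.list_streams/0,
        info_fun \\ &Gemini.Streaming.UnifiedManager.get_stream_info/1
      ) do
    for stream_id <- list_fun.(), into: %{} do
      case info_fun.(stream_id) do
        {:ok, info} ->
          {stream_id, info}

        # A stream can disappear between the two calls, so keep the error
        # instead of crashing on a failed match.
        {:error, reason} ->
          {stream_id, {:error, reason}}
      end
    end
  end
end
```

Calling StreamSnapshot.take() with no arguments would query the real manager. Errors are kept in the result because a stream may finish or be stopped between the listing and the lookup.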

start_link(opts \\ [])

@spec start_link(keyword()) :: GenServer.on_start()

Start the unified streaming manager.
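
Because start_link/1 returns GenServer.on_start(), callers may want to treat an already-running manager as success. This reference does not say whether the GeminiEx application starts the manager on its own or whether it is registered under a name, so the sketch below only assumes that an {:error, {:already_started, pid}} result is possible. ManagerBoot and its start_fun parameter are hypothetical names.

```elixir
defmodule ManagerBoot do
  @moduledoc false

  # Hypothetical helper: normalises every GenServer.on_start() result
  # into {:ok, pid} | {:error, reason}.
  def ensure_started(start_fun \\ &Gemini.Streaming.UnifiedManager.start_link/0) do
    case start_fun.() do
      {:ok, pid} ->
        {:ok, pid}

      # Assumption: the manager may already be running elsewhere.
      {:error, {:already_started, pid}} ->
        {:ok, pid}

      :ignore ->
        {:error, :ignored}

      {:error, reason} ->
        {:error, reason}
    end
  end
end
```

Note that start_link/1 links the manager to the calling process; under a supervisor, child_spec/1 is the usual entry point instead.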

start_stream(model, request_body, opts \\ [])

@spec start_stream(String.t(), map(), keyword()) ::
  {:ok, stream_id()} | {:error, term()}
@spec start_stream(term(), keyword(), pid()) :: {:ok, stream_id()} | {:error, term()}

Start a new stream.

API Variants

New API: start_stream(model, request_body, opts)

  • model: The model to use for generation
  • request_body: The request body for content generation
  • opts: Options including auth strategy and other config

Legacy API: start_stream(contents, opts, subscriber_pid)

  • contents: Content to stream (string or list of Content structs)
  • opts: Generation options (model, generation_config, etc.)
  • subscriber_pid: Process to receive stream events

Options

  • :auth: Authentication strategy for this stream (:gemini or :vertex_ai)
  • :timeout: Request timeout in milliseconds
  • Any other options are passed through to the streaming request

Examples

alias Gemini.Streaming.UnifiedManager

# New API with Gemini auth
{:ok, stream_id} = UnifiedManager.start_stream(
  Gemini.Config.get_model(:flash_lite_latest),
  %{contents: [%{parts: [%{text: "Hello"}]}]},
  auth: :gemini
)

# Legacy API: events are delivered to the subscriber pid (here, the caller)
{:ok, stream_id} = UnifiedManager.start_stream(
  "Hello",
  [model: Gemini.Config.get_model(:flash_lite_latest)],
  self()
)
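
Since the :auth option is selected per stream, the same request can in principle be started once per strategy. The following is a minimal sketch of that idea using the new API. DualAuth and the start_fun parameter are hypothetical; start_fun defaults to the documented start_stream/3. Whether both strategies succeed depends on credentials being configured for each, which this sketch does not check.

```elixir
defmodule DualAuth do
  @moduledoc false

  # Hypothetical helper: starts one stream per auth strategy and returns
  # %{gemini: result, vertex_ai: result}, where each result is whatever
  # start_stream/3 returned ({:ok, stream_id} | {:error, reason}).
  def start_both(
        model,
        request_body,
        start_fun \\ &Gemini.Streaming.UnifiedManager.start_stream/3
      ) do
    for auth <- [:gemini, :vertex_ai], into: %{} do
      {auth, start_fun.(model, request_body, auth: auth)}
    end
  end
end
```

A call such as DualAuth.start_both(model, %{contents: [%{parts: [%{text: "Hello"}]}]}) would then yield one result per strategy, so a failure on one side does not hide the stream ID of the other.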

stop_stream(stream_id)

@spec stop_stream(stream_id()) :: :ok | {:error, term()}

Stop a stream.
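
One way to make sure a stream is always stopped, even if the code consuming it raises, is to wrap the work in try/after. The sketch below is an assumption about usage, not part of the library: StreamScope, start_fun, work_fun, and stop_fun are hypothetical names, and stop_fun defaults to the documented stop_stream/1.

```elixir
defmodule StreamScope do
  @moduledoc false

  # Hypothetical helper: starts a stream, runs work_fun with its ID, and
  # calls stop_fun afterwards regardless of how work_fun exits.
  def with_stream(
        start_fun,
        work_fun,
        stop_fun \\ &Gemini.Streaming.UnifiedManager.stop_stream/1
      ) do
    case start_fun.() do
      {:ok, stream_id} ->
        try do
          {:ok, work_fun.(stream_id)}
        after
          # The return value is ignored on purpose: stopping a stream that
          # already finished may return {:error, reason}.
          stop_fun.(stream_id)
        end

      {:error, reason} ->
        {:error, reason}
    end
  end
end
```

Here start_fun would typically be a zero-arity function that calls start_stream/3 with the desired model, request body, and options.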

stream_status(stream_id)

@spec stream_status(stream_id()) :: {:ok, atom()} | {:error, term()}

Get the status of a stream.
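
The spec only promises {:ok, atom()}. Assuming the atom is one of the status values listed in stream_state() (:starting, :active, :completed, :error, :stopped), a caller could poll until the stream reaches a terminal status. The sketch below does that. StreamWait, its option names, and the default interval and attempt count are hypothetical; status_fun defaults to the documented stream_status/1.

```elixir
defmodule StreamWait do
  @moduledoc false

  # Assumption: these are the statuses after which a stream no longer changes.
  @terminal [:completed, :error, :stopped]

  # Hypothetical helper: polls until a terminal status, an error, or
  # max_attempts polls have been made.
  def await(stream_id, opts \\ []) do
    status_fun =
      Keyword.get(opts, :status_fun, &Gemini.Streaming.UnifiedManager.stream_status/1)

    interval_ms = Keyword.get(opts, :interval_ms, 100)
    max_attempts = Keyword.get(opts, :max_attempts, 50)
    poll(stream_id, status_fun, interval_ms, max_attempts)
  end

  defp poll(_stream_id, _status_fun, _interval_ms, 0), do: {:error, :timeout}

  defp poll(stream_id, status_fun, interval_ms, attempts_left) do
    case status_fun.(stream_id) do
      {:ok, status} when status in @terminal ->
        {:ok, status}

      {:ok, _still_running} ->
        Process.sleep(interval_ms)
        poll(stream_id, status_fun, interval_ms, attempts_left - 1)

      {:error, reason} ->
        {:error, reason}
    end
  end
end
```

Polling is the simplest option when only the final status matters; subscribing to the stream avoids the delay between polls when the events themselves are needed.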

subscribe(stream_id, subscriber_pid \\ self())

@spec subscribe(stream_id(), pid()) :: :ok | {:error, term()}

Subscribe to a stream to receive events.
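
This reference does not document the shape of the messages a subscriber receives, so the following sketch makes no assumption about it. It simply collects whatever arrives in the calling process's mailbox until no message has been seen for a given idle period. StreamInbox and the default timeout are hypothetical.

```elixir
defmodule StreamInbox do
  @moduledoc false

  # Hypothetical helper: returns every message received by the calling
  # process, in arrival order, once the mailbox has been idle for
  # idle_timeout_ms milliseconds.
  def collect(idle_timeout_ms \\ 1_000), do: do_collect(idle_timeout_ms, [])

  defp do_collect(idle_timeout_ms, acc) do
    receive do
      # Catch-all on purpose: the event format is not specified here.
      message -> do_collect(idle_timeout_ms, [message | acc])
    after
      idle_timeout_ms -> Enum.reverse(acc)
    end
  end
end
```

A typical use would be to call subscribe(stream_id) and then StreamInbox.collect() from a dedicated process, for example inside a Task. The catch-all receive drains unrelated messages as well, so it should not run in a process that expects other traffic.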

subscribe_stream(stream_id, subscriber_pid \\ self())

@spec subscribe_stream(stream_id(), pid()) :: :ok | {:error, term()}

Subscribe to stream events.

unsubscribe(stream_id, subscriber_pid \\ self())

@spec unsubscribe(stream_id(), pid()) :: :ok | {:error, term()}

Unsubscribe from a stream.