Gemini.Streaming.ToolOrchestrator (GeminiEx v0.3.0)

GenServer responsible for managing a single, stateful, automatic tool-calling stream.

This orchestrator handles the complex multi-stage streaming process:

  1. Starts the initial streaming HTTP request to the Gemini API
  2. Buffers and inspects incoming chunks for function calls
  3. When function calls are detected, stops the first stream and executes tools
  4. Starts a second streaming request with the complete history including tool results
  5. Proxies the final stream events to the original subscriber

The orchestrator maintains state throughout this process (current phase, stream pids, buffered chunks, and any error; see orchestrator_state()) and handles errors gracefully.
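
The steps above can be pictured as a small state machine over the three phases named in orchestrator_state(). The following is a minimal sketch, not the library's implementation. The module OrchestratorPhaseSketch and its functions are hypothetical, and it assumes each buffered chunk is a decoded JSON map with string keys in which a function call appears as a "functionCall" part, as in the Gemini REST response format.

```elixir
defmodule OrchestratorPhaseSketch do
  @moduledoc false
  # Hypothetical helper for illustration only; not part of GeminiEx.

  # Collect every functionCall part found in the buffered chunks.
  # Assumption: chunks are decoded JSON maps with string keys.
  def function_calls(chunks) do
    for chunk <- chunks,
        candidate <- Map.get(chunk, "candidates", []),
        part <- get_in(candidate, ["content", "parts"]) || [],
        call = part["functionCall"],
        is_map(call) do
      call
    end
  end

  # Steps 2 and 3: leave the first phase only once a function call is seen.
  def next_phase(:awaiting_model_call, chunks) do
    if function_calls(chunks) == [], do: :awaiting_model_call, else: :executing_tools
  end

  # Step 4: once tools have run, the second stream is started.
  def next_phase(:executing_tools, _tool_results), do: :awaiting_final_response

  # Step 5: the final phase just proxies events, so the phase does not change.
  def next_phase(:awaiting_final_response, _chunk), do: :awaiting_final_response
end
```

What happens when the first stream contains no function call is not described on this page, so the sketch simply stays in :awaiting_model_call in that case.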

Types

orchestrator_state()

@type orchestrator_state() :: %{
  stream_id: String.t(),
  subscriber_pid: pid(),
  chat: Gemini.Chat.t(),
  auth_strategy: :gemini | :vertex_ai,
  config: keyword(),
  phase: :awaiting_model_call | :executing_tools | :awaiting_final_response,
  first_stream_pid: pid() | nil,
  second_stream_pid: pid() | nil,
  buffered_chunks: [map()],
  turn_limit: non_neg_integer(),
  error: term() | nil
}
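
To make the fields concrete, here is a sketch of what a freshly initialized state could look like. It is an assumption-laden illustration: the module OrchestratorStateSketch is hypothetical, the real init logic is not shown on this page, and turn_limit is taken as an explicit argument because its source and default are not documented here.

```elixir
defmodule OrchestratorStateSketch do
  @moduledoc false
  # Hypothetical builder; the real orchestrator may derive these values differently.

  def initial(stream_id, subscriber_pid, chat, auth_strategy, config, turn_limit) do
    %{
      stream_id: stream_id,
      subscriber_pid: subscriber_pid,
      chat: chat,
      auth_strategy: auth_strategy,
      config: config,
      # No stream has produced a function call yet.
      phase: :awaiting_model_call,
      # Neither streaming request has been started.
      first_stream_pid: nil,
      second_stream_pid: nil,
      buffered_chunks: [],
      turn_limit: turn_limit,
      error: nil
    }
  end
end
```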

Functions

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.
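
Because start_link takes five arguments and the expected shape of init_arg is not documented here, one option is to hand a supervisor an explicit child specification map instead, which Supervisor and DynamicSupervisor both accept. The sketch below is an assumption, not the library's recommended setup: OrchestratorChildSpecSketch is a hypothetical module, and restart: :temporary is a design choice made on the guess that an orchestrator bound to one stream should not be restarted.

```elixir
defmodule OrchestratorChildSpecSketch do
  @moduledoc false
  # Hypothetical helper; builds a plain child spec map around start_link/5.

  def child_spec_for(stream_id, subscriber_pid, chat, auth_strategy, config) do
    %{
      # Include the stream id so several orchestrators can coexist.
      id: {Gemini.Streaming.ToolOrchestrator, stream_id},
      start:
        {Gemini.Streaming.ToolOrchestrator, :start_link,
         [stream_id, subscriber_pid, chat, auth_strategy, config]},
      # Assumption: a finished or crashed stream is not restarted.
      restart: :temporary
    }
  end
end
```

The resulting map could then be passed to DynamicSupervisor.start_child/2 with a supervisor of your own.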

start_link(stream_id, subscriber_pid, chat, auth_strategy, config)

@spec start_link(String.t(), pid(), Gemini.Chat.t(), :gemini | :vertex_ai, keyword()) ::
  GenServer.on_start()

Start a new tool orchestrator for automatic streaming.

Parameters

  • stream_id: Unique identifier for this stream (a string)
  • subscriber_pid: Pid of the process that receives the final stream events
  • chat: Initial chat state (Gemini.Chat.t()) with history and options
  • auth_strategy: Authentication strategy to use, either :gemini or :vertex_ai
  • config: Additional configuration options, as a keyword list

Returns

  • {:ok, pid()}: Orchestrator started successfully
  • {:error, reason}: Failed to start orchestrator
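
A call might look like the following sketch. The wrapper module OrchestratorUsageSketch and its option names are hypothetical, and chat is assumed to be a Gemini.Chat.t() value built elsewhere, since this page does not show how to construct one. The calling process is used as the subscriber.

```elixir
defmodule OrchestratorUsageSketch do
  @moduledoc false
  # Hypothetical wrapper around start_link/5 as documented above.

  def start(chat, opts \\ []) do
    # Any unique string works as an id; this scheme is only an example.
    stream_id = "stream-" <> Integer.to_string(System.unique_integer([:positive]))
    auth_strategy = Keyword.get(opts, :auth_strategy, :gemini)
    config = Keyword.get(opts, :config, [])

    case Gemini.Streaming.ToolOrchestrator.start_link(
           stream_id,
           self(),
           chat,
           auth_strategy,
           config
         ) do
      {:ok, pid} -> {:ok, stream_id, pid}
      # Covers {:error, reason} and any other GenServer.on_start() value.
      other -> other
    end
  end
end
```

Since this goes through start_link, the orchestrator is linked to the caller, so a crash on either side propagates unless exits are trapped.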

stop(orchestrator_pid)

@spec stop(pid()) :: :ok

Stop the orchestrator and all associated streams.

subscribe(orchestrator_pid, subscriber_pid)

@spec subscribe(pid(), pid()) :: :ok

Subscribe an additional process to receive stream events.
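
As a sketch of how subscribe and stop might be used together, the hypothetical module below attaches the calling process, collects whatever messages arrive until the mailbox has been idle for a timeout, and then stops the orchestrator. The shape of the stream event messages is not documented on this page, so the code deliberately accepts any message; a real consumer would match on the library's actual event tuples.

```elixir
defmodule OrchestratorSubscribeSketch do
  @moduledoc false
  # Hypothetical consumer; event message shapes are not assumed.

  def attach_and_drain(orchestrator_pid, idle_timeout \\ 5_000) do
    :ok = Gemini.Streaming.ToolOrchestrator.subscribe(orchestrator_pid, self())
    events = drain([], idle_timeout)
    :ok = Gemini.Streaming.ToolOrchestrator.stop(orchestrator_pid)
    events
  end

  # Collect messages in arrival order until none arrives within idle_timeout ms.
  # Note: this takes every message in the mailbox, not only stream events.
  def drain(acc, idle_timeout) do
    receive do
      message -> drain([message | acc], idle_timeout)
    after
      idle_timeout -> Enum.reverse(acc)
    end
  end
end
```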