Gemini Streaming Architecture
This document provides a comprehensive overview of the streaming architecture, identifies the current concurrency issue, and proposes solutions for true real-time streaming.
Current Architecture Overview
graph TD
%% Client Layer
CLI[CLI User] --> Demo[streaming_demo.exs]
Demo --> |start_stream| GeminiAPI[Gemini API Module]
%% Core Streaming Components
GeminiAPI --> |start_stream| Manager[ManagerV2 GenServer]
Manager --> |stream_to_process| HTTPClient[HTTPStreaming Client]
HTTPClient --> |Req.request| ReqLib[Req HTTP Library]
ReqLib --> |SSE Connection| API[Gemini API Endpoint]
%% Response Processing Chain
API --> |SSE Response| ReqLib
ReqLib --> |Complete Body| HTTPClient
HTTPClient --> |parse_chunk| Parser[SSE Parser]
Parser --> |Events| HTTPClient
HTTPClient --> |Callback| Manager
Manager --> |notify_subscribers| Demo
Demo --> |Text Output| CLI
%% Detailed Internal Flow
subgraph S1 [" "]
direction TB
ST1[HTTP Streaming Process]
HTTPClient --> |spawn process| StreamProcess[Stream Process]
StreamProcess --> |do_stream| DoStream[Execute HTTP Request]
DoStream --> |Response Body| ParseBody[Parse Complete Body]
ParseBody --> |send messages| Manager
end
subgraph S2 [" "]
direction TB
ST2[SSE Parser State]
Parser --> Buffer[String Buffer]
Parser --> Events[Parsed Events]
Buffer --> |extract_events| EventList[Event List]
EventList --> |parse_event| JSONParse[JSON Parsing]
end
subgraph S3 [" "]
direction TB
ST3[Manager State]
Manager --> StreamMap[Stream State Map]
Manager --> Subscribers[Subscriber List]
StreamMap --> StreamState[Individual Stream State]
StreamState --> Status[Status: starting/active/complete]
end
%% Styling
classDef primary fill:#6B46C1,stroke:#4C1D95,stroke-width:3px,color:#FFFFFF
classDef secondary fill:#9333EA,stroke:#6B21A8,stroke-width:2px,color:#FFFFFF
classDef tertiary fill:#A855F7,stroke:#7C2D12,stroke-width:2px,color:#FFFFFF
classDef api fill:#EF4444,stroke:#B91C1C,stroke-width:2px,color:#FFFFFF
classDef subscriber fill:#10B981,stroke:#047857,stroke-width:2px,color:#FFFFFF
classDef subgraphTitle fill:#1E1B4B,stroke:#312E81,stroke-width:2px,color:#FFFFFF
classDef problem fill:#DC2626,stroke:#991B1B,stroke-width:3px,color:#FFFFFF
classDef solution fill:#059669,stroke:#047857,stroke-width:2px,color:#FFFFFF
classDef normal fill:#3B82F6,stroke:#1D4ED8,stroke-width:2px,color:#FFFFFF
%% Apply classes
class CLI,Demo primary
class GeminiAPI,Manager primary
class HTTPClient,ReqLib secondary
class Parser,Buffer,Events,EventList,JSONParse tertiary
class API api
class StreamProcess normal
class DoStream,ParseBody problem
class StreamMap,Subscribers,StreamState,Status solution
class ST1,ST2,ST3 subgraphTitle
%% Subgraph styling
style S1 fill:#F9FAFB,stroke:#9333EA,stroke-width:3px
style S2 fill:#FEFEFE,stroke:#A855F7,stroke-width:3px
style S3 fill:#F3F4F6,stroke:#059669,stroke-width:3px
The Core Concurrency Problem
Issue Identification
The current implementation has a fundamental concurrency flaw that prevents true real-time streaming:
- HTTP Request Blocking: The Req.request() call waits for the complete response body before returning
- Batch Event Processing: All SSE events are parsed after the HTTP response completes
- Delayed Notification: Events are sent to subscribers only after all data is received
Current Flow (Problematic)
sequenceDiagram
participant CLI
participant Manager
participant HTTPClient
participant Req
participant API
CLI->>Manager: start_stream("prompt")
Manager->>HTTPClient: stream_to_process(url, headers, body)
HTTPClient->>+Req: request(opts)
Note over Req,API: HTTP Connection Established
API->>Req: data: {"text": "Hello"}
API->>Req: data: {"text": " world"}
API->>Req: data: {"text": "!"}
API->>Req: Connection closes
Req->>-HTTPClient: {:ok, %Response{body: "complete_sse_data"}}
HTTPClient->>HTTPClient: parse_chunk(complete_body)
HTTPClient->>Manager: callback(event1)
HTTPClient->>Manager: callback(event2)
HTTPClient->>Manager: callback(event3)
Manager->>CLI: stream_event(event1)
Manager->>CLI: stream_event(event2)
Manager->>CLI: stream_event(event3)
Manager->>CLI: stream_complete
Note over CLI: All text appears at once!
Solution: True Streaming Architecture
Proposed Real-Time Flow
sequenceDiagram
participant CLI
participant Manager
participant HTTPClient
participant StreamingReq as Streaming Req
participant API
CLI->>Manager: start_stream("prompt")
Manager->>HTTPClient: stream_to_process(url, headers, body)
HTTPClient->>+StreamingReq: request with streaming callback
Note over StreamingReq,API: HTTP Connection Established
API->>StreamingReq: data: {"text": "Hello"}
StreamingReq->>HTTPClient: handle_chunk("data: {\"text\": \"Hello\"}")
HTTPClient->>Manager: callback(event1)
Manager->>CLI: stream_event(event1)
Note over CLI: "Hello" appears immediately
API->>StreamingReq: data: {"text": " world"}
StreamingReq->>HTTPClient: handle_chunk("data: {\"text\": \" world\"}")
HTTPClient->>Manager: callback(event2)
Manager->>CLI: stream_event(event2)
Note over CLI: " world" appears immediately
API->>StreamingReq: data: {"text": "!"}
StreamingReq->>HTTPClient: handle_chunk("data: {\"text\": \"!\"}")
HTTPClient->>Manager: callback(event3)
Manager->>CLI: stream_event(event3)
Note over CLI: "!" appears immediately
API->>StreamingReq: Connection closes
StreamingReq->>HTTPClient: handle_done()
HTTPClient->>Manager: completion_callback()
Manager->>CLI: stream_complete
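The difference between the two flows can be reproduced without any network code. The sketch below is only an illustration: `DeliverySketch` and its functions are hypothetical names, and a lazy `Stream` stands in for the HTTP connection. Each chunk is "received" when the consumer pulls it, so the order of messages in the mailbox shows whether delivery waits for the whole body.

```elixir
defmodule DeliverySketch do
  # Lazy source standing in for the network: a chunk counts as "received"
  # only at the moment the consumer pulls it from the stream.
  def source(parent) do
    Stream.map(["Hello", " world", "!"], fn chunk ->
      send(parent, {:received, chunk})
      chunk
    end)
  end

  # Current behaviour: the whole body is assembled before anything is delivered.
  def buffered(parent) do
    body = parent |> source() |> Enum.join()
    send(parent, {:delivered, body})
    :ok
  end

  # Proposed behaviour: every chunk is delivered as soon as it is received.
  def streaming(parent) do
    parent |> source() |> Enum.each(&send(parent, {:delivered, &1}))
  end

  # Collects everything currently in the caller's mailbox, oldest first.
  def drain(acc \\ []) do
    receive do
      msg -> drain([msg | acc])
    after
      0 -> Enum.reverse(acc)
    end
  end
end
```

Calling `DeliverySketch.buffered(self())` followed by `DeliverySketch.drain()` yields three `:received` messages and then a single `:delivered` message, while `streaming/1` interleaves them one by one.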
Required Implementation Changes
1. Fix HTTP Streaming Layer
Current Implementation (Broken):
# lib/gemini/client/http_streaming.ex - CURRENT
case Req.request(req_opts) do
{:ok, %Req.Response{status: status, body: body}} when status in 200..299 ->
# Problem: body contains COMPLETE response
case Parser.parse_chunk(body, parser) do
{:ok, events, _final_parser} ->
# All events processed at once!
Required Implementation (Real-Time):
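# lib/gemini/client/http_streaming.ex - FIXED
req_opts = [
  method: :post,
  url: url,
  headers: add_sse_headers(headers),
  json: body,
  receive_timeout: timeout,
  # KEY CHANGE: Stream chunks as they arrive.
  # Req calls an `into:` function with {:data, chunk} and {req, resp};
  # status and headers are already set on resp when the first chunk arrives.
  into: fn {:data, chunk}, {req, resp} ->
    # Carry parser state between chunks in resp.private
    acc = Map.get(resp.private, :stream_acc, %{parser: parser})
    # Process each chunk immediately!
    acc = handle_streaming_chunk(chunk, acc, callback)
    {:cont, {req, put_in(resp.private[:stream_acc], acc)}}
  end
]
# Req sends no :done message: the stream has ended when Req.request/1 returns
case Req.request(req_opts) do
  {:ok, resp} ->
    finalize_stream(Map.get(resp.private, :stream_acc, %{parser: parser}), callback)
  {:error, reason} ->
    callback.(%{type: :error, data: nil, error: reason})
end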
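As far as Req's documentation describes it, a function passed as `into:` receives `{:data, chunk}` together with a `{req, resp}` tuple and returns `{:cont, {req, resp}}` to keep reading or `{:halt, {req, resp}}` to stop. The sketch below exercises a function of that shape without a network connection. `IntoSketch`, `drive/2`, and the `:chunks_seen` key are hypothetical, and plain maps stand in for the real request and response structs.

```elixir
defmodule IntoSketch do
  # Builds a function with the shape described above. Per-stream state is
  # kept in resp.private so that it survives from one chunk to the next.
  def build(callback) do
    fn {:data, chunk}, {req, resp} ->
      callback.(chunk)
      seen = Map.get(resp.private, :chunks_seen, 0) + 1
      resp = %{resp | private: Map.put(resp.private, :chunks_seen, seen)}
      {:cont, {req, resp}}
    end
  end

  # Stand-in for the HTTP client: feeds chunks to the function one at a
  # time and stops early if it returns {:halt, acc}.
  def drive(chunks, into_fun) do
    fake_req = %{}
    fake_resp = %{status: 200, private: %{}}

    Enum.reduce_while(chunks, {fake_req, fake_resp}, fn chunk, acc ->
      into_fun.({:data, chunk}, acc)
    end)
  end
end
```

Because the function returns the same `{:cont, acc}` / `{:halt, acc}` tuples that `Enum.reduce_while/3` expects, the driver needs no translation layer, which makes the callback easy to unit test in isolation.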
2. Implement Chunk-by-Chunk Processing
defp handle_streaming_chunk(chunk, %{parser: parser} = acc, callback) do
case Parser.parse_chunk(chunk, parser) do
{:ok, events, new_parser} ->
# Send events immediately as they're parsed
Enum.each(events, fn event ->
stream_event = %{type: :data, data: event.data, error: nil}
callback.(stream_event)
end)
%{acc | parser: new_parser}
{:error, error} ->
error_event = %{type: :error, data: nil, error: error}
callback.(error_event)
acc
end
end
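The `callback` used above has to get each event to the manager, whose `handle_info/2` expects `{:stream_event, stream_id, event}` messages. One way to wire that up is a closure over the target pid and stream id. This is a minimal sketch under that assumption; `CallbackSketch`, `build/2`, and `emit/2` are illustrative names, not functions from the library.

```elixir
defmodule CallbackSketch do
  # Builds the callback handed to the streaming layer. `target` is the pid
  # that should receive events (the manager, in the design described here).
  def build(target, stream_id) do
    fn event -> send(target, {:stream_event, stream_id, event}) end
  end

  # Wraps already-parsed payloads in the event map shape used in this
  # document and passes each one to the callback right away.
  def emit(payloads, callback) do
    Enum.each(payloads, fn data ->
      callback.(%{type: :data, data: data, error: nil})
    end)
  end
end
```

Since `send/2` never blocks, the HTTP streaming process can keep reading from the socket while the manager works through its mailbox.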
3. Enhanced SSE Parser for Streaming
The current parser already supports stateful chunk processing, but we need to ensure it handles partial events correctly:
# lib/gemini/sse/parser.ex - Verify this works correctly
defp extract_events(data) do
# Handle both \r\n\r\n and \n\n patterns (FIXED)
parts = String.split(data, ~r/\r?\n\r?\n/)
case parts do
[] -> {[], ""}
[single_part] -> {[], single_part} # No complete events
multiple_parts ->
{complete_events, [remaining]} = Enum.split(multiple_parts, -1)
filtered_events = Enum.filter(complete_events, &(&1 != ""))
# Keep the remainder untrimmed: a trailing "\n" may be the first half of
# an event delimiter that the next chunk completes
{filtered_events, remaining}
end
end
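To check partial-event handling in isolation, the buffering step can be exercised with chunks that are cut in the middle of an event and in the middle of a delimiter. The module below is a self-contained sketch, not the library's parser: `SSEBufferSketch`, `feed/2`, and `feed_all/1` are hypothetical names. It keeps the unparsed remainder verbatim between calls so that a blank-line delimiter split across two chunks is still recognised.

```elixir
defmodule SSEBufferSketch do
  # Appends `chunk` to `buffer` and returns {complete_events, new_buffer}.
  # Events are separated by a blank line ("\n\n" or "\r\n\r\n").
  def feed(buffer, chunk) do
    parts = String.split(buffer <> chunk, ~r/\r?\n\r?\n/)
    # The last part is always the (possibly empty) incomplete tail.
    {complete, [rest]} = Enum.split(parts, -1)
    {Enum.reject(complete, &(&1 == "")), rest}
  end

  # Feeds a list of chunks through feed/2, threading the buffer along,
  # and returns every complete event in arrival order.
  def feed_all(chunks) do
    {events, _rest} =
      Enum.reduce(chunks, {[], ""}, fn chunk, {events, buffer} ->
        {new_events, rest} = feed(buffer, chunk)
        {events ++ new_events, rest}
      end)

    events
  end
end
```

Feeding `["data: {\"text\": \"Hel", "lo\"}\n", "\ndata: {\"text\": \" world\"}\n\n"]` through `feed_all/1` produces two separate events, even though the first event and its delimiter each straddle a chunk boundary.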
4. Manager State Improvements
Ensure the manager can handle rapid event processing:
# lib/gemini/streaming/manager_v2.ex
def handle_info({:stream_event, stream_id, event}, state) do
case Map.get(state.streams, stream_id) do
nil ->
Logger.warning("Received event for unknown stream: #{stream_id}")
{:noreply, state}
stream_state ->
# Update stream state
updated_stream = %{
stream_state
| events_count: stream_state.events_count + 1,
last_event_at: DateTime.utc_now(),
status: :active
}
# Notify subscribers IMMEDIATELY - no batching
notify_subscribers_immediately(updated_stream.subscribers, {:stream_event, stream_id, event})
new_state = put_in(state.streams[stream_id], updated_stream)
{:noreply, new_state}
end
end
# Ensure immediate notification
defp notify_subscribers_immediately(subscribers, message) do
Enum.each(subscribers, fn {pid, _ref} ->
send(pid, message) # Immediate send, no batching
end)
end
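The handler above can be tried out in a stripped-down GenServer. The sketch below is not `ManagerV2`: `ManagerSketch` and its `subscribe/2` function are hypothetical, and the `:DOWN` clause reflects an assumption that subscribers are monitored so dead processes can be pruned. It keeps the same `{pid, ref}` subscriber tuples and forwards each event with a plain `send/2`.

```elixir
defmodule ManagerSketch do
  use GenServer

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, :ok, opts)

  # Registers the calling process as a subscriber of `stream_id`.
  def subscribe(server, stream_id) do
    GenServer.call(server, {:subscribe, stream_id, self()})
  end

  @impl true
  def init(:ok), do: {:ok, %{streams: %{}}}

  @impl true
  def handle_call({:subscribe, stream_id, pid}, _from, state) do
    ref = Process.monitor(pid)
    stream = Map.get(state.streams, stream_id, %{subscribers: [], events_count: 0})
    stream = %{stream | subscribers: [{pid, ref} | stream.subscribers]}
    {:reply, :ok, put_in(state.streams[stream_id], stream)}
  end

  @impl true
  def handle_info({:stream_event, stream_id, event}, state) do
    case Map.get(state.streams, stream_id) do
      nil ->
        # Unknown stream: ignore the event and keep running.
        {:noreply, state}

      stream ->
        # Forward right away; send/2 does not block the manager.
        Enum.each(stream.subscribers, fn {pid, _ref} ->
          send(pid, {:stream_event, stream_id, event})
        end)

        stream = %{stream | events_count: stream.events_count + 1}
        {:noreply, put_in(state.streams[stream_id], stream)}
    end
  end

  def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do
    # Drop the subscriber whose monitor fired from every stream.
    streams =
      Map.new(state.streams, fn {id, stream} ->
        subscribers = Enum.reject(stream.subscribers, fn {_pid, r} -> r == ref end)
        {id, %{stream | subscribers: subscribers}}
      end)

    {:noreply, %{state | streams: streams}}
  end
end
```

The manager does constant work per event and never waits on a subscriber, so a slow consumer only grows its own mailbox rather than delaying other streams.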
Testing the Fix
1. Update Demo to Show Timing
# streaming_demo.exs - Add timing information
# start_time is captured once and passed through the recursion, so each
# timestamp is measured from the start of listening, not from the previous event
defp listen_for_events(start_time \\ System.monotonic_time(:millisecond)) do
  receive do
    {:stream_event, _stream_id, %{type: :data, data: data}} ->
      elapsed = System.monotonic_time(:millisecond) - start_time
      text_content = extract_text_from_stream_data(data)
      if text_content && text_content != "" do
        IO.write("[#{elapsed}ms] #{text_content}") # Show timing
      end
      listen_for_events(start_time)
    {:stream_complete, _stream_id} ->
      IO.puts("\n\n✅ Stream completed!")
  after
    30_000 ->
      IO.puts("\n⏰ Stream timeout after 30 seconds")
  end
end
2. Performance Verification
# Test script to verify real-time behavior
defmodule StreamingTest do
def test_realtime_performance do
start_time = System.monotonic_time(:millisecond)
event_times = []
{:ok, stream_id} = Gemini.start_stream("Count slowly from 1 to 10")
:ok = Gemini.subscribe_stream(stream_id)
receive_loop(start_time, event_times)
end
defp receive_loop(start_time, event_times) do
receive do
{:stream_event, _, %{type: :data}} ->
event_time = System.monotonic_time(:millisecond) - start_time
new_times = [event_time | event_times]
receive_loop(start_time, new_times)
{:stream_complete, _} ->
final_times = Enum.reverse(event_times)
analyze_timing(final_times)
end
end
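# Fewer than two events leaves no interval to average (and would divide by zero)
defp analyze_timing(times) when length(times) < 2 do
IO.puts("Need at least two events to measure intervals, got: #{inspect(times)}")
end
defp analyze_timing(times) do
intervals = times |> Enum.chunk_every(2, 1, :discard) |> Enum.map(fn [a, b] -> b - a end)
IO.puts("Event timing analysis:")
IO.puts("First event: #{hd(times)}ms")
IO.puts("Average interval: #{Enum.sum(intervals) / length(intervals)}ms")
IO.puts("All events: #{inspect(times)}")
end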
end
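The timing analysis can also be written as pure functions, which makes it possible to assert on the result instead of reading printed output. The following is a sketch only: `TimingSketch` is a hypothetical module, and the 50 ms default in `batched?/2` is an assumed heuristic for "everything arrived at once", not a measured threshold.

```elixir
defmodule TimingSketch do
  # Gaps between consecutive timestamps (milliseconds since stream start).
  def intervals(times) do
    times
    |> Enum.chunk_every(2, 1, :discard)
    |> Enum.map(fn [a, b] -> b - a end)
  end

  # Summary of one run. `spread` is the time between first and last event.
  def summary([]), do: %{count: 0, first: nil, spread: 0, mean_interval: nil}

  def summary(times) do
    gaps = intervals(times)

    %{
      count: length(times),
      first: hd(times),
      spread: List.last(times) - hd(times),
      mean_interval: if(gaps == [], do: nil, else: Enum.sum(gaps) / length(gaps))
    }
  end

  # Assumed heuristic: several events that all land within `threshold_ms`
  # of each other were probably delivered as one batch.
  def batched?(times, threshold_ms \\ 50) do
    length(times) > 1 and summary(times).spread <= threshold_ms
  end
end
```

With timestamps such as `[3000, 3001, 3002]` the events arrive late and almost simultaneously, which is the signature of the buffering problem. A run such as `[600, 800, 1050]` shows events spread out over time, as expected from real streaming.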
Expected Behavior After Fix
- Immediate First Response: First token should appear within 500-1000ms
- Progressive Display: Each chunk should appear as it's generated, not all at once
- Low Latency: Gap between API generation and CLI display should be <50ms
- No Buffering: Text should stream continuously, not in large blocks
The current issue where "all text dumps out at the end" should be completely resolved with this streaming implementation.