ReqLLM.StreamServer (ReqLLM v1.0.0)
GenServer that manages streaming LLM sessions with backpressure and SSE parsing.
StreamServer acts as a bridge between HTTP streaming clients (like FinchClient) and consumers, providing:
- SSE event parsing across HTTP chunk boundaries
- Token queuing with configurable backpressure
- Provider-agnostic event decoding via provider callbacks
- Completion detection and metadata extraction
- Clean error handling and resource cleanup
Architecture
The StreamServer receives HTTP events via synchronous GenServer.call/2, which enables natural backpressure - if the consumer queue is full, HTTP events are delayed until the queue drains. This prevents memory issues from fast producers overwhelming slow consumers.
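As a rough sketch, an HTTP client forwards events like this; ReqLLM's bundled Finch client does this internally, so the snippet only illustrates the calling pattern (request is assumed to be built with Finch.build/4 and the Finch name is illustrative):
Finch.stream(request, ReqLLM.Finch, nil, fn
  {:status, status}, acc ->
    ReqLLM.StreamServer.http_event(server, {:status, status})
    acc

  {:headers, headers}, acc ->
    ReqLLM.StreamServer.http_event(server, {:headers, headers})
    acc

  {:data, chunk}, acc ->
    # Synchronous call: blocks while the consumer queue is full,
    # which is what produces the backpressure described above.
    ReqLLM.StreamServer.http_event(server, {:data, chunk})
    acc
end)

# Signal end of stream once the HTTP response is fully consumed
ReqLLM.StreamServer.http_event(server, :done)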
Usage
# Start a streaming session
{:ok, server} = StreamServer.start_link(
provider_mod: ReqLLM.Providers.OpenAI,
model: %ReqLLM.Model{...}
)
# Attach HTTP task for monitoring
StreamServer.attach_http_task(server, http_task_pid)
# Consumer loop
case StreamServer.next(server) do
{:ok, chunk} -> handle_chunk(chunk)
:halt -> handle_completion()
{:error, reason} -> handle_error(reason)
end
State Management
The server maintains state for:
- provider_mod: Provider module for event decoding
- model: ReqLLM.Model struct for provider context
- provider_state: Optional provider-specific state for stateful transformations
- sse_buffer: Binary buffer for SSE parsing across chunks
- queue: Token chunks awaiting consumer retrieval
- status: Current session status (:init, :streaming, :done, {:error, reason})
- http_task: HTTP task reference for monitoring
- consumer_refs: Set of consumer process references
- fixture_path: Optional path for fixture capture
- metadata: Final metadata when streaming completes
- high_watermark: Queue size limit for backpressure (default 500)
Backpressure
When the internal queue exceeds high_watermark, the server delays replying to
{:http_event, {:data, _}} messages until consumers drain the queue via next/2.
This provides natural backpressure without dropping events.
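For example, a lower watermark bounds memory more tightly at the cost of stalling the producer more often; the value below is illustrative:
{:ok, server} = ReqLLM.StreamServer.start_link(
  provider_mod: ReqLLM.Providers.OpenAI,
  model: %ReqLLM.Model{provider: :openai, name: "gpt-4o"},
  # http_event/2 calls start blocking once 100 chunks are queued
  high_watermark: 100
)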
Summary
Functions
Attach an HTTP task to the server for monitoring.
Block until metadata is available from the completed stream.
Cancel the streaming session and cleanup resources.
Returns a specification to start this module under a supervisor.
Forward an HTTP event to the server for processing.
Get the next chunk from the stream with optional timeout.
Set HTTP context and canonical JSON for fixture capture.
Start HTTP streaming from within the StreamServer.
Start a StreamServer with the given options.
Types
@type server() :: GenServer.server()
@type status() :: :init | :streaming | :done | {:error, any()}
Functions
Attach an HTTP task to the server for monitoring.
The server will monitor the task and handle cleanup if it crashes.
Parameters
- server - StreamServer process
- task_pid - HTTP task process ID
Examples
task = Task.async(fn -> Finch.stream(...) end)
ReqLLM.StreamServer.attach_http_task(server, task.pid)
@spec await_metadata(server(), non_neg_integer()) :: {:ok, map()} | {:error, any()}
Block until metadata is available from the completed stream.
Parameters
- server - StreamServer process
- timeout - Maximum time to wait in milliseconds (default: 30_000)
Returns
- {:ok, metadata} - Final stream metadata
- {:error, reason} - Error occurred or timeout
Examples
case ReqLLM.StreamServer.await_metadata(server, 10_000) do
{:ok, metadata} ->
IO.puts("Tokens used: " <> inspect(metadata[:usage][:total_tokens]))
{:error, :timeout} ->
IO.puts("Metadata not available yet")
end
@spec cancel(server()) :: :ok
Cancel the streaming session and cleanup resources.
Stops the HTTP task if running and terminates the server.
Parameters
- server - StreamServer process
Examples
ReqLLM.StreamServer.cancel(server)
Returns a specification to start this module under a supervisor.
See Supervisor.
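Examples
A minimal sketch, assuming the default child_spec/1 that GenServer generates; the child arguments are the same keyword list accepted by start_link/1, and the supervision strategy shown is illustrative:
children = [
  {ReqLLM.StreamServer,
   provider_mod: ReqLLM.Providers.OpenAI,
   model: %ReqLLM.Model{provider: :openai, name: "gpt-4o"}}
]

Supervisor.start_link(children, strategy: :one_for_one)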
Forward an HTTP event to the server for processing.
This is the primary interface for HTTP clients to deliver streaming events. Provides backpressure through synchronous GenServer.call.
Parameters
- server - StreamServer process
- event - HTTP event tuple: {:status, integer()}, {:headers, list()}, {:data, binary()}, :done, or {:error, term()}
Examples
ReqLLM.StreamServer.http_event(server, {:status, 200})
ReqLLM.StreamServer.http_event(server, {:headers, [{"content-type", "text/event-stream"}]})
ReqLLM.StreamServer.http_event(server, {:data, "data: {...}\n\n"})
ReqLLM.StreamServer.http_event(server, :done)
@spec next(server(), non_neg_integer()) :: {:ok, ReqLLM.StreamChunk.t()} | :halt | {:error, any()}
Get the next chunk from the stream with optional timeout.
Blocks until a chunk is available or the stream completes/errors.
Returns :halt when the stream is complete.
Parameters
- server - StreamServer process
- timeout - Maximum time to wait in milliseconds (default: 30_000)
Returns
- {:ok, chunk} - Next StreamChunk
- :halt - Stream is complete
- {:error, reason} - Error occurred
Examples
# Recursive consumer loop
defp consume(server) do
  case ReqLLM.StreamServer.next(server) do
    {:ok, %ReqLLM.StreamChunk{type: :content, text: text}} ->
      IO.write(text)
      consume(server)

    :halt ->
      :ok

    {:error, reason} ->
      Logger.error("Stream error: " <> inspect(reason))
  end
end
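Alternatively, a minimal sketch of wrapping next/2 in Stream.resource/3 to expose the session as a lazy Elixir stream; it assumes the server was started as in the Usage section and raises on errors for brevity:
stream =
  Stream.resource(
    fn -> server end,
    fn server ->
      case ReqLLM.StreamServer.next(server) do
        {:ok, chunk} -> {[chunk], server}
        :halt -> {:halt, server}
        {:error, reason} -> raise "stream error: #{inspect(reason)}"
      end
    end,
    fn _server -> :ok end
  )

stream
|> Stream.filter(&(&1.type == :content))
|> Stream.each(&IO.write(&1.text))
|> Stream.run()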
@spec set_fixture_context(server(), ReqLLM.Streaming.Fixtures.HTTPContext.t(), any()) :: :ok
Set HTTP context and canonical JSON for fixture capture.
This is called by the streaming pipeline to provide the HTTP metadata and request data needed for fixture capture.
Parameters
- server - StreamServer process
- http_context - HTTPContext struct with request/response metadata
- canonical_json - The request body as JSON for fixture saving
Examples
ReqLLM.StreamServer.set_fixture_context(server, http_context, request_json)
@spec start_http(server(), module(), ReqLLM.Model.t(), ReqLLM.Context.t(), keyword(), atom()) :: {:ok, pid(), any(), any()} | {:error, term()}
Start HTTP streaming from within the StreamServer.
This function ensures proper lifecycle coupling by having the StreamServer own and link to the HTTP streaming task. When the server exits, the task automatically terminates, preventing orphaned callbacks.
Parameters
- server - StreamServer process
- provider_mod - Provider module (e.g., ReqLLM.Providers.OpenAI)
- model - ReqLLM.Model struct
- context - ReqLLM.Context with messages to stream
- opts - Additional options for the request
- finch_name - Finch process name (default: ReqLLM.Finch)
Returns
- {:ok, task_pid, http_context, canonical_json} - Successfully started
- {:error, reason} - Failed to start
Examples
{:ok, _task_pid, _http_context, _canonical_json} =
StreamServer.start_http(
server,
ReqLLM.Providers.OpenAI,
model,
context,
opts
)
@spec start_link(keyword()) :: GenServer.on_start()
Start a StreamServer with the given options.
Options
- :provider_mod - Provider module implementing the ReqLLM.Provider behavior (required)
- :model - ReqLLM.Model struct (required)
- :fixture_path - Optional path for fixture capture
- :high_watermark - Queue size limit for backpressure (default: 500)
Examples
{:ok, server} = ReqLLM.StreamServer.start_link(
provider_mod: ReqLLM.Providers.OpenAI,
model: %ReqLLM.Model{provider: :openai, name: "gpt-4o"}
)