ReqLLM.StreamServer (ReqLLM v1.0.0)

GenServer that manages streaming LLM sessions with backpressure and SSE parsing.

StreamServer acts as a bridge between HTTP streaming clients (like FinchClient) and consumers, providing:

  • SSE event parsing across HTTP chunk boundaries
  • Token queuing with configurable backpressure
  • Provider-agnostic event decoding via provider callbacks
  • Completion detection and metadata extraction
  • Clean error handling and resource cleanup

Architecture

The StreamServer receives HTTP events via synchronous GenServer.call/2, which enables natural backpressure: if the consumer queue is full, replies to HTTP events are delayed until the queue drains. This prevents fast producers from overwhelming slow consumers and causing unbounded memory growth.

Usage

# Start a streaming session
{:ok, server} = StreamServer.start_link(
  provider_mod: ReqLLM.Providers.OpenAI,
  model: %ReqLLM.Model{...}
)

# Attach HTTP task for monitoring
StreamServer.attach_http_task(server, http_task_pid)

# Consumer loop
defp consume(server) do
  case StreamServer.next(server) do
    {:ok, chunk} ->
      handle_chunk(chunk)
      consume(server)

    :halt -> handle_completion()
    {:error, reason} -> handle_error(reason)
  end
end

State Management

The server maintains state for:

  • provider_mod: Provider module for event decoding
  • model: ReqLLM.Model struct for provider context
  • provider_state: Optional provider-specific state for stateful transformations
  • sse_buffer: Binary buffer for SSE parsing across chunks
  • queue: Token chunks awaiting consumer retrieval
  • status: Current session status (:init, :streaming, :done, {:error, reason})
  • http_task: HTTP task reference for monitoring
  • consumer_refs: Set of consumer process references
  • fixture_path: Optional path for fixture capture
  • metadata: Final metadata when streaming completes
  • high_watermark: Queue size limit for backpressure (default 500)
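
A hypothetical struct mirroring these fields gives a feel for the shape of the state. Field names are taken from the list above; the defaults and the module itself are assumptions, not the actual implementation:

# Illustrative only, not the real internal struct.
defmodule StreamState do
  defstruct provider_mod: nil,
            model: nil,
            provider_state: nil,
            sse_buffer: <<>>,
            queue: :queue.new(),
            status: :init,
            http_task: nil,
            consumer_refs: MapSet.new(),
            fixture_path: nil,
            metadata: nil,
            high_watermark: 500
end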

Backpressure

When the internal queue exceeds high_watermark, the server delays replying to {:http_event, {:data, _}} messages until consumers drain the queue via next/2. This provides natural backpressure without dropping events.
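
The mechanism can be pictured with a small GenServer sketch. This is illustrative only, not ReqLLM's implementation; enqueue_chunks/2 and the pending_http_reply field are hypothetical names, and empty-queue and completion handling are omitted:

# Sketch: defer the reply to a data event while the queue is over the watermark.
def handle_call({:http_event, {:data, data}}, from, state) do
  state = enqueue_chunks(state, data)

  if :queue.len(state.queue) >= state.high_watermark do
    # No reply yet; the HTTP producer stays blocked in GenServer.call/2.
    {:noreply, %{state | pending_http_reply: from}}
  else
    {:reply, :ok, state}
  end
end

# Sketch: draining the queue releases the blocked producer.
def handle_call(:next, _from, state) do
  {{:value, chunk}, queue} = :queue.out(state.queue)
  state = %{state | queue: queue}

  state =
    if state.pending_http_reply && :queue.len(queue) < state.high_watermark do
      GenServer.reply(state.pending_http_reply, :ok)
      %{state | pending_http_reply: nil}
    else
      state
    end

  {:reply, {:ok, chunk}, state}
end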

Summary

Functions

attach_http_task(server, task_pid)
Attach an HTTP task to the server for monitoring.

await_metadata(server, timeout \\ 30000)
Block until metadata is available from the completed stream.

cancel(server)
Cancel the streaming session and clean up resources.

child_spec(init_arg)
Returns a specification to start this module under a supervisor.

http_event(server, event)
Forward an HTTP event to the server for processing.

next(server, timeout \\ 30000)
Get the next chunk from the stream with optional timeout.

set_fixture_context(server, http_context, canonical_json)
Set HTTP context and canonical JSON for fixture capture.

start_http(server, provider_mod, model, context, opts, finch_name \\ ReqLLM.Finch)
Start HTTP streaming from within the StreamServer.

start_link(opts)
Start a StreamServer with the given options.

Types

server()

@type server() :: GenServer.server()

status()

@type status() :: :init | :streaming | :done | {:error, any()}

Functions

attach_http_task(server, task_pid)

@spec attach_http_task(server(), pid()) :: :ok

Attach an HTTP task to the server for monitoring.

The server will monitor the task and handle cleanup if it crashes.

Parameters

  • server - StreamServer process
  • task_pid - HTTP task process ID

Examples

task = Task.async(fn -> Finch.stream(...) end)
ReqLLM.StreamServer.attach_http_task(server, task.pid)

await_metadata(server, timeout \\ 30000)

@spec await_metadata(server(), non_neg_integer()) :: {:ok, map()} | {:error, any()}

Block until metadata is available from the completed stream.

Parameters

  • server - StreamServer process
  • timeout - Maximum time to wait in milliseconds (default: 30_000)

Returns

  • {:ok, metadata} - Final stream metadata
  • {:error, reason} - Error occurred or timeout

Examples

case ReqLLM.StreamServer.await_metadata(server, 10_000) do
  {:ok, metadata} ->
    IO.puts("Tokens used: " <> inspect(metadata[:usage][:total_tokens]))
  {:error, :timeout} ->
    IO.puts("Metadata not available yet")
end

cancel(server)

@spec cancel(server()) :: :ok

Cancel the streaming session and clean up resources.

Stops the HTTP task if running and terminates the server.

Parameters

  • server - StreamServer process

Examples

ReqLLM.StreamServer.cancel(server)

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

http_event(server, event)

@spec http_event(server(), term()) :: :ok

Forward an HTTP event to the server for processing.

This is the primary interface for HTTP clients to deliver streaming events. Provides backpressure through synchronous GenServer.call.

Parameters

  • server - StreamServer process
  • event - HTTP event tuple: {:status, integer()}, {:headers, list()},
    {:data, binary()}, :done, or {:error, term()}

Examples

ReqLLM.StreamServer.http_event(server, {:status, 200})
ReqLLM.StreamServer.http_event(server, {:headers, [{"content-type", "text/event-stream"}]})
ReqLLM.StreamServer.http_event(server, {:data, "data: {...}\n\n"})
ReqLLM.StreamServer.http_event(server, :done)
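
For context, an HTTP client built on Finch.stream/5 could forward events as they arrive. This is a sketch rather than FinchClient's actual code; request details (url, headers, body) and the server variable are assumed to come from earlier setup:

request = Finch.build(:post, url, headers, body)

Finch.stream(request, ReqLLM.Finch, nil, fn event, acc ->
  # event is {:status, integer()}, {:headers, list()}, or {:data, binary()}
  :ok = ReqLLM.StreamServer.http_event(server, event)
  acc
end)

# Signal end-of-stream once the HTTP request returns.
ReqLLM.StreamServer.http_event(server, :done)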

next(server, timeout \\ 30000)

@spec next(server(), non_neg_integer()) ::
  {:ok, ReqLLM.StreamChunk.t()} | :halt | {:error, any()}

Get the next chunk from the stream with optional timeout.

Blocks until a chunk is available or the stream completes/errors. Returns :halt when the stream is complete.

Parameters

  • server - StreamServer process
  • timeout - Maximum time to wait in milliseconds (default: 30_000)

Returns

  • {:ok, chunk} - Next StreamChunk
  • :halt - Stream is complete
  • {:error, reason} - Error occurred

Examples

defp consume(server) do
  case ReqLLM.StreamServer.next(server) do
    {:ok, %ReqLLM.StreamChunk{type: :content, text: text}} ->
      IO.write(text)
      consume(server)

    :halt ->
      :ok

    {:error, reason} ->
      Logger.error("Stream error: " <> inspect(reason))
  end
end
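
To consume the server as a lazy enumerable instead of an explicit loop, one option is Stream.resource/3, which also cancels the session when the consumer stops. This is a sketch; the chunk fields are assumed from the example above:

chunks =
  Stream.resource(
    fn -> server end,
    fn server ->
      case ReqLLM.StreamServer.next(server) do
        {:ok, chunk} -> {[chunk], server}
        :halt -> {:halt, server}
        {:error, reason} -> raise "stream error: #{inspect(reason)}"
      end
    end,
    fn server -> ReqLLM.StreamServer.cancel(server) end
  )

chunks
|> Stream.filter(&(&1.type == :content))
|> Stream.map(& &1.text)
|> Enum.each(&IO.write/1)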

set_fixture_context(server, http_context, canonical_json)

@spec set_fixture_context(server(), ReqLLM.Streaming.Fixtures.HTTPContext.t(), any()) ::
  :ok

Set HTTP context and canonical JSON for fixture capture.

This is called by the streaming pipeline to provide the HTTP metadata and request data needed for fixture capture.

Parameters

  • server - StreamServer process
  • http_context - HTTPContext struct with request/response metadata
  • canonical_json - The request body as JSON for fixture saving

Examples

ReqLLM.StreamServer.set_fixture_context(server, http_context, request_json)

start_http(server, provider_mod, model, context, opts, finch_name \\ ReqLLM.Finch)

@spec start_http(
  server(),
  module(),
  ReqLLM.Model.t(),
  ReqLLM.Context.t(),
  keyword(),
  atom()
) ::
  {:ok, pid(), any(), any()} | {:error, term()}

Start HTTP streaming from within the StreamServer.

This function ensures proper lifecycle coupling by having the StreamServer own and link to the HTTP streaming task. When the server exits, the task automatically terminates, preventing orphaned callbacks.

Parameters

  • server - StreamServer process
  • provider_mod - Provider module (e.g., ReqLLM.Providers.OpenAI)
  • model - ReqLLM.Model struct
  • context - ReqLLM.Context with messages to stream
  • opts - Additional options for the request
  • finch_name - Finch process name (default: ReqLLM.Finch)

Returns

  • {:ok, task_pid, http_context, canonical_json} - Successfully started
  • {:error, reason} - Failed to start

Examples

{:ok, _task_pid, _http_context, _canonical_json} =
  StreamServer.start_http(
    server,
    ReqLLM.Providers.OpenAI,
    model,
    context,
    opts
  )

start_link(opts)

@spec start_link(keyword()) :: GenServer.on_start()

Start a StreamServer with the given options.

Options

  • :provider_mod - Provider module implementing ReqLLM.Provider behavior (required)
  • :model - ReqLLM.Model struct (required)
  • :fixture_path - Optional path for fixture capture
  • :high_watermark - Queue size limit for backpressure (default: 500)

Examples

{:ok, server} = ReqLLM.StreamServer.start_link(
  provider_mod: ReqLLM.Providers.OpenAI,
  model: %ReqLLM.Model{provider: :openai, name: "gpt-4o"}
)
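
Putting the pieces together, a full session might look like the following sketch. The model and context values are assumed to be built beforehand, and consume/1 is the recursive loop shown in the next/2 example:

{:ok, server} =
  ReqLLM.StreamServer.start_link(
    provider_mod: ReqLLM.Providers.OpenAI,
    model: model
  )

{:ok, _task_pid, _http_context, _canonical_json} =
  ReqLLM.StreamServer.start_http(server, ReqLLM.Providers.OpenAI, model, context, [])

:ok = consume(server)
{:ok, metadata} = ReqLLM.StreamServer.await_metadata(server)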