ReqLLM.Streaming (ReqLLM v1.0.0)


Main orchestration for ReqLLM streaming operations.

This module coordinates StreamServer, FinchClient, and StreamResponse into a cohesive streaming system. It is the entry point for all streaming operations and manages the handoffs between these components.

Architecture

The streaming system consists of three main components:

  • StreamServer - GenServer managing stream state and event processing
  • FinchClient - HTTP transport layer using Finch for streaming requests
  • StreamResponse - User-facing API providing streams and metadata tasks

Flow

  1. start_stream/4 creates StreamServer with provider configuration
  2. FinchClient builds provider-specific HTTP request and starts streaming
  3. HTTP task is attached to StreamServer for monitoring and cleanup
  4. StreamResponse provides lazy stream using Stream.resource/3
  5. Metadata task runs concurrently to collect usage and finish_reason
  6. Cancel function provides cleanup of all components
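Step 4's lazy stream follows the standard Stream.resource/3 pattern: a start function acquires the resource, a next function pulls one event per demand, and an after function performs cleanup even if the consumer halts early. A minimal, self-contained sketch of that pattern (the TokenServer module and its :next message are invented for illustration, not ReqLLM's actual internals):

```elixir
# Illustrative only: a lazy stream backed by a GenServer, in the style of
# StreamResponse. TokenServer and :next are names made up for this sketch.
defmodule TokenServer do
  use GenServer

  def start_link(tokens), do: GenServer.start_link(__MODULE__, tokens)

  @impl true
  def init(tokens), do: {:ok, tokens}

  @impl true
  def handle_call(:next, _from, []), do: {:reply, :halt, []}
  def handle_call(:next, _from, [tok | rest]), do: {:reply, {:token, tok}, rest}
end

stream =
  Stream.resource(
    # start_fun: acquire the resource (a server pre-loaded with tokens)
    fn ->
      {:ok, pid} = TokenServer.start_link(["Hel", "lo", "!"])
      pid
    end,
    # next_fun: pull one event per step; :halt ends the stream
    fn pid ->
      case GenServer.call(pid, :next) do
        {:token, tok} -> {[tok], pid}
        :halt -> {:halt, pid}
      end
    end,
    # after_fun: cleanup runs even when the consumer stops early
    fn pid -> GenServer.stop(pid) end
  )

Enum.join(stream)
```

The after function is what makes step 6's cancellation reliable: it fires whether the stream completes, halts early, or the consuming process raises.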

Example

{:ok, stream_response} = ReqLLM.Streaming.start_stream(
  ReqLLM.Providers.Anthropic,
  %ReqLLM.Model{provider: :anthropic, name: "claude-3-sonnet"}, 
  ReqLLM.Context.new("Hello!"),
  []
)

# Stream tokens
stream_response
|> ReqLLM.StreamResponse.tokens()
|> Stream.each(&IO.write/1)
|> Stream.run()

# Get metadata
usage = ReqLLM.StreamResponse.usage(stream_response)
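Because the stream is lazy, a consumer can also stop early and pull only what it needs; a sketch, assuming tokens/1 yields token strings as in the example above:

```elixir
# Take only the first 20 tokens; the lazy stream requests no more than that,
# and the stream's cleanup (flow step 6) releases the underlying components.
first_tokens =
  stream_response
  |> ReqLLM.StreamResponse.tokens()
  |> Enum.take(20)
```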


Functions

start_stream(provider_mod, model, context, opts \\ [])

@spec start_stream(module(), ReqLLM.Model.t(), ReqLLM.Context.t(), keyword()) ::
  {:ok, ReqLLM.StreamResponse.t()} | {:error, term()}

Start a streaming session with coordinated StreamServer, FinchClient, and StreamResponse.

This is the main entry point for streaming operations. It orchestrates all components to provide a cohesive streaming experience with concurrent metadata collection.

Parameters

  • provider_mod - Provider module (e.g., ReqLLM.Providers.Anthropic)
  • model - Model configuration struct
  • context - Conversation context with messages
  • opts - Additional options (timeout, fixture_path, etc.)

Returns

  • {:ok, stream_response} - StreamResponse with stream and metadata_task
  • {:error, reason} - Failed to start streaming components

Options

  • :timeout - HTTP request timeout in milliseconds (default: 30_000)
  • :metadata_timeout - Metadata collection timeout in milliseconds (default: 300_000)
  • :fixture_path - Path for test fixture capture (testing only)
  • :finch_name - Finch pool name (default: ReqLLM.Finch)

Examples

# Basic streaming
{:ok, stream_response} = ReqLLM.Streaming.start_stream(
  ReqLLM.Providers.Anthropic,
  model,
  context,
  []
)

# With options
{:ok, stream_response} = ReqLLM.Streaming.start_stream(
  provider_mod,
  model, 
  context,
  timeout: 60_000,
  fixture_path: "/tmp/test_fixture.json"
)

Error Cases

The function can fail at several points:

  • StreamServer fails to start
  • Provider's build_stream_request/4 fails
  • HTTP streaming task fails to start
  • Task attachment fails

All failures return {:error, reason} with descriptive error information.
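Since every failure mode surfaces uniformly as {:error, reason}, callers can branch once at the call site. A sketch (the exact shape of reason depends on which component failed):

```elixir
case ReqLLM.Streaming.start_stream(provider_mod, model, context, timeout: 60_000) do
  {:ok, stream_response} ->
    stream_response
    |> ReqLLM.StreamResponse.tokens()
    |> Stream.each(&IO.write/1)
    |> Stream.run()

  {:error, reason} ->
    # reason describes which stage failed: server start, request build,
    # HTTP task start, or task attachment
    IO.warn("streaming failed to start: #{inspect(reason)}")
end
```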