ReqLLM.Streaming (ReqLLM v1.0.0)
Main orchestration for ReqLLM streaming operations.
This module coordinates StreamServer, FinchClient, and StreamResponse to provide a cohesive streaming system. It serves as the entry point for all streaming operations and handles the complex coordination between components.
Architecture
The streaming system consists of three main components:
- StreamServer - GenServer managing stream state and event processing
- FinchClient - HTTP transport layer using Finch for streaming requests
- StreamResponse - User-facing API providing streams and metadata tasks
Flow
- start_stream/4 creates a StreamServer with provider configuration
- FinchClient builds the provider-specific HTTP request and starts streaming
- The HTTP task is attached to the StreamServer for monitoring and cleanup
- StreamResponse provides a lazy stream using Stream.resource/3
- A metadata task runs concurrently to collect usage and finish_reason
- A cancel function provides cleanup of all components
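The lazy-stream step above can be sketched with Stream.resource/3 pulling events from a backing process. This is an illustrative toy, not ReqLLM's internals: the Agent-based EventBuffer stands in for the real StreamServer, which is a GenServer with richer state.

```elixir
# Hypothetical sketch: a lazy token stream backed by a process,
# in the spirit of StreamResponse. EventBuffer is illustrative only.
defmodule EventBuffer do
  use Agent

  def start_link(events), do: Agent.start_link(fn -> events end)

  # Pop the next event, or :halt when exhausted
  def next(pid) do
    Agent.get_and_update(pid, fn
      [] -> {:halt, []}
      [event | rest] -> {event, rest}
    end)
  end
end

{:ok, buffer} = EventBuffer.start_link(["Hel", "lo", "!"])

stream =
  Stream.resource(
    fn -> buffer end,                 # start_fun: acquire the event source
    fn pid ->
      case EventBuffer.next(pid) do
        :halt -> {:halt, pid}         # no more events: end the stream
        token -> {[token], pid}       # emit one token, keep the state
      end
    end,
    fn pid -> Agent.stop(pid) end     # after_fun: cleanup, mirroring cancel
  )

IO.puts(Enum.join(stream))            # prints "Hello!"
```

Nothing is pulled from the buffer until the stream is enumerated, which is what lets ReqLLM start the HTTP task before the caller begins consuming tokens.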
Example
{:ok, stream_response} = ReqLLM.Streaming.start_stream(
ReqLLM.Providers.Anthropic,
%ReqLLM.Model{provider: :anthropic, name: "claude-3-sonnet"},
ReqLLM.Context.new("Hello!"),
[]
)
# Stream tokens
stream_response
|> ReqLLM.StreamResponse.tokens()
|> Stream.each(&IO.write/1)
|> Stream.run()
# Get metadata
usage = ReqLLM.StreamResponse.usage(stream_response)
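Since tokens/1 yields an enumerable of text chunks, collecting the full completion is a one-liner (assuming, as above, that the chunks are plain strings):

```elixir
# Accumulate all streamed tokens into the final completion text
text =
  stream_response
  |> ReqLLM.StreamResponse.tokens()
  |> Enum.join()
```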
Summary
Functions
Start a streaming session with coordinated StreamServer, FinchClient, and StreamResponse.
Functions
@spec start_stream(module(), ReqLLM.Model.t(), ReqLLM.Context.t(), keyword()) :: {:ok, ReqLLM.StreamResponse.t()} | {:error, term()}
Start a streaming session with coordinated StreamServer, FinchClient, and StreamResponse.
This is the main entry point for streaming operations. It orchestrates all components to provide a cohesive streaming experience with concurrent metadata collection.
Parameters
- provider_mod - Provider module (e.g., ReqLLM.Providers.Anthropic)
- model - Model configuration struct
- context - Conversation context with messages
- opts - Additional options (timeout, fixture_path, etc.)
Returns
- {:ok, stream_response} - StreamResponse with stream and metadata_task
- {:error, reason} - Failed to start streaming components
Options
- :timeout - HTTP request timeout in milliseconds (default: 30_000)
- :metadata_timeout - Metadata collection timeout in milliseconds (default: 300_000)
- :fixture_path - Path for test fixture capture (testing only)
- :finch_name - Finch pool name (default: ReqLLM.Finch)
Examples
# Basic streaming
{:ok, stream_response} = ReqLLM.Streaming.start_stream(
ReqLLM.Providers.Anthropic,
model,
context,
[]
)
# With options
{:ok, stream_response} = ReqLLM.Streaming.start_stream(
provider_mod,
model,
context,
timeout: 60_000,
fixture_path: "/tmp/test_fixture.json"
)
Error Cases
The function can fail at several points:
- StreamServer fails to start
- Provider's build_stream_request/4 fails
- HTTP streaming task fails to start
- Task attachment fails
All failures return {:error, reason} with descriptive error information.
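Callers typically match on both return shapes. A sketch of the handling pattern (the exact structure of reason is provider- and failure-dependent):

```elixir
case ReqLLM.Streaming.start_stream(provider_mod, model, context, []) do
  {:ok, stream_response} ->
    # Happy path: consume tokens as they arrive
    stream_response
    |> ReqLLM.StreamResponse.tokens()
    |> Stream.each(&IO.write/1)
    |> Stream.run()

  {:error, reason} ->
    # reason describes which component failed to start
    IO.puts(:stderr, "streaming failed: #{inspect(reason)}")
end
```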