Streaming Migration Guide
This guide helps you migrate from the deprecated stream_text!/3 pattern to the new StreamResponse API introduced in ReqLLM's Finch-based streaming refactor.
Overview
ReqLLM's streaming implementation has been redesigned to use Finch directly instead of Req, providing:
- HTTP/2 multiplexing for concurrent streams
- Asynchronous metadata collection (usage, finish_reason)
- Production-grade connection pooling
- Better error handling and resource cleanup
- Unified API across all providers
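For instance, HTTP/2 multiplexing plus connection pooling lets several streams run concurrently from a single node. A minimal sketch, assuming the stream_text/3 and StreamResponse.text/1 functions described later in this guide (the model string and concurrency settings are illustrative only):
prompts = ["Tell me about Elixir", "Tell me about Erlang", "Tell me about OTP"]

summaries =
  prompts
  |> Task.async_stream(
    fn prompt ->
      # Each call shares the pooled HTTP/2 connections managed by Finch
      {:ok, response} = ReqLLM.stream_text("anthropic:claude-3-sonnet", prompt)
      ReqLLM.StreamResponse.text(response)
    end,
    max_concurrency: 3,
    timeout: 60_000
  )
  |> Enum.map(fn {:ok, text} -> text end)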
Migration Patterns
Basic Streaming
Before (deprecated):
ReqLLM.stream_text!(model, "Tell me a story")
|> Stream.each(&IO.write/1)
|> Stream.run()
After (recommended):
{:ok, response} = ReqLLM.stream_text(model, "Tell me a story")
response
|> ReqLLM.StreamResponse.tokens()
|> Stream.each(&IO.write/1)
|> Stream.run()
Error Handling
Before (raised exceptions):
try do
  ReqLLM.stream_text!(model, messages)
  |> Stream.each(&IO.write/1)
  |> Stream.run()
rescue
  error -> handle_error(error)
end
After (proper error tuples):
case ReqLLM.stream_text(model, messages) do
  {:ok, response} ->
    response
    |> ReqLLM.StreamResponse.tokens()
    |> Stream.each(&IO.write/1)
    |> Stream.run()

  {:error, reason} ->
    handle_error(reason)
end
Getting Usage Metadata
Before (not available during streaming):
# Usage was only available after completion via separate call
ReqLLM.stream_text!(model, messages) |> Stream.run()
# No way to get usage metadata for the stream
After (concurrent metadata collection):
{:ok, response} = ReqLLM.stream_text(model, messages)
# Stream tokens in real-time
Task.start(fn ->
  response
  |> ReqLLM.StreamResponse.tokens()
  |> Stream.each(&IO.write/1)
  |> Stream.run()
end)
# Collect metadata concurrently
usage = ReqLLM.StreamResponse.usage(response)
finish_reason = ReqLLM.StreamResponse.finish_reason(response)
IO.puts("\\nUsage: #{inspect(usage)}")
IO.puts("Finish reason: #{finish_reason}")Simplified Text Collection
Before (manual accumulation):
text =
  ReqLLM.stream_text!(model, messages)
  |> Enum.join("")
After (built-in helper):
{:ok, response} = ReqLLM.stream_text(model, messages)
text = ReqLLM.StreamResponse.text(response)
LiveView Integration
Before:
def handle_info({:stream_text, model, messages}, socket) do
  # No good way to handle this with the old API
  {:noreply, socket}
end
After:
def handle_info({:stream_text, model, messages}, socket) do
  case ReqLLM.stream_text(model, messages) do
    {:ok, response} ->
      # Capture the LiveView pid; inside Task.start/1, self() refers to the task
      lv_pid = self()

      # Stream tokens to the client
      Task.start(fn ->
        response
        |> ReqLLM.StreamResponse.tokens()
        |> Stream.each(&send(lv_pid, {:token, &1}))
        |> Stream.run()
      end)

      # Handle metadata when available
      Task.start(fn ->
        usage = ReqLLM.StreamResponse.usage(response)
        send(lv_pid, {:usage, usage})
      end)

      {:noreply, socket}

    {:error, reason} ->
      {:noreply, put_flash(socket, :error, "Stream failed: #{inspect(reason)}")}
  end
end

def handle_info({:token, token}, socket) do
  {:noreply, push_event(socket, "token", %{text: token})}
end

def handle_info({:usage, usage}, socket) do
  {:noreply, push_event(socket, "usage", usage)}
end
Backward Compatibility
If you need to migrate gradually, you can convert StreamResponse to the legacy Response format:
{:ok, stream_response} = ReqLLM.stream_text(model, messages)
{:ok, legacy_response} = ReqLLM.StreamResponse.to_response(stream_response)
# Now compatible with existing Response-based code
text = ReqLLM.Response.text(legacy_response)
usage = ReqLLM.Response.usage(legacy_response)
Note: This conversion negates the streaming benefits since it materializes the entire stream.
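If many call sites still expect the legacy shape, one option during a gradual migration is to hide the conversion behind a small wrapper. A minimal sketch; MyApp.LLMCompat and text_as_response/3 are hypothetical names, not part of ReqLLM:
defmodule MyApp.LLMCompat do
  # Hypothetical wrapper: runs the new streaming call, then converts the
  # result to the legacy Response shape for callers that are not yet migrated.
  def text_as_response(model, messages, opts \\ []) do
    with {:ok, stream_response} <- ReqLLM.stream_text(model, messages, opts),
         {:ok, legacy_response} <- ReqLLM.StreamResponse.to_response(stream_response) do
      {:ok, legacy_response}
    end
  end
end
Migrated call sites can then switch to StreamResponse directly, and the wrapper can be deleted once nothing depends on the legacy format.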
New Features
Cancellation Support
{:ok, response} = ReqLLM.stream_text(model, "Very long story...")
# Start streaming
task = Task.async(fn ->
  response
  |> ReqLLM.StreamResponse.tokens()
  |> Stream.take(10) # Only take first 10 tokens
  |> Enum.to_list()
end)
tokens = Task.await(task)
# Cancel remaining work to free resources
response.cancel.()
Resource Management
The new system automatically manages resources:
{:ok, response} = ReqLLM.stream_text(model, messages)
# Resources are cleaned up automatically when stream completes
response
|> ReqLLM.StreamResponse.tokens()
|> Stream.run()
# Or manually if needed
response.cancel.()
Connection Pool Configuration
The new Finch-based system allows connection pool configuration:
# In config.exs
config :req_llm,
  finch: [
    name: ReqLLM.Finch,
    pools: %{
      :default => [protocols: [:http2, :http1], size: 1, count: 16]
    }
  ]
# Use custom Finch instance per request
{:ok, response} = ReqLLM.stream_text(model, messages, finch_name: MyApp.Finch)
Common Migration Issues
Issue: Stream chunks have different structure
Problem: The old API returned chunks with a .text field; the new API returns raw text tokens.
Solution: Use ReqLLM.StreamResponse.tokens() to get a text-only stream:
# Old: chunk.text
# New: direct text tokens
response
|> ReqLLM.StreamResponse.tokens()
|> Stream.each(&IO.write/1) # &1 is already textIssue: No access to raw StreamChunk structs
Problem: Sometimes you need the full chunk structure, not just text.
Solution: Access the raw stream directly:
{:ok, response} = ReqLLM.stream_text(model, messages)
# Raw StreamChunk structs
response.stream
|> Stream.each(fn chunk ->
  case chunk.type do
    :content -> IO.write(chunk.text)
    :tool_call -> handle_tool_call(chunk)
    _ -> :ignore
  end
end)
|> Stream.run()
Issue: Concurrent access to usage metadata
Problem: Multiple parts of your code need usage metadata without blocking each other.
Solution: Share the metadata task:
{:ok, response} = ReqLLM.stream_text(model, messages)
# Multiple consumers can await the same task
usage_task = response.metadata_task
Task.start(fn ->
  usage = Task.await(usage_task)
  log_usage(usage)
end)

Task.start(fn ->
  metadata = Task.await(usage_task)
  update_billing(metadata.usage)
end)
Testing Migration
Update your tests to expect the new return format:
Before:
test "streams text" do
chunks = ReqLLM.stream_text!("anthropic:claude-3-sonnet", "Hello") |> Enum.take(5)
assert Enum.all?(chunks, &is_binary/1)
endAfter:
test "streams text" do
{:ok, response} = ReqLLM.stream_text("anthropic:claude-3-sonnet", "Hello")
tokens = response |> ReqLLM.StreamResponse.tokens() |> Enum.take(5)
assert Enum.all?(tokens, &is_binary/1)
endPerformance Considerations
The new streaming system provides significant performance improvements:
- HTTP/2 Multiplexing: Multiple concurrent streams over single connection
- Reduced Memory Usage: Lazy stream evaluation prevents buffering
- Concurrent Processing: Metadata collection doesn't block token streaming
- Connection Reuse: Finch pools reduce connection overhead
For high-throughput applications, consider tuning the connection pool:
config :req_llm,
  finch: [
    name: ReqLLM.Finch,
    pools: %{
      :default => [protocols: [:http2], size: 1, count: 32]
    }
  ]
Next Steps
- Update your code to use stream_text/3 instead of stream_text!/3
- Replace manual error handling with proper {:ok, response} pattern matching
- Use StreamResponse helper functions for common operations
- Configure connection pools for your deployment scale
- Test the migration thoroughly with your specific use cases
The deprecated stream_text!/3 function will be removed in a future major version. Please migrate at your earliest convenience.