Streaming Migration Guide

This guide helps you migrate from the deprecated stream_text!/3 pattern to the new StreamResponse API introduced in ReqLLM's Finch-based streaming refactor.

Overview

ReqLLM's streaming implementation has been completely redesigned to use Finch directly instead of Req, providing:

  • HTTP/2 multiplexing for concurrent streams (see the sketch after this list)
  • Asynchronous metadata collection (usage, finish_reason)
  • Production-grade connection pooling
  • Better error handling and resource cleanup
  • Unified API across all providers
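
For example, HTTP/2 multiplexing lets several streams run concurrently over a single connection. A minimal sketch that streams completions for a batch of prompts, assuming model is bound to a model spec as in the examples below:

prompts = ["Summarize doc A", "Summarize doc B", "Summarize doc C"]

results =
  prompts
  |> Task.async_stream(fn prompt ->
    {:ok, response} = ReqLLM.stream_text(model, prompt)
    ReqLLM.StreamResponse.text(response)
  end, timeout: 60_000)
  |> Enum.map(fn {:ok, text} -> text end)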

Migration Patterns

Basic Streaming

Before (deprecated):

ReqLLM.stream_text!(model, "Tell me a story")
|> Stream.each(&IO.write/1)
|> Stream.run()

After (recommended):

{:ok, response} = ReqLLM.stream_text(model, "Tell me a story")
response
|> ReqLLM.StreamResponse.tokens()
|> Stream.each(&IO.write/1)
|> Stream.run()

Error Handling

Before (raised exceptions):

try do
  ReqLLM.stream_text!(model, messages)
  |> Stream.each(&IO.write/1)
  |> Stream.run()
rescue
  error -> handle_error(error)
end

After (proper error tuples):

case ReqLLM.stream_text(model, messages) do
  {:ok, response} ->
    response
    |> ReqLLM.StreamResponse.tokens()
    |> Stream.each(&IO.write/1)
    |> Stream.run()

  {:error, reason} ->
    handle_error(reason)
end
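
If the stream is one step in a larger pipeline, a with expression keeps the happy path flat; a sketch using only the calls shown above:

with {:ok, response} <- ReqLLM.stream_text(model, messages) do
  response
  |> ReqLLM.StreamResponse.tokens()
  |> Stream.each(&IO.write/1)
  |> Stream.run()
else
  {:error, reason} -> handle_error(reason)
end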

Getting Usage Metadata

Before (not available during streaming):

# Usage was only available after completion via a separate call
ReqLLM.stream_text!(model, messages) |> Stream.run()
# No way to get usage metadata for the stream

After (concurrent metadata collection):

{:ok, response} = ReqLLM.stream_text(model, messages)

# Stream tokens in real-time
Task.start(fn ->
  response
  |> ReqLLM.StreamResponse.tokens()
  |> Stream.each(&IO.write/1)
  |> Stream.run()
end)

# Collect metadata concurrently
usage = ReqLLM.StreamResponse.usage(response)
finish_reason = ReqLLM.StreamResponse.finish_reason(response)

IO.puts("\\nUsage: #{inspect(usage)}")
IO.puts("Finish reason: #{finish_reason}")

Simplified Text Collection

Before (manual accumulation):

text =
  ReqLLM.stream_text!(model, messages)
  |> Enum.join("")

After (built-in helper):

{:ok, response} = ReqLLM.stream_text(model, messages)
text = ReqLLM.StreamResponse.text(response)

LiveView Integration

Before:

def handle_info({:stream_text, model, messages}, socket) do
  # No good way to handle this with the old API
  {:noreply, socket}
end

After:

def handle_info({:stream_text, model, messages}, socket) do
  case ReqLLM.stream_text(model, messages) do
    {:ok, response} ->
      # Capture the LiveView pid; self() inside a task is the task's pid
      lv_pid = self()

      # Stream tokens to the client
      Task.start(fn ->
        response
        |> ReqLLM.StreamResponse.tokens()
        |> Stream.each(&send(lv_pid, {:token, &1}))
        |> Stream.run()
      end)

      # Handle metadata when available
      Task.start(fn ->
        usage = ReqLLM.StreamResponse.usage(response)
        send(lv_pid, {:usage, usage})
      end)

      {:noreply, socket}

    {:error, reason} ->
      {:noreply, put_flash(socket, :error, "Stream failed: #{inspect(reason)}")}
  end
end

def handle_info({:token, token}, socket) do
  {:noreply, push_event(socket, "token", %{text: token})}
end

def handle_info({:usage, usage}, socket) do
  {:noreply, push_event(socket, "usage", usage)}
end
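
If you render the streamed text server-side instead of pushing events, the token handler can accumulate into an assign; a minimal sketch, assuming a :streamed_text assign initialized to "" in mount/3:

def handle_info({:token, token}, socket) do
  {:noreply, update(socket, :streamed_text, &(&1 <> token))}
end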

Backward Compatibility

If you need to migrate gradually, you can convert a StreamResponse to the legacy Response format:

{:ok, stream_response} = ReqLLM.stream_text(model, messages)
{:ok, legacy_response} = ReqLLM.StreamResponse.to_response(stream_response)

# Now compatible with existing Response-based code
text = ReqLLM.Response.text(legacy_response)
usage = ReqLLM.Response.usage(legacy_response)

Note: This conversion negates the streaming benefits since it materializes the entire stream.

New Features

Cancellation Support

{:ok, response} = ReqLLM.stream_text(model, "Very long story...")

# Start streaming
task = Task.async(fn ->
  response
  |> ReqLLM.StreamResponse.tokens()
  |> Stream.take(10)  # Only take first 10 tokens
  |> Enum.to_list()
end)

tokens = Task.await(task)

# Cancel remaining work to free resources
response.cancel.()

Resource Management

The new system automatically manages resources:

{:ok, response} = ReqLLM.stream_text(model, messages)

# Resources are cleaned up automatically when stream completes
response
|> ReqLLM.StreamResponse.tokens()
|> Stream.run()

# Or manually if needed
response.cancel.()
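
If the consuming code can raise, you can make the cleanup unconditional with try/after; a sketch, assuming cancel/0 is safe to call after normal completion:

{:ok, response} = ReqLLM.stream_text(model, messages)

try do
  response
  |> ReqLLM.StreamResponse.tokens()
  |> Stream.each(&IO.write/1)
  |> Stream.run()
after
  response.cancel.()
end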

Connection Pool Configuration

The new Finch-based system allows connection pool configuration:

# In config.exs
config :req_llm,
  finch: [
    name: ReqLLM.Finch,
    pools: %{
      :default => [protocols: [:http2, :http1], size: 1, count: 16]
    }
  ]

# Use custom Finch instance per request
{:ok, response} = ReqLLM.stream_text(model, messages, finch_name: MyApp.Finch)
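
For the per-request finch_name option to work, the named Finch instance must already be running. A sketch of starting one under your application's supervision tree (MyApp.Finch and the pool settings are illustrative):

# In MyApp.Application.start/2
children = [
  {Finch,
   name: MyApp.Finch,
   pools: %{default: [protocols: [:http2], size: 1, count: 8]}}
  # ...other children
]

Supervisor.start_link(children, strategy: :one_for_one)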

Common Migration Issues

Issue: Stream chunks have different structure

Problem: The old API returned chunks with a .text field; the new API yields raw text tokens.

Solution: Use ReqLLM.StreamResponse.tokens() to get a text-only stream:

# Old: chunk.text
# New: direct text tokens
response
|> ReqLLM.StreamResponse.tokens()
|> Stream.each(&IO.write/1)  # &1 is already text

Issue: No access to raw StreamChunk structs

Problem: Sometimes you need the full chunk structure, not just text.

Solution: Access the raw stream directly:

{:ok, response} = ReqLLM.stream_text(model, messages)

# Raw StreamChunk structs
response.stream
|> Stream.each(fn chunk ->
  case chunk.type do
    :content -> IO.write(chunk.text)
    :tool_call -> handle_tool_call(chunk)
    _ -> :ignore
  end
end)
|> Stream.run()
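
handle_tool_call/1 above is application code, not part of ReqLLM. A hypothetical sketch, assuming tool-call chunks expose name and arguments fields (check the StreamChunk struct for the actual shape):

# Hypothetical handler; the field names here are assumptions
defp handle_tool_call(chunk) do
  IO.puts("Tool requested: #{chunk.name}(#{inspect(chunk.arguments)})")
end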

Issue: Concurrent access to usage metadata

Problem: Several parts of your code need the same usage metadata.

Solution: Await the shared metadata task once from the owner process, then fan the result out (a Task reply can only be received by the process that started the task, so concurrent Task.await calls from other processes would fail):

{:ok, response} = ReqLLM.stream_text(model, messages)

# Await the metadata task once, from the process that owns it
metadata = Task.await(response.metadata_task)

# Fan the result out to any number of consumers
Task.start(fn -> log_usage(metadata.usage) end)
Task.start(fn -> update_billing(metadata.usage) end)

Testing Migration

Update your tests to expect the new return format:

Before:

test "streams text" do
  chunks = ReqLLM.stream_text!("anthropic:claude-3-sonnet", "Hello") |> Enum.take(5)
  assert Enum.all?(chunks, &is_binary/1)
end

After:

test "streams text" do
  {:ok, response} = ReqLLM.stream_text("anthropic:claude-3-sonnet", "Hello")
  tokens = response |> ReqLLM.StreamResponse.tokens() |> Enum.take(5)
  assert Enum.all?(tokens, &is_binary/1)
end
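
It can also be worth pinning down the error path; a hedged sketch, assuming an unknown model spec yields an error tuple rather than raising:

test "returns an error tuple for an unknown model" do
  assert {:error, _reason} = ReqLLM.stream_text("unknown:bad-model", "Hello")
end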

Performance Considerations

The new streaming system provides significant performance improvements:

  1. HTTP/2 Multiplexing: Multiple concurrent streams over single connection
  2. Reduced Memory Usage: Lazy stream evaluation prevents buffering
  3. Concurrent Processing: Metadata collection doesn't block token streaming
  4. Connection Reuse: Finch pools reduce connection overhead

For high-throughput applications, consider tuning the connection pool:

config :req_llm,
  finch: [
    name: ReqLLM.Finch,
    pools: %{
      :default => [protocols: [:http2], size: 1, count: 32]
    }
  ]

Next Steps

  1. Update your code to use stream_text/3 instead of stream_text!/3
  2. Replace manual error handling with proper {:ok, response} pattern matching
  3. Use StreamResponse helper functions for common operations
  4. Configure connection pools for your deployment scale
  5. Test the migration thoroughly with your specific use cases

The deprecated stream_text!/3 function will be removed in a future major version. Please migrate at your earliest convenience.