ReqLLM.StreamResponse (ReqLLM v1.3.0)

A streaming response container that provides both real-time streaming and asynchronous access to metadata.

StreamResponse is the new return type for streaming operations in ReqLLM, designed to provide efficient access to streaming data while maintaining backward compatibility with the legacy Response format.

Structure

  • stream - Lazy enumerable of ReqLLM.StreamChunk structs for real-time consumption
  • metadata_handle - Concurrent handle for metadata collection (usage, finish_reason)
  • cancel - Function to terminate streaming and cleanup resources
  • model - Model specification that generated this response
  • context - Conversation context for multi-turn workflows

Usage Patterns

Real-time streaming

{:ok, stream_response} = ReqLLM.stream_text("anthropic:claude-3-sonnet", "Tell a story")

stream_response
|> ReqLLM.StreamResponse.tokens()
|> Stream.each(&IO.write/1)
|> Stream.run()

Collecting complete text

{:ok, stream_response} = ReqLLM.stream_text("anthropic:claude-3-sonnet", "Hello!")

text = ReqLLM.StreamResponse.text(stream_response)
usage = ReqLLM.StreamResponse.usage(stream_response)

Backward compatibility

{:ok, stream_response} = ReqLLM.stream_text("anthropic:claude-3-sonnet", "Hello!")
{:ok, legacy_response} = ReqLLM.StreamResponse.to_response(stream_response)

# Now works with existing Response-based code
text = ReqLLM.Response.text(legacy_response)

Early cancellation

{:ok, stream_response} = ReqLLM.stream_text("anthropic:claude-3-sonnet", "Long story...")

stream_response
|> ReqLLM.StreamResponse.tokens()
|> Stream.take(5)  # Take only the first 5 text tokens
|> Stream.each(&IO.write/1)
|> Stream.run()

# Cancel remaining work
stream_response.cancel.()

Design Philosophy

This struct separates concerns between streaming data (available immediately) and metadata (available after completion). This allows for:

  • Zero-latency streaming of content
  • Concurrent metadata processing
  • Resource cleanup via cancellation
  • Seamless backward compatibility
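
A concrete illustration of this split (reusing the model string from the examples above): stream the tokens as they arrive, then await the usage data that was collected concurrently.

{:ok, stream_response} = ReqLLM.stream_text("anthropic:claude-3-sonnet", "Hello!")

# Content streams with no added latency
stream_response
|> ReqLLM.StreamResponse.tokens()
|> Stream.each(&IO.write/1)
|> Stream.run()

# Metadata was collected concurrently; this simply awaits the metadata task
usage = ReqLLM.StreamResponse.usage(stream_response)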

Summary

Types

classify_result()

Result of classifying a streaming response.

t()

A streaming response with concurrent metadata processing.

Functions

classify(stream_response)

Classify a streaming response for tool-calling workflows.

extract_tool_calls(stream_response)

Collect all tool calls from the stream into a list.

finish_reason(stream_response)

Await the metadata task and return the finish reason.

process_stream(stream_response, opts \\ [])

Process a stream with real-time callbacks for content and thinking.

text(stream_response)

Collect all text tokens into a single binary string.

to_response(stream_response)

Convert a StreamResponse to a legacy Response struct for backward compatibility.

tokens(stream_response)

Extract text tokens from the stream, filtering out metadata chunks.

tool_calls(stream_response)

Extract tool call chunks from the stream.

usage(stream_response)

Await the metadata task and return usage statistics.

Types

classify_result()

@type classify_result() :: %{
  type: :tool_calls | :final_answer,
  text: String.t(),
  thinking: String.t(),
  tool_calls: [map()],
  finish_reason: atom() | nil
}

Result of classifying a streaming response.

  • type - :tool_calls if the model requested tool execution, :final_answer otherwise
  • text - Accumulated text content from the response
  • thinking - Accumulated thinking/reasoning content (if any)
  • tool_calls - List of complete tool calls with parsed arguments
  • finish_reason - The normalized finish reason (:stop, :tool_calls, :length, etc.)

t()

@type t() :: %ReqLLM.StreamResponse{
  cancel: (-> :ok),
  context: ReqLLM.Context.t(),
  metadata_handle: ReqLLM.StreamResponse.MetadataHandle.t(),
  model: LLMDB.Model.t(),
  stream: Enumerable.t()
}

A streaming response with concurrent metadata processing.

Contains a stream of chunks, a handle for metadata collection, cancellation function, and contextual information for multi-turn conversations.

Functions

classify(stream_response)

@spec classify(t()) :: classify_result()

Classify a streaming response for tool-calling workflows.

Consumes the stream and returns a structured result indicating whether the model wants to call tools or has provided a final answer. This is the recommended API for ReAct agents and function-calling applications.

Parameters

  • stream_response - The StreamResponse struct

Returns

A map with:

  • type - :tool_calls if the model requested tool execution, :final_answer otherwise
  • text - Accumulated text content from the response
  • thinking - Accumulated thinking/reasoning content
  • tool_calls - List of complete tool calls with parsed arguments
  • finish_reason - The normalized finish reason

Examples

{:ok, stream_response} = ReqLLM.stream_text(model, messages, tools: tools)

case ReqLLM.StreamResponse.classify(stream_response) do
  %{type: :tool_calls, tool_calls: calls} ->
    # Execute tools and continue conversation
    results = Enum.map(calls, &execute_tool/1)
    # Build tool result messages and continue...

  %{type: :final_answer, text: answer} ->
    # Done - return answer to user
    {:ok, answer}
end

Classification Logic

The classification uses multiple signals:

  1. If tool_calls is non-empty, type is :tool_calls
  2. If finish_reason is :tool_calls, type is :tool_calls
  3. Otherwise, type is :final_answer
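
A minimal sketch of these rules (the anonymous function and its inputs are illustrative, not part of the module's API):

# Hypothetical restatement of the three rules above
classify_type = fn tool_calls, finish_reason ->
  cond do
    tool_calls != [] -> :tool_calls              # rule 1: tool calls present
    finish_reason == :tool_calls -> :tool_calls  # rule 2: provider signaled tool use
    true -> :final_answer                        # rule 3: default
  end
end

classify_type.([], :stop)
#=> :final_answer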

extract_tool_calls(stream_response)

@spec extract_tool_calls(t()) :: [map()]

Collect all tool calls from the stream into a list.

Consumes the stream chunks and extracts all tool call information into a structured format suitable for execution.

Parameters

  • stream_response - The StreamResponse struct

Returns

A list of maps with tool call details including :id, :name, and :arguments.

Examples

{:ok, stream_response} = ReqLLM.stream_text("anthropic:claude-3-sonnet", "Call calculator")

tool_calls = ReqLLM.StreamResponse.extract_tool_calls(stream_response)
#=> [%{id: "call_123", name: "calculator", arguments: %{"operation" => "add", "a" => 2, "b" => 3}}]
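
The returned maps can be dispatched directly. In the sketch below, execute_tool/2 is a hypothetical application-side helper, not part of ReqLLM:

# Pair each call's id with its result so tool results can be
# matched back to the originating call
results =
  for %{id: id, name: name, arguments: args} <- tool_calls do
    {id, execute_tool(name, args)}
  end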

finish_reason(stream_response)

@spec finish_reason(t()) :: atom() | nil

Await the metadata task and return the finish reason.

Blocks until the metadata collection task completes and returns the finish reason indicating why the generation stopped.

Parameters

  • stream_response - The StreamResponse struct

Returns

An atom indicating the finish reason (:stop, :length, :tool_calls, etc.) or nil if the reason is unavailable.

Examples

{:ok, stream_response} = ReqLLM.stream_text("anthropic:claude-3-sonnet", "Hello!")

reason = ReqLLM.StreamResponse.finish_reason(stream_response)
#=> :stop

Timeout

This function will block until metadata collection completes. The timeout is determined by the provider's streaming implementation.
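
If you branch on the result, a case expression over the documented atoms is enough (the return values here are illustrative):

case ReqLLM.StreamResponse.finish_reason(stream_response) do
  :stop -> :ok                     # model finished naturally
  :length -> {:error, :truncated}  # hit the token limit
  :tool_calls -> :needs_tool_loop  # model requested tool execution
  other -> {:error, {:unexpected_finish, other}}  # includes nil
end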

process_stream(stream_response, opts \\ [])

@spec process_stream(
  t(),
  keyword()
) :: {:ok, ReqLLM.Response.t()} | {:error, term()}

Process a stream with real-time callbacks for content and thinking.

Unlike to_response/1, this function processes the stream incrementally and invokes callbacks as chunks arrive, enabling real-time streaming to UIs or other consumers. After processing completes, returns a complete Response struct with all accumulated data and metadata, including reconstructed tool calls.

Parameters

  • stream_response - The StreamResponse struct to process
  • opts - Keyword list of options:
    • :on_result - Callback invoked immediately for each :content chunk. Signature: (String.t() -> any())
    • :on_thinking - Callback invoked immediately for each :thinking chunk. Signature: (String.t() -> any())

Returns

{:ok, Response.t()} with complete response data including:

  • All accumulated text content in message.content
  • All accumulated thinking content in message.content
  • All reconstructed tool calls in message.tool_calls
  • Usage metadata in usage
  • Finish reason in finish_reason

Returns {:error, reason} if stream processing or metadata collection fails.

Examples

# Stream text to console and get final response
{:ok, stream_response} = ReqLLM.stream_text("anthropic:claude-3-sonnet", "Tell a story")

{:ok, response} = ReqLLM.StreamResponse.process_stream(stream_response,
  on_result: &IO.write/1,
  on_thinking: fn thinking -> IO.puts("[Thinking: #{thinking}]") end
)

# Response contains all accumulated data
IO.inspect(response.message)  # Complete message with text and thinking
IO.inspect(response.usage)    # Token usage metadata

# Stream to Phoenix LiveView
{:ok, response} = ReqLLM.StreamResponse.process_stream(stream_response,
  on_result: fn text ->
    Phoenix.PubSub.broadcast!(MyApp.PubSub, "chat:#{id}", {:chunk, text})
  end
)

# Tool calls are available in the response
tool_calls = response.message.tool_calls || []
Enum.each(tool_calls, &execute_tool/1)

Implementation Notes

  • Content and thinking callbacks fire immediately as chunks arrive (real-time streaming)
  • Tool calls are reconstructed from stream chunks and available in the returned Response
  • The stream is consumed exactly once (no double-consumption bugs)
  • All callbacks are optional - omitted callbacks are simply not invoked
  • The returned Response struct contains all accumulated data plus metadata

text(stream_response)

@spec text(t()) :: String.t()

Collect all text tokens into a single binary string.

Consumes the entire stream to build the complete text response. This is a convenience function for cases where you want the full text but still benefit from streaming's concurrent metadata collection.

Parameters

  • stream_response - The StreamResponse struct

Returns

The complete text content as a binary string.

Examples

{:ok, stream_response} = ReqLLM.stream_text("anthropic:claude-3-sonnet", "Hello!")

text = ReqLLM.StreamResponse.text(stream_response)
#=> "Hello! How can I help you today?"

Performance

This function will consume the entire stream. If you need both streaming display and final text, consider splitting the stream with an intermediate collection step.
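
One way to do that split using only tokens/1 from this module (the map step is illustrative):

final_text =
  stream_response
  |> ReqLLM.StreamResponse.tokens()
  |> Enum.map(fn token ->
    IO.write(token)  # real-time display as tokens arrive
    token            # keep each token for final assembly
  end)
  |> IO.iodata_to_binary()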

to_response(stream_response)

@spec to_response(t()) :: {:ok, ReqLLM.Response.t()} | {:error, term()}

Convert a StreamResponse to a legacy Response struct for backward compatibility.

Consumes the entire stream to build a complete Response struct that's compatible with existing ReqLLM.Response-based code. This function handles both stream consumption and metadata collection concurrently.

Parameters

  • stream_response - The StreamResponse struct to convert

Returns

  • {:ok, response} - Successfully converted Response struct
  • {:error, reason} - Stream consumption or metadata collection failed

Examples

{:ok, stream_response} = ReqLLM.stream_text("anthropic:claude-3-sonnet", "Hello!")
{:ok, response} = ReqLLM.StreamResponse.to_response(stream_response)

# Now compatible with existing Response-based code
text = ReqLLM.Response.text(response)
usage = ReqLLM.Response.usage(response)

Implementation Note

This function materializes the entire stream and awaits metadata collection, so it negates the streaming benefits. Use this only when backward compatibility is required.

tokens(stream_response)

@spec tokens(t()) :: Enumerable.t()

Extract text tokens from the stream, filtering out metadata chunks.

Returns a stream that yields only the text content from :content type chunks, suitable for real-time display or processing.

Parameters

  • stream_response - The StreamResponse struct

Returns

A lazy stream of text strings from content chunks.

Examples

{:ok, stream_response} = ReqLLM.stream_text("anthropic:claude-3-sonnet", "Hello!")

stream_response
|> ReqLLM.StreamResponse.tokens()
|> Stream.each(&IO.write/1)
|> Stream.run()
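
Because tokens/1 returns a plain lazy stream, it composes with any Stream or Enum function. For example, writing tokens to a file as they arrive (the file name is illustrative):

stream_response
|> ReqLLM.StreamResponse.tokens()
|> Stream.into(File.stream!("story.txt"))
|> Stream.run()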

tool_calls(stream_response)

@spec tool_calls(t()) :: Enumerable.t()

Extract tool call chunks from the stream.

Returns a stream that yields only :tool_call type chunks, suitable for processing function calls made by the assistant.

Parameters

  • stream_response - The StreamResponse struct

Returns

A lazy stream of tool call chunks.

Examples

{:ok, stream_response} = ReqLLM.stream_text("anthropic:claude-3-sonnet", "Call get_time tool")

stream_response
|> ReqLLM.StreamResponse.tool_calls()
|> Stream.each(fn tool_call -> IO.inspect(tool_call.name) end)
|> Stream.run()

usage(stream_response)

@spec usage(t()) :: map() | nil

Await the metadata task and return usage statistics.

Blocks until the metadata collection task completes and returns the usage map containing token counts and cost information.

Parameters

  • stream_response - The StreamResponse struct

Returns

A usage map with token counts and costs, or nil if no usage data is available.

Examples

{:ok, stream_response} = ReqLLM.stream_text("anthropic:claude-3-sonnet", "Hello!")

usage = ReqLLM.StreamResponse.usage(stream_response)
#=> %{input_tokens: 8, output_tokens: 12, total_cost: 0.024}

Timeout

This function will block until metadata collection completes. The timeout is determined by the provider's streaming implementation.