ReqLLM.StreamResponse (ReqLLM v1.0.0-rc.5)

View Source

A streaming response container that provides both real-time streaming and asynchronous metadata.

StreamResponse is the new return type for streaming operations in ReqLLM, designed to provide efficient access to streaming data while maintaining backward compatibility with the legacy Response format.

Structure

  • stream - Lazy enumerable of ReqLLM.StreamChunk structs for real-time consumption
  • metadata_task - Concurrent Task for metadata collection (usage, finish_reason)
  • cancel - Function to terminate streaming and cleanup resources
  • model - Model specification that generated this response
  • context - Conversation context for multi-turn workflows

Usage Patterns

Real-time streaming

{:ok, stream_response} = ReqLLM.stream_text("anthropic:claude-3-sonnet", "Tell a story")

stream_response
|> ReqLLM.StreamResponse.tokens()
|> Stream.each(&IO.write/1)
|> Stream.run()

Collecting complete text

{:ok, stream_response} = ReqLLM.stream_text("anthropic:claude-3-sonnet", "Hello!")

text = ReqLLM.StreamResponse.text(stream_response)
usage = ReqLLM.StreamResponse.usage(stream_response)

Backward compatibility

{:ok, stream_response} = ReqLLM.stream_text("anthropic:claude-3-sonnet", "Hello!")
{:ok, legacy_response} = ReqLLM.StreamResponse.to_response(stream_response)

# Now works with existing Response-based code
text = ReqLLM.Response.text(legacy_response)

Early cancellation

{:ok, stream_response} = ReqLLM.stream_text("anthropic:claude-3-sonnet", "Long story...")

stream_response
|> ReqLLM.StreamResponse.tokens()
|> Stream.take(5)  # Take only the first 5 tokens
|> Stream.each(&IO.write/1)
|> Stream.run()

# Cancel remaining work
stream_response.cancel.()

Design Philosophy

This struct separates concerns between streaming data (available immediately) and metadata (available after completion). This allows for:

  • Zero-latency streaming of content
  • Concurrent metadata processing
  • Resource cleanup via cancellation
  • Seamless backward compatibility
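The cancellation hook composes naturally with `try/after` to guarantee cleanup even when the consumer crashes mid-stream. This sketch assumes `cancel` is safe to call after normal completion; confirm idempotency with your provider's streaming implementation:

```elixir
{:ok, stream_response} =
  ReqLLM.stream_text("anthropic:claude-3-sonnet", "Long story...")

try do
  stream_response
  |> ReqLLM.StreamResponse.tokens()
  |> Stream.each(&IO.write/1)
  |> Stream.run()
after
  # Runs on normal completion, early exit, or a raise in the consumer.
  stream_response.cancel.()
end
```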

Summary

Types

t()

A streaming response with concurrent metadata processing.

Functions

extract_tool_calls(stream_response)

Collect all tool calls from the stream into a list.

finish_reason(stream_response)

Await the metadata task and return the finish reason.

text(stream_response)

Collect all text tokens into a single binary string.

to_response(stream_response)

Convert a StreamResponse to a legacy Response struct for backward compatibility.

tokens(stream_response)

Extract text tokens from the stream, filtering out metadata chunks.

tool_calls(stream_response)

Extract tool call chunks from the stream.

usage(stream_response)

Await the metadata task and return usage statistics.

Types

t()

@type t() :: %ReqLLM.StreamResponse{
  cancel: (-> :ok),
  context: ReqLLM.Context.t(),
  metadata_task: Task.t(),
  model: ReqLLM.Model.t(),
  stream: Enumerable.t()
}

A streaming response with concurrent metadata processing.

Contains a stream of chunks, a task for metadata collection, cancellation function, and contextual information for multi-turn conversations.

Functions

extract_tool_calls(stream_response)

@spec extract_tool_calls(t()) :: [map()]

Collect all tool calls from the stream into a list.

Consumes the stream chunks and extracts all tool call information into a structured format suitable for execution.

Parameters

  • stream_response - The StreamResponse struct

Returns

A list of maps with tool call details including :id, :name, and :arguments.

Examples

{:ok, stream_response} = ReqLLM.stream_text("anthropic:claude-3-sonnet", "Call calculator")

tool_calls = ReqLLM.StreamResponse.extract_tool_calls(stream_response)
#=> [%{id: "call_123", name: "calculator", arguments: %{"operation" => "add", "a" => 2, "b" => 3}}]
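The returned maps can be dispatched to your own tool implementations. The `MyApp.Tools.run_tool/2` dispatcher below is hypothetical, not part of ReqLLM:

```elixir
defmodule MyApp.Tools do
  # Hypothetical dispatcher: map each extracted tool call to a local function.
  def run_tool("calculator", %{"operation" => "add", "a" => a, "b" => b}), do: a + b
  def run_tool(name, _args), do: {:error, {:unknown_tool, name}}
end

tool_calls = ReqLLM.StreamResponse.extract_tool_calls(stream_response)

results =
  Enum.map(tool_calls, fn %{id: id, name: name, arguments: args} ->
    {id, MyApp.Tools.run_tool(name, args)}
  end)
```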

finish_reason(stream_response)

@spec finish_reason(t()) :: atom() | nil

Await the metadata task and return the finish reason.

Blocks until the metadata collection task completes and returns the finish reason indicating why the generation stopped.

Parameters

  • stream_response - The StreamResponse struct

Returns

An atom indicating the finish reason (:stop, :length, :tool_use, etc.) or nil.

Examples

{:ok, stream_response} = ReqLLM.stream_text("anthropic:claude-3-sonnet", "Hello!")

reason = ReqLLM.StreamResponse.finish_reason(stream_response)
#=> :stop

Timeout

This function will block until metadata collection completes. The timeout is determined by the provider's streaming implementation.
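A typical use is branching on why generation stopped. The atoms below are the ones listed above; `handle_tools/1` is a hypothetical handler for illustration:

```elixir
case ReqLLM.StreamResponse.finish_reason(stream_response) do
  :stop -> :ok                                # model finished naturally
  :length -> IO.warn("output truncated at the token limit")
  :tool_use -> handle_tools(stream_response)  # hypothetical handler
  nil -> IO.warn("no finish reason reported")
  other -> IO.warn("unexpected finish reason: #{inspect(other)}")
end
```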

text(stream_response)

@spec text(t()) :: String.t()

Collect all text tokens into a single binary string.

Consumes the entire stream to build the complete text response. This is a convenience function for cases where you want the full text but still benefit from streaming's concurrent metadata collection.

Parameters

  • stream_response - The StreamResponse struct

Returns

The complete text content as a binary string.

Examples

{:ok, stream_response} = ReqLLM.stream_text("anthropic:claude-3-sonnet", "Hello!")

text = ReqLLM.StreamResponse.text(stream_response)
#=> "Hello! How can I help you today?"

Performance

This function consumes the entire stream. If you need both real-time display and the final text, consume the token stream once and accumulate the tokens as you go, rather than trying to consume the stream twice.
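One way to get both behaviours from a single pass, sketched here with only standard-library calls:

```elixir
# Print each token as it arrives while accumulating the full text,
# consuming the stream exactly once.
text =
  stream_response
  |> ReqLLM.StreamResponse.tokens()
  |> Enum.reduce([], fn token, acc ->
    IO.write(token)
    [acc | token]  # accumulate as iodata to avoid repeated binary copies
  end)
  |> IO.iodata_to_binary()
```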

to_response(stream_response)

@spec to_response(t()) :: {:ok, ReqLLM.Response.t()} | {:error, term()}

Convert a StreamResponse to a legacy Response struct for backward compatibility.

Consumes the entire stream to build a complete Response struct that's compatible with existing ReqLLM.Response-based code. This function handles both stream consumption and metadata collection concurrently.

Parameters

  • stream_response - The StreamResponse struct to convert

Returns

  • {:ok, response} - Successfully converted Response struct
  • {:error, reason} - Stream consumption or metadata collection failed

Examples

{:ok, stream_response} = ReqLLM.stream_text("anthropic:claude-3-sonnet", "Hello!")
{:ok, response} = ReqLLM.StreamResponse.to_response(stream_response)

# Now compatible with existing Response-based code
text = ReqLLM.Response.text(response)
usage = ReqLLM.Response.usage(response)

Implementation Note

This function materializes the entire stream and awaits metadata collection, so it negates the streaming benefits. Use this only when backward compatibility is required.
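When migrating incrementally, a small wrapper can accept either shape. `MyApp.LLM.fetch_text/1` is a hypothetical helper, not part of ReqLLM:

```elixir
defmodule MyApp.LLM do
  # Hypothetical migration helper: accepts either response type.
  def fetch_text(%ReqLLM.StreamResponse{} = sr) do
    with {:ok, response} <- ReqLLM.StreamResponse.to_response(sr) do
      {:ok, ReqLLM.Response.text(response)}
    end
  end

  def fetch_text(%ReqLLM.Response{} = response) do
    {:ok, ReqLLM.Response.text(response)}
  end
end
```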

tokens(stream_response)

@spec tokens(t()) :: Enumerable.t()

Extract text tokens from the stream, filtering out metadata chunks.

Returns a stream that yields only the text content from :content type chunks, suitable for real-time display or processing.

Parameters

  • stream_response - The StreamResponse struct

Returns

A lazy stream of text strings from content chunks.

Examples

{:ok, stream_response} = ReqLLM.stream_text("anthropic:claude-3-sonnet", "Hello!")

stream_response
|> ReqLLM.StreamResponse.tokens()
|> Stream.each(&IO.write/1)
|> Stream.run()

tool_calls(stream_response)

@spec tool_calls(t()) :: Enumerable.t()

Extract tool call chunks from the stream.

Returns a stream that yields only :tool_call type chunks, suitable for processing function calls made by the assistant.

Parameters

  • stream_response - The StreamResponse struct

Returns

A lazy stream of tool call chunks.

Examples

{:ok, stream_response} = ReqLLM.stream_text("anthropic:claude-3-sonnet", "Call get_time tool")

stream_response
|> ReqLLM.StreamResponse.tool_calls()
|> Stream.each(fn tool_call -> IO.inspect(tool_call.name) end)
|> Stream.run()

usage(stream_response)

@spec usage(t()) :: map() | nil

Await the metadata task and return usage statistics.

Blocks until the metadata collection task completes and returns the usage map containing token counts and cost information.

Parameters

  • stream_response - The StreamResponse struct

Returns

A usage map with token counts and costs, or nil if no usage data is available.

Examples

{:ok, stream_response} = ReqLLM.stream_text("anthropic:claude-3-sonnet", "Hello!")

usage = ReqLLM.StreamResponse.usage(stream_response)
#=> %{input_tokens: 8, output_tokens: 12, total_cost: 0.024}
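Because the result may be nil, it is worth matching on both cases before logging or billing. The field names below follow the example above:

```elixir
case ReqLLM.StreamResponse.usage(stream_response) do
  %{input_tokens: input, output_tokens: output} = usage ->
    IO.puts("tokens: #{input} in / #{output} out, cost: #{usage[:total_cost]}")

  nil ->
    IO.puts("provider reported no usage data")
end
```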

Timeout

This function will block until metadata collection completes. The timeout is determined by the provider's streaming implementation.