ReqLLM.StreamResponse (ReqLLM v1.3.0)
View SourceA streaming response container that provides both real-time streaming and asynchronous metadata.
StreamResponse is the new return type for streaming operations in ReqLLM, designed to provide
efficient access to streaming data while maintaining backward compatibility with the legacy
Response format.
Structure
stream- Lazy enumerable ofReqLLM.StreamChunkstructs for real-time consumptionmetadata_handle- Concurrent handle for metadata collection (usage, finish_reason)cancel- Function to terminate streaming and cleanup resourcesmodel- Model specification that generated this responsecontext- Conversation context for multi-turn workflows
Usage Patterns
Real-time streaming
{:ok, stream_response} = ReqLLM.stream_text("anthropic:claude-3-sonnet", "Tell a story")
stream_response
|> ReqLLM.StreamResponse.tokens()
|> Stream.each(&IO.write/1)
|> Stream.run()Collecting complete text
{:ok, stream_response} = ReqLLM.stream_text("anthropic:claude-3-sonnet", "Hello!")
text = ReqLLM.StreamResponse.text(stream_response)
usage = ReqLLM.StreamResponse.usage(stream_response)Backward compatibility
{:ok, stream_response} = ReqLLM.stream_text("anthropic:claude-3-sonnet", "Hello!")
{:ok, legacy_response} = ReqLLM.StreamResponse.to_response(stream_response)
# Now works with existing Response-based code
text = ReqLLM.Response.text(legacy_response)Early cancellation
{:ok, stream_response} = ReqLLM.stream_text("anthropic:claude-3-sonnet", "Long story...")
stream_response.stream
|> Stream.take(5) # Take only first 5 chunks
|> Stream.each(&IO.write/1)
|> Stream.run()
# Cancel remaining work
stream_response.cancel.()Design Philosophy
This struct separates concerns between streaming data (available immediately) and metadata (available after completion). This allows for:
- Zero-latency streaming of content
- Concurrent metadata processing
- Resource cleanup via cancellation
- Seamless backward compatibility
Summary
Types
Result of classifying a streaming response.
A streaming response with concurrent metadata processing.
Functions
Classify a streaming response for tool-calling workflows.
Collect all tool calls from the stream into a list.
Await the metadata task and return the finish reason.
Process a stream with real-time callbacks for content and thinking.
Collect all text tokens into a single binary string.
Convert a StreamResponse to a legacy Response struct for backward compatibility.
Extract text tokens from the stream, filtering out metadata chunks.
Extract tool call chunks from the stream.
Await the metadata task and return usage statistics.
Types
@type classify_result() :: %{ type: :tool_calls | :final_answer, text: String.t(), thinking: String.t(), tool_calls: [map()], finish_reason: atom() | nil }
Result of classifying a streaming response.
type-:tool_callsif the model requested tool execution,:final_answerotherwisetext- Accumulated text content from the responsethinking- Accumulated thinking/reasoning content (if any)tool_calls- List of complete tool calls with parsed argumentsfinish_reason- The normalized finish reason (:stop,:tool_calls,:length, etc.)
@type t() :: %ReqLLM.StreamResponse{ cancel: (-> :ok), context: ReqLLM.Context.t(), metadata_handle: ReqLLM.StreamResponse.MetadataHandle.t(), model: LLMDB.Model.t(), stream: Enumerable.t() }
A streaming response with concurrent metadata processing.
Contains a stream of chunks, a handle for metadata collection, cancellation function, and contextual information for multi-turn conversations.
Functions
@spec classify(t()) :: classify_result()
Classify a streaming response for tool-calling workflows.
Consumes the stream and returns a structured result indicating whether the model wants to call tools or has provided a final answer. This is the recommended API for ReAct agents and function-calling applications.
Parameters
stream_response- The StreamResponse struct
Returns
A map with:
type-:tool_callsif the model requested tool execution,:final_answerotherwisetext- Accumulated text content from the responsethinking- Accumulated thinking/reasoning contenttool_calls- List of complete tool calls with parsed argumentsfinish_reason- The normalized finish reason
Examples
{:ok, stream_response} = ReqLLM.stream_text(model, messages, tools: tools)
case ReqLLM.StreamResponse.classify(stream_response) do
%{type: :tool_calls, tool_calls: calls} ->
# Execute tools and continue conversation
results = Enum.map(calls, &execute_tool/1)
# Build tool result messages and continue...
%{type: :final_answer, text: answer} ->
# Done - return answer to user
{:ok, answer}
endClassification Logic
The classification uses multiple signals:
- If
tool_callsis non-empty, type is:tool_calls - If
finish_reasonis:tool_calls, type is:tool_calls - Otherwise, type is
:final_answer
Collect all tool calls from the stream into a list.
Consumes the stream chunks and extracts all tool call information into a structured format suitable for execution.
Parameters
stream_response- The StreamResponse struct
Returns
A list of maps with tool call details including :id, :name, and :arguments.
Examples
{:ok, stream_response} = ReqLLM.stream_text("anthropic:claude-3-sonnet", "Call calculator")
tool_calls = ReqLLM.StreamResponse.extract_tool_calls(stream_response)
#=> [%{id: "call_123", name: "calculator", arguments: %{"operation" => "add", "a" => 2, "b" => 3}}]
Await the metadata task and return the finish reason.
Blocks until the metadata collection task completes and returns the finish reason indicating why the generation stopped.
Parameters
stream_response- The StreamResponse struct
Returns
An atom indicating the finish reason (:stop, :length, :tool_use, etc.) or nil.
Examples
{:ok, stream_response} = ReqLLM.stream_text("anthropic:claude-3-sonnet", "Hello!")
reason = ReqLLM.StreamResponse.finish_reason(stream_response)
#=> :stopTimeout
This function will block until metadata collection completes. The timeout is determined by the provider's streaming implementation.
@spec process_stream( t(), keyword() ) :: {:ok, ReqLLM.Response.t()} | {:error, term()}
Process a stream with real-time callbacks for content and thinking.
Unlike to_response/1, this function processes the stream incrementally and invokes
callbacks as chunks arrive, enabling real-time streaming to UIs or other consumers.
After processing completes, returns a complete Response struct with all accumulated
data and metadata, including reconstructed tool calls.
Parameters
stream_response- The StreamResponse struct to processopts- Keyword list of options::on_result- Callback invoked immediately for each:contentchunk.Signature: `(String.t() -> any())`:on_thinking- Callback invoked immediately for each:thinkingchunk.Signature: `(String.t() -> any())`
Returns
{:ok, Response.t()} with complete response data including:
- All accumulated text content in
message.content - All accumulated thinking content in
message.content - All reconstructed tool calls in
message.tool_calls - Usage metadata in
usage - Finish reason in
finish_reason
Returns {:error, reason} if stream processing or metadata collection fails.
Examples
# Stream text to console and get final response
{:ok, stream_response} = ReqLLM.stream_text("anthropic:claude-3-sonnet", "Tell a story")
{:ok, response} = ReqLLM.StreamResponse.process_stream(stream_response,
on_result: &IO.write/1,
on_thinking: fn thinking -> IO.puts("[Thinking: #{thinking}]") end
)
# Response contains all accumulated data
IO.inspect(response.message) # Complete message with text and thinking
IO.inspect(response.usage) # Token usage metadata
# Stream to Phoenix LiveView
{:ok, response} = ReqLLM.StreamResponse.process_stream(stream_response,
on_result: fn text ->
Phoenix.PubSub.broadcast!(MyApp.PubSub, "chat:#{id}", {:chunk, text})
end
)
# Tool calls are available in the response
tool_calls = response.message.tool_calls || []
Enum.each(tool_calls, &execute_tool/1)Implementation Notes
- Content and thinking callbacks fire immediately as chunks arrive (real-time streaming)
- Tool calls are reconstructed from stream chunks and available in the returned Response
- The stream is consumed exactly once (no double-consumption bugs)
- All callbacks are optional - omitted callbacks are simply not invoked
- The returned Response struct contains all accumulated data plus metadata
Collect all text tokens into a single binary string.
Consumes the entire stream to build the complete text response. This is a convenience function for cases where you want the full text but still benefit from streaming's concurrent metadata collection.
Parameters
stream_response- The StreamResponse struct
Returns
The complete text content as a binary string.
Examples
{:ok, stream_response} = ReqLLM.stream_text("anthropic:claude-3-sonnet", "Hello!")
text = ReqLLM.StreamResponse.text(stream_response)
#=> "Hello! How can I help you today?"Performance
This function will consume the entire stream. If you need both streaming display and final text, consider splitting the stream with an intermediate collection step.
@spec to_response(t()) :: {:ok, ReqLLM.Response.t()} | {:error, term()}
Convert a StreamResponse to a legacy Response struct for backward compatibility.
Consumes the entire stream to build a complete Response struct that's compatible with existing ReqLLM.Response-based code. This function handles both stream consumption and metadata collection concurrently.
Parameters
stream_response- The StreamResponse struct to convert
Returns
{:ok, response}- Successfully converted Response struct{:error, reason}- Stream consumption or metadata collection failed
Examples
{:ok, stream_response} = ReqLLM.stream_text("anthropic:claude-3-sonnet", "Hello!")
{:ok, response} = ReqLLM.StreamResponse.to_response(stream_response)
# Now compatible with existing Response-based code
text = ReqLLM.Response.text(response)
usage = ReqLLM.Response.usage(response)Implementation Note
This function materializes the entire stream and awaits metadata collection, so it negates the streaming benefits. Use this only when backward compatibility is required.
@spec tokens(t()) :: Enumerable.t()
Extract text tokens from the stream, filtering out metadata chunks.
Returns a stream that yields only the text content from :content type chunks,
suitable for real-time display or processing.
Parameters
stream_response- The StreamResponse struct
Returns
A lazy stream of text strings from content chunks.
Examples
{:ok, stream_response} = ReqLLM.stream_text("anthropic:claude-3-sonnet", "Hello!")
stream_response
|> ReqLLM.StreamResponse.tokens()
|> Stream.each(&IO.write/1)
|> Stream.run()
@spec tool_calls(t()) :: Enumerable.t()
Extract tool call chunks from the stream.
Returns a stream that yields only :tool_call type chunks, suitable for
processing function calls made by the assistant.
Parameters
stream_response- The StreamResponse struct
Returns
A lazy stream of tool call chunks.
Examples
{:ok, stream_response} = ReqLLM.stream_text("anthropic:claude-3-sonnet", "Call get_time tool")
stream_response
|> ReqLLM.StreamResponse.tool_calls()
|> Stream.each(fn tool_call -> IO.inspect(tool_call.name) end)
|> Stream.run()
Await the metadata task and return usage statistics.
Blocks until the metadata collection task completes and returns the usage map containing token counts and cost information.
Parameters
stream_response- The StreamResponse struct
Returns
A usage map with token counts and costs, or nil if no usage data available.
Examples
{:ok, stream_response} = ReqLLM.stream_text("anthropic:claude-3-sonnet", "Hello!")
usage = ReqLLM.StreamResponse.usage(stream_response)
#=> %{input_tokens: 8, output_tokens: 12, total_cost: 0.024}Timeout
This function will block until metadata collection completes. The timeout is determined by the provider's streaming implementation.