A streaming LLM response that yields events as they arrive from the provider.
Returned by Omni.stream_text/3. The three most common consumption patterns:
Stream to UI and get the final response
{:ok, stream} = Omni.stream_text({:anthropic, "claude-sonnet-4-5-20250514"}, "Hello!")
{:ok, response} =
stream
|> StreamingResponse.on(:text_delta, fn %{delta: d} -> IO.write(d) end)
|> StreamingResponse.complete()Just the text chunks
{:ok, stream} = Omni.stream_text({:anthropic, "claude-sonnet-4-5-20250514"}, "Hello!")
stream
|> StreamingResponse.text_stream()
|> Enum.each(&IO.write/1)Full event control
StreamingResponse implements Enumerable, yielding {event_type, event_map, partial_response} tuples. The partial response is rebuilt after every event to
reflect the current state of the stream:
{:ok, stream} = Omni.stream_text({:anthropic, "claude-sonnet-4-5-20250514"}, "Hello!")
for {type, data, _partial} <- stream do
case type do
:text_delta -> IO.write(data.delta)
:tool_use_start -> IO.puts("Calling tool: #{data.name}")
:done -> IO.puts("\nDone! Stop reason: #{data.stop_reason}")
_ -> :ok
end
endEvent types
Content block lifecycle events follow a start → delta → end pattern:
{:text_start, %{index: 0}, %Response{}}
{:text_delta, %{index: 0, delta: "Hello"}, %Response{}}
{:text_end, %{index: 0, content: %Text{}}, %Response{}}
{:thinking_start, %{index: 0}, %Response{}}
{:thinking_delta, %{index: 0, delta: "..."}, %Response{}}
{:thinking_end, %{index: 0, content: %Thinking{}}, %Response{}}
{:tool_use_start, %{index: 1, id: "call_1", name: "weather"}, %Response{}}
{:tool_use_delta, %{index: 1, delta: "{\"city\":"}, %Response{}}
{:tool_use_end, %{index: 1, content: %ToolUse{}}, %Response{}}Tool results are emitted between rounds when tools are auto-executed. The third element is the completed response from the step that triggered tool execution (not a partial response):
{:tool_result, %ToolResult{}, %Response{}}Terminal events — every stream ends with exactly one of these:
{:done, %{stop_reason: :stop}, %Response{}}
{:error, reason, %Response{}}
Summary
Types
A consumer event emitted during enumeration.
An event type atom emitted during enumeration.
A streaming response wrapper.
Functions
Cancels the underlying async HTTP response.
Consumes the entire stream and returns the final %Response{}.
Registers a side-effect handler for events of the given type.
Returns a stream of text delta binaries.
Types
@type event() :: {event_type(), map(), Omni.Response.t()} | {:error, term(), Omni.Response.t()}
A consumer event emitted during enumeration.
@type event_type() ::
:text_start
| :text_delta
| :text_end
| :thinking_start
| :thinking_delta
| :thinking_end
| :tool_use_start
| :tool_use_delta
| :tool_use_end
| :tool_result
| :error
| :done
An event type atom emitted during enumeration.
@type t() :: %Omni.StreamingResponse{cancel: (-> :ok) | nil, stream: Enumerable.t()}
A streaming response wrapper.
Functions
@spec cancel(t()) :: :ok
Cancels the underlying async HTTP response.
@spec complete(t()) :: {:ok, Omni.Response.t()} | {:error, term()}
Consumes the entire stream and returns the final %Response{}.
This drives the stream to completion, triggering any handlers registered
with on/3 along the way. Returns {:ok, response} on success or
{:error, reason} if the stream terminated with an error.
@spec on(t(), event_type(), (map() -> any()) | (map(), Omni.Response.t() -> any())) :: t()
Registers a side-effect handler for events of the given type.
Returns a new StreamingResponse with the handler inserted into the pipeline.
Handlers fire during consumption (when complete/1 or any Enum function
drives the stream) and their return values are discarded.
Multiple handlers can be chained — all fire independently:
{:ok, response} =
stream
|> StreamingResponse.on(:text_delta, fn %{delta: d} -> IO.write(d) end)
|> StreamingResponse.on(:thinking_delta, fn %{delta: d} -> IO.write(d) end)
|> StreamingResponse.on(:done, fn %{stop_reason: r} -> IO.puts("\nStop: #{r}") end)
|> StreamingResponse.complete()Use an arity-2 callback to access the partial response (e.g. for progress tracking based on accumulated token usage):
StreamingResponse.on(stream, :text_delta, fn _event, partial ->
send(self(), {:tokens, partial.usage.output_tokens})
end)
@spec text_stream(t()) :: Enumerable.t()
Returns a stream of text delta binaries.
Filters the event stream to only :text_delta events and extracts the
delta string from each. Useful when you only need the text content:
stream
|> StreamingResponse.text_stream()
|> Enum.into("")