Omni.StreamingResponse (Omni v1.2.1)

Copy Markdown View Source

A streaming LLM response that yields events as they arrive from the provider.

Returned by Omni.stream_text/3. The three most common consumption patterns:

Stream to UI and get the final response

{:ok, stream} = Omni.stream_text({:anthropic, "claude-sonnet-4-5-20250514"}, "Hello!")

{:ok, response} =
  stream
  |> StreamingResponse.on(:text_delta, fn %{delta: d} -> IO.write(d) end)
  |> StreamingResponse.complete()

Just the text chunks

{:ok, stream} = Omni.stream_text({:anthropic, "claude-sonnet-4-5-20250514"}, "Hello!")

stream
|> StreamingResponse.text_stream()
|> Enum.each(&IO.write/1)

Full event control

StreamingResponse implements Enumerable, yielding {event_type, event_map, partial_response} tuples. The partial response is rebuilt after every event to reflect the current state of the stream:

{:ok, stream} = Omni.stream_text({:anthropic, "claude-sonnet-4-5-20250514"}, "Hello!")

for {type, data, _partial} <- stream do
  case type do
    :text_delta -> IO.write(data.delta)
    :tool_use_start -> IO.puts("Calling tool: #{data.name}")
    :done -> IO.puts("\nDone! Stop reason: #{data.stop_reason}")
    _ -> :ok
  end
end

Event types

Content block lifecycle events follow a startdeltaend pattern:

{:text_start,     %{index: 0}, %Response{}}
{:text_delta,     %{index: 0, delta: "Hello"}, %Response{}}
{:text_end,       %{index: 0, content: %Text{}}, %Response{}}

{:thinking_start, %{index: 0}, %Response{}}
{:thinking_delta, %{index: 0, delta: "..."}, %Response{}}
{:thinking_end,   %{index: 0, content: %Thinking{}}, %Response{}}

{:tool_use_start, %{index: 1, id: "call_1", name: "weather"}, %Response{}}
{:tool_use_delta, %{index: 1, delta: "{\"city\":"}, %Response{}}
{:tool_use_end,   %{index: 1, content: %ToolUse{}}, %Response{}}

Tool results are emitted between rounds when tools are auto-executed. The third element is the completed response from the step that triggered tool execution (not a partial response):

{:tool_result, %ToolResult{}, %Response{}}

Terminal events — every stream ends with exactly one of these:

{:done,  %{stop_reason: :stop}, %Response{}}
{:error, reason, %Response{}}

Summary

Types

A consumer event emitted during enumeration.

An event type atom emitted during enumeration.

t()

A streaming response wrapper.

Functions

Cancels the underlying async HTTP response.

Consumes the entire stream and returns the final %Response{}.

Registers a side-effect handler for events of the given type.

Returns a stream of text delta binaries.

Types

event()

@type event() ::
  {event_type(), map(), Omni.Response.t()} | {:error, term(), Omni.Response.t()}

A consumer event emitted during enumeration.

event_type()

@type event_type() ::
  :text_start
  | :text_delta
  | :text_end
  | :thinking_start
  | :thinking_delta
  | :thinking_end
  | :tool_use_start
  | :tool_use_delta
  | :tool_use_end
  | :tool_result
  | :error
  | :done

An event type atom emitted during enumeration.

t()

@type t() :: %Omni.StreamingResponse{cancel: (-> :ok) | nil, stream: Enumerable.t()}

A streaming response wrapper.

Functions

cancel(streaming_response)

@spec cancel(t()) :: :ok

Cancels the underlying async HTTP response.

complete(sr)

@spec complete(t()) :: {:ok, Omni.Response.t()} | {:error, term()}

Consumes the entire stream and returns the final %Response{}.

This drives the stream to completion, triggering any handlers registered with on/3 along the way. Returns {:ok, response} on success or {:error, reason} if the stream terminated with an error.

on(sr, event_type, callback)

@spec on(t(), event_type(), (map() -> any()) | (map(), Omni.Response.t() -> any())) ::
  t()

Registers a side-effect handler for events of the given type.

Returns a new StreamingResponse with the handler inserted into the pipeline. Handlers fire during consumption (when complete/1 or any Enum function drives the stream) and their return values are discarded.

Multiple handlers can be chained — all fire independently:

{:ok, response} =
  stream
  |> StreamingResponse.on(:text_delta, fn %{delta: d} -> IO.write(d) end)
  |> StreamingResponse.on(:thinking_delta, fn %{delta: d} -> IO.write(d) end)
  |> StreamingResponse.on(:done, fn %{stop_reason: r} -> IO.puts("\nStop: #{r}") end)
  |> StreamingResponse.complete()

Use an arity-2 callback to access the partial response (e.g. for progress tracking based on accumulated token usage):

StreamingResponse.on(stream, :text_delta, fn _event, partial ->
  send(self(), {:tokens, partial.usage.output_tokens})
end)

text_stream(sr)

@spec text_stream(t()) :: Enumerable.t()

Returns a stream of text delta binaries.

Filters the event stream to only :text_delta events and extracts the delta string from each. Useful when you only need the text content:

stream
|> StreamingResponse.text_stream()
|> Enum.into("")