OpenAI.Responses.Stream (OpenAI.Responses v0.4.2)

Streaming functionality for the Responses library.

This module provides functions for streaming responses from the OpenAI API, allowing you to process data as it arrives rather than waiting for the complete response.

Stream Processing Helpers

To process streamed data, use the text_deltas/1 and json_events/1 helpers:

Examples

# Extract text from streaming response
text = Responses.stream(input: "Write a story")
       |> Responses.Stream.text_deltas()
       |> Enum.join()

# Process JSON events directly from response stream
Responses.stream(input: "Generate data", schema: %{name: :string})
|> Responses.Stream.json_events()
|> Stream.each(&IO.inspect/1)
|> Stream.run()

Summary

Functions

delta(f)
Calls the provided function on all the text chunks received from the server.

json_events(stream)
Extracts JSON parsing events from a response stream.

stream(options)
Returns a Stream that yields chunks from the OpenAI API.

stream_with_callback(callback, options)
Stream a response from the OpenAI API with a callback function.

text_deltas(stream)
Extracts text deltas from a stream, ignoring errors and other event types.

Functions

delta(f)

Calls the provided function on all the text chunks received from the server.

This helper unwraps the {:ok, chunk} and {:error, reason} results and passes only the text of successful text delta events to the function. Errors and all other events are silently ignored.

Examples

Responses.create(
  input: "Write a story",
  stream: Responses.Stream.delta(&IO.write/1)
)
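To make the filtering behaviour concrete, here is a minimal sketch of how a helper like delta/1 could be written. DeltaSketch is a hypothetical stand-in, not the library's implementation, and the chunks are hand-written in the {:ok, %{event: ..., data: ...}} shape described elsewhere on this page, so no API call is made.

```elixir
defmodule DeltaSketch do
  # Hypothetical stand-in for Responses.Stream.delta/1 (assumption, not library code).
  # Wraps `f` in a callback that only reacts to successful text delta events.
  def delta(f) when is_function(f, 1) do
    fn
      {:ok, %{event: "response.output_text.delta", data: %{"delta" => text}}} ->
        f.(text)
        :ok

      _other ->
        # Errors and every other event type are ignored.
        :ok
    end
  end
end

# Hand-written chunks standing in for what the server would send.
chunks = [
  {:ok, %{event: "response.created", data: %{}}},
  {:ok, %{event: "response.output_text.delta", data: %{"delta" => "Once "}}},
  {:error, :invalid_json},
  {:ok, %{event: "response.output_text.delta", data: %{"delta" => "upon a time"}}}
]

# Use an Agent as a sink so the collected text can be inspected afterwards.
{:ok, sink} = Agent.start_link(fn -> [] end)
callback = DeltaSketch.delta(fn text -> Agent.update(sink, &[text | &1]) end)
Enum.each(chunks, callback)

collected = sink |> Agent.get(& &1) |> Enum.reverse() |> Enum.join()
IO.puts(collected)
```

Only the two delta payloads reach the wrapped function; the error tuple and the response.created event never do.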

json_events(stream)

Extracts JSON parsing events from a response stream.

This helper automatically extracts text deltas from the event stream and converts them into JSON parsing events using incremental parsing.

Events

Returns a Stream that yields JSON parsing events:

  • :start_object - Beginning of a JSON object
  • :end_object - End of a JSON object
  • :start_array - Beginning of a JSON array
  • :end_array - End of a JSON array
  • {:string, value} - A string value
  • {:integer, value} - An integer value
  • {:float, value} - A float value
  • {:boolean, value} - A boolean value
  • :null - A null value
  • :colon - Colon between key and value
  • :comma - Comma between elements

Examples

# Process JSON events directly from API response
events = Responses.stream(
  input: "Give me 10 U.S. presidents",
  schema: %{presidents: {:array, %{name: :string, birth_year: :integer}}}
)
|> Responses.Stream.json_events()
|> Enum.into([])

# Process events as they arrive
Responses.stream(
  input: "Generate product catalog",
  schema: %{products: {:array, %{name: :string, price: :number}}}
)
|> Responses.Stream.json_events()
|> Stream.each(fn event ->
  IO.inspect(event, label: "JSON Event")
end)
|> Stream.run()
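One way to consume these events is to track nesting depth and react whenever an array element closes. The sketch below runs on a hand-written event list, so it needs no API call. The exact ordering of events (for example, that object keys arrive as {:string, key} followed by :colon) is an assumption based on the event list above, not confirmed library behaviour.

```elixir
# Assumed event sequence for: {"presidents": [{"name": "Washington"}, {"name": "Adams"}]}
events = [
  :start_object,
  {:string, "presidents"}, :colon,
  :start_array,
  :start_object, {:string, "name"}, :colon, {:string, "Washington"}, :end_object,
  :comma,
  :start_object, {:string, "name"}, :colon, {:string, "Adams"}, :end_object,
  :end_array,
  :end_object
]

# Accumulator is {current nesting depth, number of completed array elements}.
{depth, completed} =
  Enum.reduce(events, {0, 0}, fn
    event, {depth, count} when event in [:start_object, :start_array] ->
      {depth + 1, count}

    # An object closing at depth 3 is an element of the array inside the root object.
    :end_object, {3, count} ->
      {2, count + 1}

    event, {depth, count} when event in [:end_object, :end_array] ->
      {depth - 1, count}

    # Scalars, colons, and commas do not change nesting.
    _other, acc ->
      acc
  end)

IO.inspect({depth, completed}, label: "final depth / completed elements")
```

Because Enum.reduce/3 accepts any Enumerable, the same reducer could be applied to the stream returned by json_events/1, which would let a caller act on each element as soon as its closing :end_object arrives.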

stream(options)

Returns a Stream that yields chunks from the OpenAI API.

This function returns an Enumerable that yields results wrapped in tuples: {:ok, chunk} for successful chunks or {:error, reason} for parsing errors.

Examples

# Process all results with error handling
Responses.stream(input: "Hello")
|> Enum.each(fn
  {:ok, chunk} -> IO.inspect(chunk)
  {:error, reason} -> IO.puts("Error: #{inspect(reason)}")
end)

# Get only text deltas, ignoring errors
text = Responses.stream(input: "Write a story")
       |> Stream.filter(fn
         {:ok, %{event: "response.output_text.delta"}} -> true
         _ -> false
       end)
       |> Stream.map(fn {:ok, chunk} -> chunk.data["delta"] end)
       |> Enum.join()
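If an error should stop processing rather than be skipped, the wrapped tuples can be folded with Enum.reduce_while/3. This is a sketch over a hand-written list that mimics the documented tuple shape. The :bad_chunk reason is made up for illustration.

```elixir
# Hand-written results in the {:ok, chunk} / {:error, reason} shape described above.
results = [
  {:ok, %{event: "response.created", data: %{}}},
  {:ok, %{event: "response.output_text.delta", data: %{"delta" => "Hel"}}},
  {:error, :bad_chunk},
  {:ok, %{event: "response.output_text.delta", data: %{"delta" => "lo"}}}
]

outcome =
  Enum.reduce_while(results, {:ok, ""}, fn
    # Append each text delta to the accumulated text.
    {:ok, %{event: "response.output_text.delta", data: %{"delta" => delta}}}, {:ok, acc} ->
      {:cont, {:ok, acc <> delta}}

    # Skip successful chunks that carry no text.
    {:ok, _other_event}, acc ->
      {:cont, acc}

    # Halt on the first error, keeping whatever text arrived before it.
    {:error, reason}, {:ok, partial} ->
      {:halt, {:error, reason, partial}}
  end)

IO.inspect(outcome)
```

Since reduce_while halts the enumeration, applying the same reducer to a lazy stream would stop pulling further chunks once an error is seen.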

stream_with_callback(callback, options)

Stream a response from the OpenAI API with a callback function.

Takes a callback function that will be called for each parsed chunk of the stream. The callback receives results wrapped in tuples:

{:ok, %{event: "event_type", data: %{...}}}
# or
{:error, reason}

The callback should return :ok to continue streaming or {:error, reason} to stop.
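The continue/stop contract can be illustrated without the network. In the sketch below, CallbackSketch.run/2 is a hypothetical driver that stands in for the library's streaming loop. What stream_with_callback/2 itself returns is not documented here, so the driver's return value is an assumption.

```elixir
defmodule CallbackSketch do
  # Hypothetical driver (assumption): feeds chunks to the callback and
  # stops as soon as the callback returns {:error, reason}.
  def run(chunks, callback) do
    Enum.reduce_while(chunks, :ok, fn chunk, :ok ->
      case callback.(chunk) do
        :ok -> {:cont, :ok}
        {:error, reason} -> {:halt, {:error, reason}}
      end
    end)
  end

  # Drains {:text, t} messages from the current process mailbox, in arrival order.
  def flush(acc \\ []) do
    receive do
      {:text, t} -> flush([t | acc])
    after
      0 -> Enum.reverse(acc)
    end
  end
end

# A callback that forwards text deltas and stops the stream on any error.
callback = fn
  {:ok, %{event: "response.output_text.delta", data: %{"delta" => text}}} ->
    send(self(), {:text, text})
    :ok

  {:error, reason} ->
    {:error, reason}

  _other ->
    :ok
end

chunks = [
  {:ok, %{event: "response.output_text.delta", data: %{"delta" => "Why did "}}},
  {:error, :timeout},
  {:ok, %{event: "response.output_text.delta", data: %{"delta" => "the chicken"}}}
]

result = CallbackSketch.run(chunks, callback)
delivered = CallbackSketch.flush()
IO.inspect({result, delivered})
```

The chunk after the error is never delivered, because the callback's {:error, reason} return value ends the loop.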

Options

Accepts the same options as Responses.create/1 and automatically adds the stream: true parameter.

Examples

# Simple debugging - print all results
Responses.Stream.stream_with_callback(&IO.inspect/1, input: "Hello")

# Process only text deltas using the delta/1 helper
Responses.Stream.stream_with_callback(
  Responses.Stream.delta(&IO.write/1),
  input: "Write a story"
)

# Custom processing with error handling
Responses.Stream.stream_with_callback(fn
  {:ok, %{event: "response.output_text.delta", data: %{"delta" => text}}} ->
    IO.write(text)
    :ok
  {:ok, %{event: "response.completed"}} ->
    IO.puts("\nStream complete!")
    :ok
  {:error, reason} ->
    IO.puts("\nStream error: #{inspect(reason)}")
    :ok  # Continue despite error
  _ ->
    :ok
end, input: "Tell me a joke")

text_deltas(stream)

Extracts text deltas from a stream, ignoring errors and other event types.

This helper transforms a raw event stream into a text-only stream by filtering for response.output_text.delta events and extracting their delta content. All errors and non-text events are silently ignored.

Examples

# Get text stream from API response
text = Responses.stream(input: "Write a story")
       |> Responses.Stream.text_deltas()
       |> Enum.join()

# Process text incrementally
Responses.stream(input: "Count to 10")
|> Responses.Stream.text_deltas()
|> Stream.each(&IO.write/1)
|> Stream.run()
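The filter-and-extract step described above can be sketched with Stream.flat_map/2. TextDeltasSketch is a hypothetical stand-in, not the library's source, and the input is a hand-written list in the documented chunk shape.

```elixir
defmodule TextDeltasSketch do
  # Hypothetical equivalent of text_deltas/1 (assumption, not library code).
  # Emits the delta text for matching events and nothing for everything else.
  def text_deltas(stream) do
    Stream.flat_map(stream, fn
      {:ok, %{event: "response.output_text.delta", data: %{"delta" => delta}}} -> [delta]
      _other -> []
    end)
  end
end

# Hand-written raw events, including an error and a non-text event.
raw = [
  {:ok, %{event: "response.created", data: %{}}},
  {:ok, %{event: "response.output_text.delta", data: %{"delta" => "1, 2"}}},
  {:error, :malformed_event},
  {:ok, %{event: "response.output_text.delta", data: %{"delta" => ", 3"}}},
  {:ok, %{event: "response.completed", data: %{}}}
]

text = raw |> TextDeltasSketch.text_deltas() |> Enum.join()
IO.puts(text)
```

Using flat_map keeps the pipeline lazy and handles filtering and extraction in a single pass, which avoids a separate Stream.filter/2 followed by Stream.map/2.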