AI.Provider.Utils.EventSource (AI SDK v0.0.1-rc.0)
View SourceUtility functions for handling Server-Sent Events (SSE).
This module provides functionality for making HTTP requests that receive Server-Sent Events (SSE) responses, commonly used for streaming API responses from AI providers like OpenAI.
The EventSource module handles:
- Creating HTTP connections with proper SSE headers
- Parsing SSE format (data:, event:, id:, retry: fields)
- Converting SSE events into structured Elixir stream events
- Supporting backpressure for efficient streaming
- Error handling and connection cleanup
Summary
Functions
Makes a POST request to the specified URL with streaming response.
Types
@type response() :: {:ok, %{status: integer(), body: binary(), stream: Enumerable.t()}} | {:error, term()}
Response from an SSE request
{:ok, response}- Successful response with status, body, and stream{:error, reason}- Error occurred during connection or streaming
@type stream_event() :: {:text_delta, String.t()} | {:finish, String.t()} | {:metadata, map()} | {:error, term()}
Stream events that can be emitted by an SSE stream
{:text_delta, text}- A chunk of text from the model{:finish, reason}- Stream has completed with reason (e.g., "stop", "length"){:metadata, data}- Additional metadata from the model{:error, reason}- An error occurred during streaming
Functions
Makes a POST request to the specified URL with streaming response.
This function sends a POST request with the provided body and headers to the URL,
and returns a structure that can be used to stream the SSE responses. The returned
stream emits events of type stream_event().
Parameters
url- The URL to make the request tobody- The JSON request bodyheaders- HTTP headers to include in the requestoptions- Additional options for the request::timeout- Connection timeout in milliseconds (default: 30000):max_line_length- Maximum length of an SSE line (default: 16384):retry_interval- Time to wait before reconnecting in ms (default: 3000)
Returns
{:ok, response}- Where response contains:status- HTTP status codebody- Response body (usually just initialization message)stream- Stream ofstream_event()events
{:error, reason}- If the request fails
Examples
{:ok, response} = EventSource.post(
"https://api.openai.com/v1/chat/completions",
%{model: "gpt-3.5-turbo", messages: [%{role: "user", content: "Hello"}], stream: true},
%{"Authorization" => "Bearer my-api-key", "Content-Type" => "application/json"},
%{timeout: 60_000}
)
response.stream
|> Stream.each(fn
{:text_delta, chunk} -> IO.write(chunk)
{:finish, reason} -> IO.puts("\nFinished: #{reason}")
{:error, error} -> IO.puts("Error: #{inspect(error)}")
_ -> :ok
end)
|> Stream.run()