Stateless line-buffered Server-Sent Events (SSE) decoder.
Reused by all SSE-streaming providers (currently ALLM.Providers.OpenAI;
Phase 11's ALLM.Providers.Anthropic consumes it unchanged). Provider-specific
interpretation of data: payloads happens in each adapter's chunk-to-event
mapper; this module only parses the SSE wire format per the
WHATWG Server-Sent Events spec.
Carve-out: the :done sentinel is OpenAI-specific
OpenAI's Chat Completions streaming terminates with a data: [DONE]\n\n
marker that has no native SSE meaning. To keep adapter chunk-mappers simple,
this decoder returns :done as an in-band sentinel inside the message list
whenever it parses a data: [DONE] event. Anthropic (Phase 11) signals
termination with a regular event: message_stop SSE event, so Anthropic-side
code must NOT blanket-pattern-match on :done as its termination signal and
should instead treat it as just one possible element of the message list.
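As a rough illustration of treating :done as one element among several, the sketch below classifies each entry of a decoded message list. The module name MessageListSketch, the classify/1 function, and the returned tags are hypothetical and not part of ALLM; the message maps only mirror the message() shape documented on this page.

```elixir
defmodule MessageListSketch do
  # Hypothetical helper, not part of ALLM. Each clause handles one kind of
  # element that may appear in the list returned by the decoder.

  # OpenAI's in-band sentinel.
  def classify(:done), do: :openai_terminator

  # Anthropic's terminator arrives as an ordinary SSE message.
  def classify(%{event: "message_stop"}), do: :anthropic_terminator

  # Any other message carries a provider-specific payload in :data.
  def classify(%{data: data}), do: {:payload, data}
end

messages = [
  %{event: nil, data: "a", id: nil, retry: nil},
  %{event: "message_stop", data: "{}", id: nil, retry: nil},
  :done
]

classified = Enum.map(messages, &MessageListSketch.classify/1)
IO.inspect(classified)
# prints: [{:payload, "a"}, :anthropic_terminator, :openai_terminator]
```

Because the sentinel is matched by its own clause, a mapper written this way keeps working whether or not a given provider ever emits :done.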
Accumulator round-trip
The accumulator is a plain map (no PIDs / refs / funs) and round-trips
cleanly through :erlang.term_to_binary/1. This matters for
Stream.resource/3 callers that thread the accumulator through
start_fun → next_fun state — see spec §7.2 and Invariant 8 in the
Phase 10 design doc's SSE Behaviour & Type Contracts.
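The round-trip property and the Stream.resource/3 threading pattern can be sketched as follows. This is a minimal illustration under stated assumptions: the accumulator literal copies the shape shown for new/0 with a made-up buffer value, StreamThreadingSketch is a hypothetical module, and toy_decode is a deliberately simplified stand-in for a decode_chunk/2-shaped function (it splits on "\n\n" only and emits raw event blocks rather than parsed messages).

```elixir
# Accumulator shape copied from the new/0 example; the buffer content is
# invented to simulate a half-received line.
acc = %{
  buffer: "data: par",
  partial: %{event: nil, data: [], id: nil, retry: nil}
}

# A plain map survives the external term format unchanged.
restored = acc |> :erlang.term_to_binary() |> :erlang.binary_to_term()
IO.inspect(restored == acc)
# prints: true

defmodule StreamThreadingSketch do
  # Hypothetical helper. `decode` is any function shaped like
  # (accumulator, binary) -> {messages, accumulator}.
  def messages(chunks, initial_acc, decode) do
    Stream.resource(
      # start_fun: pair the pending chunks with the decoder accumulator
      fn -> {chunks, initial_acc} end,
      # next_fun: decode one chunk and thread the new accumulator forward
      fn
        {[], _acc} = state ->
          {:halt, state}

        {[chunk | rest], acc} ->
          {messages, next_acc} = decode.(acc, chunk)
          {messages, {rest, next_acc}}
      end,
      # after_fun: nothing to clean up in this sketch
      fn _state -> :ok end
    )
  end
end

# Toy stand-in decoder: buffers bytes until a blank line ("\n\n") is seen.
toy_decode = fn acc, chunk ->
  parts = String.split(acc.buffer <> chunk, "\n\n")
  {complete, [rest]} = Enum.split(parts, -1)
  {complete, %{acc | buffer: rest}}
end

result =
  ["data: he", "llo\n\ndata: wor", "ld\n\n"]
  |> StreamThreadingSketch.messages(%{buffer: ""}, toy_decode)
  |> Enum.to_list()

IO.inspect(result)
# prints: ["data: hello", "data: world"]
```

Keeping the accumulator free of PIDs, refs, and funs means the state tuple held by Stream.resource/3 stays a plain, serializable term.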
Spec sections
Spec §7.2 (HTTP/1 streaming guidance).
Types
@type accumulator() :: %{
  buffer: binary(),
  partial: %{
    event: String.t() | nil,
    data: [String.t()],
    id: String.t() | nil,
    retry: pos_integer() | nil
  }
}
Parser accumulator: pending byte buffer + in-progress message fields.
@type done_marker() :: :done
Sentinel emitted in-band for OpenAI's [DONE] terminator (provider-specific carve-out).
@type message() :: %{
  event: String.t() | nil,
  data: String.t(),
  id: String.t() | nil,
  retry: pos_integer() | nil
}
Parsed SSE message per https://html.spec.whatwg.org/multipage/server-sent-events.html.
Functions
@spec decode_chunk(accumulator(), binary()) :: {[message() | done_marker()], accumulator()}
Decodes one chunk of SSE bytes against the running accumulator.
Total over accumulator() × binary(): never raises on malformed input.
Malformed lines (no : separator) are silently dropped per the SSE spec's
"ignore unrecognized fields" rule. Comment lines (starting with :) are
also dropped.
Returns {messages, new_accumulator} where messages is a list of parsed
message/0 values, with an in-band :done sentinel wherever OpenAI's
data: [DONE] terminator was parsed (see module doc).
Honors all three line terminators from the SSE spec: LF, CR, CRLF.
Examples
iex> {messages, _acc} =
...> ALLM.Providers.Support.SSE.new()
...> |> ALLM.Providers.Support.SSE.decode_chunk("data: hello\n\n")
iex> messages
[%{event: nil, data: "hello", id: nil, retry: nil}]
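The line-level rules described above (three line terminators, dropped comment lines, dropped lines without a : separator) can be illustrated with a standalone sketch. It is not the module's implementation: LineSketch, split_lines/1, and parse_line/1 are hypothetical names. Holding back a trailing CR until more bytes arrive is an assumption about how a CRLF pair split across two chunks might be handled, and stripping one leading space from the value follows the WHATWG field-parsing rule.

```elixir
defmodule LineSketch do
  # Hypothetical sketch, not the ALLM implementation.

  # Splits a buffer into complete lines plus the unterminated remainder.
  # A trailing "\r" is held back because the next chunk may begin with
  # "\n", which would make the pair a single CRLF terminator.
  def split_lines(buffer) do
    {body, held} =
      if String.ends_with?(buffer, "\r") do
        {binary_part(buffer, 0, byte_size(buffer) - 1), "\r"}
      else
        {buffer, ""}
      end

    # CRLF is listed first so it is consumed as one terminator.
    parts = String.split(body, ~r/\r\n|\n|\r/)
    {lines, [rest]} = Enum.split(parts, -1)
    {lines, rest <> held}
  end

  # A blank line ends the current message.
  def parse_line(""), do: :dispatch

  # Comment lines start with ":" and are dropped.
  def parse_line(":" <> _comment), do: :ignore

  def parse_line(line) do
    case String.split(line, ":", parts: 2) do
      # One leading space after the colon is not part of the value.
      [field, " " <> value] -> {field, value}
      [field, value] -> {field, value}
      # No ":" separator: dropped, matching the behaviour documented above.
      [_no_colon] -> :ignore
    end
  end
end

IO.inspect(LineSketch.split_lines("a\r\nb\rc\nd"))
# prints: {["a", "b", "c"], "d"}

IO.inspect(LineSketch.split_lines("a\r"))
# prints: {[], "a\r"}

IO.inspect(Enum.map(["data: hello", ": keepalive", "garbage", ""], &LineSketch.parse_line/1))
# prints: [{"data", "hello"}, :ignore, :ignore, :dispatch]
```

One limitation of this sketch: a stream that ends on a lone CR leaves that final line in the buffer, so a real decoder needs its own end-of-stream policy.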
@spec new() :: accumulator()
Returns an empty accumulator. Pure; no IO.
Examples
iex> ALLM.Providers.Support.SSE.new()
%{buffer: "", partial: %{event: nil, data: [], id: nil, retry: nil}}