ALLM.Providers.Support.SSE (allm v0.3.0)


Stateless line-buffered Server-Sent Events (SSE) decoder.

Reused by all SSE-streaming providers (currently ALLM.Providers.OpenAI; Phase 11's ALLM.Providers.Anthropic consumes it unchanged). Provider-specific interpretation of data: payloads happens in each adapter's chunk-to-event mapper; this module only parses the SSE wire format per the WHATWG Server-Sent Events spec.

Carve-out: the :done sentinel is OpenAI-specific

OpenAI's Chat Completions streaming terminates with a data: [DONE]\n\n marker that has no native SSE meaning. To keep adapter chunk-mappers simple, this decoder returns :done as an in-band sentinel inside the message list whenever it parses a data: [DONE] event. Anthropic (Phase 11) signals termination with a regular event: message_stop SSE event instead, so Anthropic-side code must NOT assume every stream ends with :done; it should handle :done as just one possible element of the message list.
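
The two termination styles described above can be contrasted in a small sketch. TerminationSketch and classify/1 are hypothetical names, not part of ALLM, and the inputs are hand-written literals in the documented message() shape rather than real decoder output. A real adapter would likely carry only the clause for its own provider; both are shown together here for comparison.

```elixir
defmodule TerminationSketch do
  # Hypothetical mapper, not part of ALLM: classifies one element of the
  # list returned by decode_chunk/2.

  # OpenAI-style terminator: the decoder's in-band sentinel.
  def classify(:done), do: :stop

  # Anthropic-style terminator: an ordinary SSE message whose event
  # field is "message_stop".
  def classify(%{event: "message_stop"}), do: :stop

  # Anything else is a regular message; hand its payload onward.
  def classify(%{data: data}), do: {:payload, data}
end

# Hand-written literals in the documented message() shape.
openai_style = [%{event: nil, data: "hello", id: nil, retry: nil}, :done]
anthropic_style = [%{event: "message_stop", data: "{}", id: nil, retry: nil}]

IO.inspect(Enum.map(openai_style, &TerminationSketch.classify/1))
# => [{:payload, "hello"}, :stop]
IO.inspect(Enum.map(anthropic_style, &TerminationSketch.classify/1))
# => [:stop]
```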

Accumulator round-trip

The accumulator is a plain map (no PIDs / refs / funs) and round-trips cleanly through :erlang.term_to_binary/1. This matters for Stream.resource/3 callers that thread the accumulator through start_fun → next_fun state — see spec §7.2 and Invariant 8 in the Phase 10 design doc's SSE Behaviour & Type Contracts.
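
The round-trip property can be checked with a hand-written accumulator. The field values below are illustrative assumptions (a mid-message state), not output captured from the decoder; only the map shape comes from the accumulator() type.

```elixir
# Literal accumulator in the documented accumulator() shape, as it might
# look partway through a message (values are made up for illustration).
acc = %{
  buffer: "data: par",
  partial: %{event: "delta", data: ["first line"], id: nil, retry: nil}
}

# Serialize and deserialize with the standard Erlang term codec.
restored =
  acc
  |> :erlang.term_to_binary()
  |> :erlang.binary_to_term()

IO.inspect(restored == acc)
# => true
```

Because the map holds only binaries, lists, integers, and nil, nothing in it is tied to a particular process or node.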

Spec sections

Spec §7.2 (HTTP/1 streaming guidance).

Types

accumulator()

@type accumulator() :: %{
  buffer: binary(),
  partial: %{
    event: String.t() | nil,
    data: [String.t()],
    id: String.t() | nil,
    retry: pos_integer() | nil
  }
}

Parser accumulator: pending byte buffer + in-progress message fields.

done_marker()

@type done_marker() :: :done

Sentinel emitted in-band for OpenAI's [DONE] terminator (provider-specific carve-out).

message()

@type message() :: %{
  event: String.t() | nil,
  data: String.t(),
  id: String.t() | nil,
  retry: pos_integer() | nil
}

Parsed SSE message per https://html.spec.whatwg.org/multipage/server-sent-events.html.

Functions

decode_chunk(acc, chunk)

@spec decode_chunk(accumulator(), binary()) ::
  {[message() | done_marker()], accumulator()}

Decodes one chunk of SSE bytes against the running accumulator.

Total over binary() × accumulator() — never raises on malformed input. Malformed lines (no : separator) are silently dropped per the SSE spec's "ignore unrecognized fields" rule. Comment lines (starting with :) are also dropped.

Returns {messages, new_accumulator} where messages is a list of parsed message/0 values plus an in-band :done sentinel for OpenAI's data: [DONE] terminator (see module doc).

Honors all three line terminators from the SSE spec: LF, CR, CRLF.
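
The line-level rules above (three terminators, dropped comments, dropped lines with no separator) can be illustrated with a standalone sketch. LineSketch is a hypothetical module and is not this decoder's implementation; it works on a complete binary and does not address a CRLF pair split across two chunks, which a real streaming decoder has to buffer for.

```elixir
defmodule LineSketch do
  # Hypothetical illustration, not ALLM code.

  # Split on CRLF, lone CR, or lone LF. CRLF is listed first so the pair
  # is consumed as a single terminator.
  def split_lines(bin), do: String.split(bin, ~r/\r\n|\r|\n/)

  # A blank line marks the end of a message.
  def classify(""), do: :dispatch

  # Lines starting with ":" are comments.
  def classify(":" <> _comment), do: :comment

  def classify(line) do
    case String.split(line, ":", parts: 2) do
      [field, value] -> {:field, field, strip_one_space(value)}
      # No ":" separator: dropped, matching the behaviour documented above.
      [_no_colon] -> :ignored
    end
  end

  # The SSE format removes at most one leading space from the value.
  defp strip_one_space(" " <> rest), do: rest
  defp strip_one_space(value), do: value
end

IO.inspect(LineSketch.split_lines("a\r\nb\rc\nd"))
# => ["a", "b", "c", "d"]
IO.inspect(LineSketch.classify("data: hello"))
# => {:field, "data", "hello"}
```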

Examples

iex> {messages, _acc} =
...>   ALLM.Providers.Support.SSE.new()
...>   |> ALLM.Providers.Support.SSE.decode_chunk("data: hello\n\n")
iex> messages
[%{event: nil, data: "hello", id: nil, retry: nil}]
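
Threading the accumulator through Stream.resource/3 might look like the sketch below. SSEStreamSketch is a hypothetical wrapper that takes the constructor and decoder as function arguments; with this module one would presumably pass &ALLM.Providers.Support.SSE.new/0 and &ALLM.Providers.Support.SSE.decode_chunk/2. To stay runnable without the library, the demo uses a toy decoder that only splits on blank lines and emits raw strings, so its output is not the real message() maps.

```elixir
defmodule SSEStreamSketch do
  # Hypothetical wrapper, not part of ALLM. `chunks` is a list of binaries;
  # new_fun/0 builds the accumulator and decode_fun/2 has the same shape
  # as decode_chunk/2: (acc, chunk) -> {messages, acc}.
  def stream(chunks, new_fun, decode_fun) do
    Stream.resource(
      # start_fun: pair the remaining chunks with a fresh accumulator.
      fn -> {chunks, new_fun.()} end,
      # next_fun: decode one chunk and emit whatever messages it completed.
      fn
        {[], _acc} = state ->
          {:halt, state}

        {[chunk | rest], acc} ->
          {messages, acc} = decode_fun.(acc, chunk)
          {messages, {rest, acc}}
      end,
      # after_fun: nothing to clean up, since the state is plain data.
      fn _state -> :ok end
    )
  end
end

# Toy stand-ins for new/0 and decode_chunk/2 (assumptions for the demo).
toy_new = fn -> %{buffer: ""} end

toy_decode = fn %{buffer: buf}, chunk ->
  parts = String.split(buf <> chunk, "\n\n")
  # Everything before the last part is complete; the last part is pending.
  {complete, [pending]} = Enum.split(parts, -1)
  {complete, %{buffer: pending}}
end

chunks = ["data: he", "llo\n\ndata: wor", "ld\n\n"]
result = chunks |> SSEStreamSketch.stream(toy_new, toy_decode) |> Enum.to_list()
IO.inspect(result)
# => ["data: hello", "data: world"]
```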

new()

@spec new() :: accumulator()

Returns an empty accumulator. Pure; no IO.

Examples

iex> ALLM.Providers.Support.SSE.new()
%{buffer: "", partial: %{event: nil, data: [], id: nil, retry: nil}}