ALLM.StreamAdapter behaviour (allm v0.3.0)

Streaming provider adapter contract. See spec §7.2.

stream/2 returns an Enumerable.t() of ALLM.Event values. The enumerable is lazy: no HTTP call fires until the caller starts reducing over it. It must also be resource-safe: if the consumer halts early (for example with Stream.take/2 or Enum.take/2), the underlying HTTP request must be cancelled.

HTTP transport guidance

Use Finch directly with HTTP/1. Req's SSE path does not cover every provider's chunking quirks, and HTTP/2 flow control breaks for request bodies larger than 64 KB (the same issue documented in req_llm). Engines may inject a custom Finch name via adapter_opts: [finch_name: MyApp.Finch].

Implementations should use Stream.resource/3 rather than Stream.unfold/2: resource/3 takes an explicit after_fun, which is the canonical place to cancel the Finch ref.
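A minimal sketch of the laziness and halt-safety properties, assuming nothing about the real adapter internals. The module name HaltSafeSketch, the {:chunk, _} event tag, and the :opened / :cancelled messages are hypothetical stand-ins; in a real adapter the start_fun would open the Finch request and the after_fun would cancel it.

```elixir
defmodule HaltSafeSketch do
  # Builds a lazy stream over `chunks`. `notify` is a pid that receives
  # :opened when start_fun runs and :cancelled when after_fun runs.
  # Both messages are placeholders for opening and cancelling the request.
  def events(chunks, notify) do
    Stream.resource(
      # start_fun: runs only once the consumer begins reducing.
      fn ->
        send(notify, :opened)
        chunks
      end,
      # next_fun: emit one event per chunk, then halt.
      fn
        [] -> {:halt, []}
        [chunk | rest] -> {[{:chunk, chunk}], rest}
      end,
      # after_fun: runs on normal completion and on early halt alike.
      fn _state -> send(notify, :cancelled) end
    )
  end
end

stream = HaltSafeSketch.events(["a", "b", "c"], self())

# Building the stream does no work, so no :opened message exists yet.
opened_before? =
  receive do
    :opened -> true
  after
    0 -> false
  end

# The consumer halts after one event; after_fun still runs.
taken = Enum.take(stream, 1)

cancelled_after? =
  receive do
    :cancelled -> true
  after
    0 -> false
  end
```

Stream.unfold/2 has no equivalent of the third function, so an early halt would leave the request open.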

Invariants

  1. The synchronous {:error, _} branch returns %AdapterError{} for pre-flight failures (missing key, invalid request shape, immediate HTTP error like 401 before the first event).
  2. The stream itself may terminate with either {:error, %AdapterError{}} (HTTP-shaped failure mid-response — the provider returned a 4xx/5xx after streaming started) or {:error, %ALLM.Error.StreamError{}} (transport-shaped failure — stream cancelled, timed out, malformed event). Both variants are emitted via the ALLM.Event {:error, _} tag per spec §8.
  3. The stream must be halt-safe: when the consumer halts, the adapter must cancel the Finch ref within 500 ms.
  4. opts[:stream_timeout] (time between consecutive events) is honored by the adapter; exceeding it emits a terminating {:error, %AdapterError{reason: :timeout}} event.
  5. Adapters emitting {:raw_chunk, {:usage, _}} events must pre-map provider-wire usage keys to %ALLM.Usage{} field names before emitting; see ALLM.StreamCollector's usage-fold contract.
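Invariant 4 can be sketched with a receive ... after clause inside the next_fun. This is an illustration under stated assumptions, not the library's implementation: TimeoutSketch, its local AdapterError struct, and the {:chunk, ref, data} / {:done, ref} message protocol are all hypothetical, and the stream must be consumed in the process whose mailbox receives the messages.

```elixir
defmodule TimeoutSketch do
  defmodule AdapterError do
    # Local stand-in for ALLM.Error.AdapterError; only :reason is modelled.
    defstruct [:reason]
  end

  # Returns a lazy stream of events read from the consuming process's mailbox.
  # A hypothetical producer sends {:chunk, ref, data} and finally {:done, ref}.
  def events(ref, opts) do
    timeout = Keyword.get(opts, :stream_timeout, 5_000)

    Stream.resource(
      fn -> :running end,
      fn
        # A previous step already emitted the terminating event.
        :done ->
          {:halt, :done}

        :running ->
          receive do
            {:chunk, ^ref, data} -> {[{:chunk, data}], :running}
            {:done, ^ref} -> {:halt, :done}
          after
            # No event arrived within the window: emit the terminating error.
            timeout -> {[{:error, %AdapterError{reason: :timeout}}], :done}
          end
      end,
      # A real adapter would cancel the Finch ref here.
      fn _state -> :ok end
    )
  end
end

ref = make_ref()
send(self(), {:chunk, ref, "hello"})

# One chunk is delivered, then nothing arrives within 50 ms.
events = TimeoutSketch.events(ref, stream_timeout: 50) |> Enum.to_list()
```

The timer restarts on every call to next_fun, which matches the "time between consecutive events" reading of opts[:stream_timeout].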

Callbacks

stream(t, keyword)

@callback stream(
  ALLM.Request.t(),
  keyword()
) :: {:ok, Enumerable.t()} | {:error, ALLM.Error.AdapterError.t()}

Open a streaming request against the provider.

Returns {:ok, enumerable} on success (the enumerable is lazy — no HTTP call has fired yet) or {:error, %ALLM.Error.AdapterError{}} on pre-flight failure.

Synchronous error reasons (same as ALLM.Adapter.generate/2)

| Reason | HTTP status | Fires when |
| --- | --- | --- |
| :authentication_failed | 401 | API key missing or invalid. |
| :rate_limited | 429 | Provider quota exceeded; :retry_after_ms populated when Retry-After header is present. |
| :invalid_request | 400 | Request shape rejected by provider. |
| :content_filter | 400 (provider-specific) | Provider's content filter rejected the prompt. |
| :context_length_exceeded | 400 | Request exceeded the model's context window. |
| :provider_unavailable | 500, 502, 503, 504, 529 | Provider server-side failure, retryable. |
| :timeout | - | Pre-flight request exceeded opts[:request_timeout]. |
| :network_error | - | TCP/TLS/DNS failure before the first event. |
| :malformed_response | - | Provider returned a non-SSE response body to the streaming endpoint. |
| :unsupported_feature | - | Request combined features the adapter cannot express. |
| :unknown | any | Catch-all for shapes the adapter cannot classify. |
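The status column above could be turned into a classifier along these lines. This is a rough sketch: StatusSketch, classify/1, and retry_after_ms/1 are hypothetical names, the three reasons that share status 400 cannot be told apart by status alone (a real adapter would presumably inspect the provider's error body), and only the integer-seconds form of Retry-After is handled.

```elixir
defmodule StatusSketch do
  # Maps an HTTP status to a reason atom from the table.
  def classify(401), do: :authentication_failed
  def classify(429), do: :rate_limited

  # 400 is shared by :invalid_request, :content_filter, and
  # :context_length_exceeded; this sketch falls back to the generic reason.
  def classify(400), do: :invalid_request

  def classify(status) when status in [500, 502, 503, 504, 529],
    do: :provider_unavailable

  def classify(_other), do: :unknown

  # Converts a Retry-After header given in whole seconds to milliseconds.
  # Returns nil when the header is absent or not an integer
  # (the HTTP-date form is not handled here).
  def retry_after_ms(headers) do
    Enum.find_value(headers, fn {name, value} ->
      if String.downcase(name) == "retry-after" do
        case Integer.parse(value) do
          {seconds, ""} -> seconds * 1000
          _ -> nil
        end
      end
    end)
  end
end

reason = StatusSketch.classify(429)
delay = StatusSketch.retry_after_ms([{"Retry-After", "2"}])
```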

Mid-stream {:error, _} event reasons

The enumerable may emit a terminating {:error, _} event carrying either an %AdapterError{} (HTTP-shaped) or a %StreamError{} (transport-shaped):

| Struct type | Reason | Fires when |
| --- | --- | --- |
| AdapterError | :rate_limited | Provider returned 429 after SSE began. |
| AdapterError | :provider_unavailable | Provider returned 5xx after SSE began. |
| AdapterError | :content_filter | Provider interrupted the stream with a content-filter signal. |
| AdapterError | :timeout | opts[:stream_timeout] elapsed between events. |
| StreamError | :cancelled | Consumer halted the stream early. |
| StreamError | :timeout | Transport-level timeout between chunks (distinct from adapter-level request timeout). |
| StreamError | :malformed_event | An SSE line could not be parsed. |
| StreamError | :adapter_error | Wraps an underlying %AdapterError{} (see :cause field). |
| StreamError | :unknown | Catch-all for transport failures the adapter cannot classify. |
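On the consumer side, a terminating {:error, _} event might be handled as in the sketch below. The structs are local stand-ins that model only the :reason and :cause fields named above; ConsumeSketch, collect/1, root_cause/1, and the {:chunk, _} event tag are hypothetical and not part of ALLM.

```elixir
defmodule ConsumeSketch do
  defmodule AdapterError do
    # Stand-in for ALLM.Error.AdapterError.
    defstruct [:reason]
  end

  defmodule StreamError do
    # Stand-in for ALLM.Error.StreamError.
    defstruct [:reason, :cause]
  end

  # Folds events into {:ok, events} or stops at the first {:error, _} event.
  def collect(events) do
    events
    |> Enum.reduce_while({:ok, []}, fn
      {:error, error}, _acc -> {:halt, {:error, error}}
      event, {:ok, acc} -> {:cont, {:ok, [event | acc]}}
    end)
    |> case do
      {:ok, acc} -> {:ok, Enum.reverse(acc)}
      {:error, _} = err -> err
    end
  end

  # A StreamError with reason :adapter_error carries the wrapped error in :cause.
  def root_cause(%StreamError{reason: :adapter_error, cause: %AdapterError{} = cause}),
    do: cause

  def root_cause(error), do: error
end

wrapped = %ConsumeSketch.StreamError{
  reason: :adapter_error,
  cause: %ConsumeSketch.AdapterError{reason: :rate_limited}
}

# The event after the error is never reached because the fold halts.
result = ConsumeSketch.collect([{:chunk, "a"}, {:error, wrapped}, {:chunk, "never"}])

{:error, error} = result
cause = ConsumeSketch.root_cause(error)
```

Because Enum.reduce_while/3 halts the enumerable, a stream built on Stream.resource/3 would also run its after_fun at this point.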