Utilities for consuming streaming chat-completion responses.
All functions accept a stream of OpenAI-style chat.completion.chunk maps
(the decoded JSON objects emitted by HuggingfaceClient.SSE.parse_stream_json/1).
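For orientation, the sketch below shows what such a chunk stream might look like once decoded. The field layout follows the general OpenAI streaming format; the concrete values (id, model name, timestamps) are made-up sample data, not output from this library.

```elixir
# Hypothetical sample data: three chunks in the OpenAI streaming shape.
base = %{
  "id" => "chatcmpl-1",
  "object" => "chat.completion.chunk",
  "created" => 1_700_000_000,
  "model" => "example-model"
}

chunks = [
  Map.put(base, "choices", [
    %{"index" => 0, "delta" => %{"role" => "assistant", "content" => "Hel"}, "finish_reason" => nil}
  ]),
  Map.put(base, "choices", [
    %{"index" => 0, "delta" => %{"content" => "lo"}, "finish_reason" => nil}
  ]),
  # The final chunk usually carries an empty delta and a finish reason.
  Map.put(base, "choices", [
    %{"index" => 0, "delta" => %{}, "finish_reason" => "stop"}
  ])
]

# Pull the content token out of each chunk; chunks without one yield nil.
tokens =
  Enum.map(chunks, fn chunk ->
    get_in(chunk, ["choices", Access.at(0), "delta", "content"])
  end)

IO.inspect(tokens)
```

The token lives at choices[0].delta.content, and the closing chunk has none. This is why the functions below talk about skipping nil deltas and stop-reason chunks.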
Functions
@spec async_collect(Enumerable.t()) :: Task.t()
Runs collect_content/1 asynchronously in a Task.
Returns a Task whose result, once awaited, is {:ok, full_text} or {:error, exception}.
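The described behaviour can be approximated with Task.async/1 plus a rescue clause. This is a minimal sketch, not the library's implementation: the module name AsyncCollectSketch and its helper are hypothetical stand-ins, and it assumes that "error" means an exception raised while the stream is being enumerated.

```elixir
defmodule AsyncCollectSketch do
  # Hypothetical stand-in for collect_content/1: joins all content tokens.
  def collect_content(chunks) do
    chunks
    |> Stream.flat_map(&content/1)
    |> Enum.join()
  end

  # Runs the collection in a Task and wraps the outcome in a tagged tuple.
  def async_collect(chunks) do
    Task.async(fn ->
      try do
        {:ok, collect_content(chunks)}
      rescue
        exception -> {:error, exception}
      end
    end)
  end

  defp content(%{"choices" => [%{"delta" => %{"content" => text}} | _]})
       when is_binary(text) and text != "",
       do: [text]

  defp content(_other), do: []
end

chunk = fn text -> %{"choices" => [%{"delta" => %{"content" => text}}]} end

ok_result =
  [chunk.("Hel"), chunk.("lo")]
  |> AsyncCollectSketch.async_collect()
  |> Task.await()

# A stream that raises while being enumerated (assumed failure mode).
failing = Stream.map([1], fn _ -> raise "boom" end)

error_result =
  failing
  |> AsyncCollectSketch.async_collect()
  |> Task.await()

IO.inspect({ok_result, error_result})
```

Because the exception is rescued inside the task, the caller receives a tagged tuple from Task.await/1 rather than a crash propagated through the task link.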
@spec collect_content(Enumerable.t()) :: String.t()
Collects all content tokens from a chunk stream into a single string.
Nil deltas and stop-reason chunks are silently ignored.
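To illustrate which chunks are skipped, here is a minimal sketch of the collection logic. CollectSketch is a hypothetical module written for this example, and the chunk shapes are assumed from the OpenAI streaming format.

```elixir
defmodule CollectSketch do
  # Accumulates content tokens as iodata and flattens them once at the end.
  def collect_content(chunks) do
    chunks
    |> Enum.reduce([], fn chunk, acc ->
      case chunk do
        %{"choices" => [%{"delta" => %{"content" => text}} | _]} when is_binary(text) ->
          [acc, text]

        # Nil deltas, empty deltas and stop-reason chunks fall through here.
        _other ->
          acc
      end
    end)
    |> IO.iodata_to_binary()
  end
end

chunks = [
  %{"choices" => [%{"delta" => %{"content" => "Hel"}, "finish_reason" => nil}]},
  %{"choices" => [%{"delta" => nil, "finish_reason" => nil}]},
  %{"choices" => [%{"delta" => %{"content" => "lo"}, "finish_reason" => nil}]},
  %{"choices" => [%{"delta" => %{}, "finish_reason" => "stop"}]}
]

text = CollectSketch.collect_content(chunks)
empty = CollectSketch.collect_content([])

IO.inspect({text, empty})
```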
@spec collect_response(Enumerable.t()) :: map()
Assembles a complete "chat.completion" response map from a chunk stream.
Uses the last chunk's metadata (id, model, created) and concatenates all content tokens.
Returns a map with "object" => "chat.completion" even for an empty stream.
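The assembly step might look roughly like the sketch below. Only the "object" value, the use of the last chunk's id, model and created, and the concatenated content come from the description above. The layout of "choices" in the result is an assumption based on the OpenAI non-streaming response format, and CollectResponseSketch is a hypothetical module.

```elixir
defmodule CollectResponseSketch do
  def collect_response(chunks) do
    # Track the most recent chunk and the content seen so far.
    {last, parts} =
      Enum.reduce(chunks, {%{}, []}, fn chunk, {_last, parts} ->
        case chunk do
          %{"choices" => [%{"delta" => %{"content" => text}} | _]} when is_binary(text) ->
            {chunk, [parts, text]}

          _other ->
            {chunk, parts}
        end
      end)

    %{
      "id" => last["id"],
      "object" => "chat.completion",
      "created" => last["created"],
      "model" => last["model"],
      # Assumed result layout: a single assistant message.
      "choices" => [
        %{
          "index" => 0,
          "message" => %{"role" => "assistant", "content" => IO.iodata_to_binary(parts)},
          "finish_reason" => finish_reason(last)
        }
      ]
    }
  end

  defp finish_reason(%{"choices" => [%{"finish_reason" => reason} | _]}), do: reason
  defp finish_reason(_other), do: nil
end

base = %{"id" => "chatcmpl-1", "created" => 1_700_000_000, "model" => "example-model"}

chunks = [
  Map.put(base, "choices", [%{"delta" => %{"content" => "Hel"}, "finish_reason" => nil}]),
  Map.put(base, "choices", [%{"delta" => %{"content" => "lo"}, "finish_reason" => nil}]),
  Map.put(base, "choices", [%{"delta" => %{}, "finish_reason" => "stop"}])
]

response = CollectResponseSketch.collect_response(chunks)
empty = CollectResponseSketch.collect_response([])

IO.inspect(response)
```

For an empty stream the metadata fields in this sketch are nil and the content is an empty string, while "object" is still "chat.completion".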
@spec content_stream(Enumerable.t()) :: Enumerable.t()
Returns a lazy stream of non-empty content strings from a chunk stream.
Stop-reason and nil-delta chunks produce no elements.
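Laziness matters here because the source may be long-running or unbounded. The sketch below uses Stream.flat_map/2 so that a skipped chunk contributes zero elements; ContentStreamSketch is a hypothetical module, and the endless source is contrived purely to show that only the demanded chunks are consumed.

```elixir
defmodule ContentStreamSketch do
  # Emits one element per non-empty content token and nothing for other chunks.
  def content_stream(chunks) do
    Stream.flat_map(chunks, fn
      %{"choices" => [%{"delta" => %{"content" => text}} | _]}
      when is_binary(text) and text != "" ->
        [text]

      _other ->
        []
    end)
  end
end

# An infinite source that alternates a content chunk and a nil-delta chunk.
endless =
  Stream.cycle([
    %{"choices" => [%{"delta" => %{"content" => "ab"}}]},
    %{"choices" => [%{"delta" => nil}]}
  ])

# Nothing is evaluated until Enum.take/2 pulls three elements.
first_three =
  endless
  |> ContentStreamSketch.content_stream()
  |> Enum.take(3)

IO.inspect(first_three)
```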
@spec each_content(Enumerable.t(), (String.t() -> any())) :: :ok
Calls callback for each non-empty content token. Returns :ok.
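A typical use is printing tokens as they arrive. The following is a rough sketch of the same idea with a hypothetical EachContentSketch module; the callback here sends each token to the current process so the calls can be observed afterwards.

```elixir
defmodule EachContentSketch do
  # Invokes the callback for side effects only; Enum.each/2 returns :ok.
  def each_content(chunks, callback) when is_function(callback, 1) do
    Enum.each(chunks, fn
      %{"choices" => [%{"delta" => %{"content" => text}} | _]}
      when is_binary(text) and text != "" ->
        callback.(text)

      _other ->
        :ok
    end)
  end
end

chunks = [
  %{"choices" => [%{"delta" => %{"content" => "Hel"}}]},
  %{"choices" => [%{"delta" => %{"content" => ""}}]},
  %{"choices" => [%{"delta" => %{"content" => "lo"}}]}
]

parent = self()
result = EachContentSketch.each_content(chunks, fn token -> send(parent, {:token, token}) end)

# Drain the mailbox to see which tokens reached the callback.
{:messages, received} = Process.info(self(), :messages)
IO.inspect({result, received})
```

Passing &IO.write/1 as the callback would print the text incrementally instead.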
@spec stream_to_pid(Enumerable.t(), pid(), atom()) :: :ok
Streams content tokens to a PID as {tag, token} messages, then {tag, :done}.
On a stream error, sends {tag, {:error, exception}}. The tag defaults to :hf_stream.
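The message protocol above pairs naturally with a receive loop on the consuming side. Below is a minimal sketch of both halves. StreamToPidSketch and ReceiverSketch are hypothetical modules, and the sketch assumes that no {tag, :done} message follows an error, which the description does not specify.

```elixir
defmodule StreamToPidSketch do
  def stream_to_pid(chunks, pid, tag \\ :hf_stream) do
    try do
      Enum.each(chunks, fn
        %{"choices" => [%{"delta" => %{"content" => text}} | _]}
        when is_binary(text) and text != "" ->
          send(pid, {tag, text})

        _other ->
          :ok
      end)

      send(pid, {tag, :done})
    rescue
      # Assumption: the error message terminates the sequence.
      exception -> send(pid, {tag, {:error, exception}})
    end

    :ok
  end
end

defmodule ReceiverSketch do
  # Accumulates tokens until :done or an error arrives for the given tag.
  def loop(tag, acc \\ []) do
    receive do
      {^tag, :done} -> {:ok, IO.iodata_to_binary(acc)}
      {^tag, {:error, exception}} -> {:error, exception}
      {^tag, token} when is_binary(token) -> loop(tag, [acc, token])
    after
      5_000 -> {:error, :timeout}
    end
  end
end

chunks = [
  %{"choices" => [%{"delta" => %{"content" => "Hel"}}]},
  %{"choices" => [%{"delta" => %{"content" => "lo"}}]},
  %{"choices" => [%{"delta" => %{}, "finish_reason" => "stop"}]}
]

parent = self()
spawn(fn -> StreamToPidSketch.stream_to_pid(chunks, parent) end)
result = ReceiverSketch.loop(:hf_stream)

IO.inspect(result)
```

Messages from a single sender arrive in order, so the receiver can rebuild the text by appending tokens as they come in. The timeout in the loop is a defensive addition for the example and is not part of the documented protocol.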