Utility functions for working with streaming inference responses.
Example: Collect a full response from a stream
{:ok, stream} = HuggingfaceClient.chat_completion_stream(client, %{
  model: "meta-llama/Llama-3.1-8B-Instruct",
  messages: [%{role: "user", content: "Write a haiku about Elixir"}]
})
text = HuggingfaceClient.Inference.StreamHelpers.collect_content(stream)
IO.puts(text)

Example: Stream to Phoenix LiveView
{:ok, stream} = HuggingfaceClient.chat_completion_stream(client, args)
HuggingfaceClient.Inference.StreamHelpers.each_content(stream, fn chunk ->
  send(self(), {:llm_chunk, chunk})
end)
Summary
Functions
Wraps a stream in a Task that accumulates the full response, resolving to
{:ok, full_text} or {:error, exception}.
Collects all content delta tokens from a chat completion stream into a single string.
Collects the full chat completion output including metadata from a stream.
Converts a chat completion stream into a stream of plain content strings.
Calls callback for each non-empty content delta token in the stream.
Runs a stream in a background Task and sends each content token to pid
as {tag, token} messages.
Functions
@spec async_collect(Enumerable.t(), timeout()) :: Task.t()
Wraps a stream in a Task that accumulates the full response, resolving to
{:ok, full_text} or {:error, exception}.
Example
task = HuggingfaceClient.Inference.StreamHelpers.async_collect(stream)
# Do other work here…
case Task.await(task, 30_000) do
  {:ok, text} -> IO.puts(text)
  {:error, err} -> IO.inspect(err)
end
@spec collect_content(Enumerable.t()) :: String.t()
Collects all content delta tokens from a chat completion stream into a single string.
Blocks until the stream is exhausted. For non-blocking use, run in a Task.
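A minimal sketch of the reduction collect_content performs, assuming each chunk in the stream is a decoded map in the OpenAI-compatible streaming delta shape (the chunk shape here is an assumption for illustration, not the library's documented internals):

```elixir
# Hypothetical decoded chunks as they might arrive from the stream.
chunks = [
  %{"choices" => [%{"delta" => %{"content" => "Hello"}}]},
  %{"choices" => [%{"delta" => %{"content" => ", world"}}]},
  %{"choices" => [%{"delta" => %{}}]}  # final chunk often carries no content
]

# Extract each delta's content, drop nils, and join into one string.
text =
  chunks
  |> Stream.map(&get_in(&1, ["choices", Access.at(0), "delta", "content"]))
  |> Stream.reject(&is_nil/1)
  |> Enum.join()

IO.puts(text)
```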
@spec collect_response(Enumerable.t()) :: map()
Collects the full chat completion output including metadata from a stream.
Returns a map mimicking the non-streaming response shape:
%{"choices" => [%{"message" => %{"role" => "assistant", "content" => text}}], ...}
Useful when you want to stream visually but still get a complete response object.
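A hedged sketch of reassembling that non-streaming shape from collected tokens, and of reading the content back out with get_in. Any keys beyond "choices"/"message" shown in the shape above (the "..." part) are left out here, since the real map may carry metadata this sketch does not model:

```elixir
# Tokens as they might have been collected from the stream.
tokens = ["Streams ", "flow ", "gently"]

# Rebuild a map mimicking the non-streaming response shape.
response = %{
  "choices" => [
    %{"message" => %{"role" => "assistant", "content" => Enum.join(tokens)}}
  ]
}

# Read the assembled content back out, as a caller of collect_response would.
content = get_in(response, ["choices", Access.at(0), "message", "content"])
```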
@spec content_stream(Enumerable.t()) :: Enumerable.t()
Converts a chat completion stream into a stream of plain content strings.
Filters out empty/nil deltas automatically.
Example
{:ok, raw_stream} = HuggingfaceClient.chat_completion_stream(client, args)
text_stream = HuggingfaceClient.Inference.StreamHelpers.content_stream(raw_stream)
Enum.each(text_stream, &IO.write/1)
@spec each_content(Enumerable.t(), (String.t() -> any())) :: :ok
Calls callback for each non-empty content delta token in the stream.
Returns :ok when the stream is exhausted.
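A self-contained sketch of each_content's contract, assuming the same delta-shaped chunks as above: empty and nil tokens are skipped, the callback sees each remaining token, and :ok is returned once the stream is exhausted:

```elixir
# Hypothetical chunks, including an empty and a missing content delta.
chunks = [
  %{"choices" => [%{"delta" => %{"content" => "El"}}]},
  %{"choices" => [%{"delta" => %{"content" => ""}}]},
  %{"choices" => [%{"delta" => %{"content" => "ixir"}}]},
  %{"choices" => [%{"delta" => %{}}]}
]

# Illustrative stand-in for each_content/2.
each_content = fn stream, callback ->
  stream
  |> Stream.map(&get_in(&1, ["choices", Access.at(0), "delta", "content"]))
  |> Stream.reject(&(&1 in [nil, ""]))
  |> Enum.each(callback)
end

result = each_content.(chunks, &IO.write/1)  # writes "Elixir"
```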
@spec stream_to_pid(Enumerable.t(), pid(), term()) :: Task.t()
Runs a stream in a background Task and sends each content token to pid
as {tag, token} messages.
Sends {tag, :done} when the stream is exhausted, or {tag, {:error, reason}}
if the stream raises.
Example (Phoenix LiveView)
def handle_event("ask", %{"prompt" => prompt}, socket) do
  {:ok, stream} = HuggingfaceClient.chat_completion_stream(@client, %{
    model: "meta-llama/Llama-3.1-8B-Instruct",
    messages: [%{role: "user", content: prompt}]
  })

  HuggingfaceClient.Inference.StreamHelpers.stream_to_pid(stream, self(), :llm)
  {:noreply, socket}
end

def handle_info({:llm, token}, socket) when is_binary(token) do
  {:noreply, update(socket, :response, &(&1 <> token))}
end

def handle_info({:llm, :done}, socket), do: {:noreply, socket}
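The message protocol above ({tag, token} per token, {tag, :done} at the end, {tag, {:error, reason}} on failure) can be exercised outside LiveView with a plain receive loop. This is an illustrative sketch: the sending task here is a stand-in for the library's background Task, and the timeout value is an arbitrary choice:

```elixir
parent = self()

# Stand-in for stream_to_pid's background task: send tokens, then :done.
{:ok, _pid} = Task.start(fn ->
  for token <- ["stream", "ing"], do: send(parent, {:llm, token})
  send(parent, {:llm, :done})
end)

# Receive loop covering all three message shapes from the docs above.
collect = fn collect, acc ->
  receive do
    {:llm, token} when is_binary(token) -> collect.(collect, acc <> token)
    {:llm, :done} -> {:ok, acc}
    {:llm, {:error, reason}} -> {:error, reason}
  after
    1_000 -> {:error, :timeout}
  end
end

result = collect.(collect, "")
```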