Claudio.Messages.Stream (Claudio v0.5.0)
View SourceUtilities for parsing and consuming Server-Sent Events (SSE) from streaming Messages API responses.
Streaming usage telemetry is emitted via [:claudio, :messages, :stream, :usage]
when parse_events/1 reaches the terminal message_stop event and final usage
is available from message_delta frames.
Event Types
The Messages API streaming responses include the following event types:
message_start- Initial message with empty contentcontent_block_start- Beginning of a content blockcontent_block_delta- Incremental content updates (text, JSON, thinking)content_block_stop- End of a content blockmessage_delta- Top-level message changes (usage updates)message_stop- Stream completionping- Keep-alive eventserror- Error events
Example
response = Claudio.Messages.create_message(client, request)
response
|> Claudio.Messages.Stream.parse_events()
|> Stream.filter(&match?({:ok, %{event: "content_block_delta"}}, &1))
|> Enum.each(fn {:ok, event} ->
IO.puts(event.data["delta"]["text"])
end)
Summary
Functions
Accumulates text deltas from streaming events into complete text chunks.
Accumulates all events and returns the final complete message.
Filters stream to only specific event types.
Parses Server-Sent Events from a streaming response body.
Types
Functions
@spec accumulate_text(Enumerable.t()) :: Enumerable.t()
Accumulates text deltas from streaming events into complete text chunks.
Example
response
|> Stream.parse_events()
|> Stream.accumulate_text()
|> Enum.each(&IO.puts/1)
@spec build_final_message(Enumerable.t()) :: {:ok, map()} | {:error, term()}
Accumulates all events and returns the final complete message.
Example
{:ok, final_message} =
response
|> Stream.parse_events()
|> Stream.build_final_message()
@spec filter_events(Enumerable.t(), [String.t()]) :: Enumerable.t()
Filters stream to only specific event types.
Example
response
|> Stream.parse_events()
|> Stream.filter_events(["content_block_delta", "message_stop"])
@spec parse_events(Enumerable.t()) :: Enumerable.t()
Parses Server-Sent Events from a streaming response body.
Returns a Stream of {:ok, event} or {:error, reason} tuples.
Example
response
|> Stream.parse_events()
|> Enum.to_list()