Utilities for working with heterogeneous streaming event/message streams.
When using streaming with control features (hooks, SDK MCP, permissions), the stream may contain both streaming events (from EventParser) and Message structs (from control protocol). These helpers normalize and filter such mixed streams.
Examples
# Normalize mixed stream to consistent event maps
stream
|> EventAdapter.to_events()
|> Stream.each(fn %{type: type} ->
# All items now have :type key
end)
# Extract only text content
final_text = stream
|> EventAdapter.text_only()
|> EventAdapter.accumulate_text()
|> Enum.to_list()
|> List.last()
# Filter to tool events only
stream
|> EventAdapter.tools_only()
|> Stream.each(fn tool_event ->
# Handle tool calls
end)Event vs Message
Events: Maps with
:typekey (from EventParser)%{type: :text_delta, text: "..."}%{type: :tool_use_start, name: "bash"}
Messages: Structs (from Message module)
%Message{type: :assistant, data: %{...}}%Message{type: :result, subtype: :success}
This module normalizes both into a consistent event map format.
Summary
Functions
Accumulates text from stream into progressively built strings.
Filters stream to only text-related events.
Normalizes a mixed stream to consistent event maps.
Filters stream to only tool-related events.
Functions
@spec accumulate_text(Enumerable.t()) :: Enumerable.t()
Accumulates text from stream into progressively built strings.
Takes text_delta events and builds up the complete text incrementally. Each element in the output stream contains the accumulated text up to that point.
Parameters
stream- Stream of events
Returns
Stream of accumulated text strings (one per event, building up)
Examples
# Get final complete text
final_text = stream
|> EventAdapter.accumulate_text()
|> Enum.to_list()
|> List.last()
# Watch text build up in real-time
stream
|> EventAdapter.accumulate_text()
|> Stream.each(fn accumulated ->
IO.write("\r#{accumulated}")
end)
|> Stream.run()
@spec text_only(Enumerable.t()) :: Enumerable.t()
Filters stream to only text-related events.
Includes:
:text_delta- Text content chunks:text_block_start- Text block initialization
Parameters
stream- Stream of events
Returns
Stream containing only text events
Examples
stream
|> EventAdapter.text_only()
|> Stream.each(fn %{type: :text_delta, text: text} ->
IO.write(text)
end)
|> Stream.run()
@spec to_events(Enumerable.t()) :: Enumerable.t()
Normalizes a mixed stream to consistent event maps.
Converts Message structs to event maps while passing through existing event maps unchanged. This creates a uniform stream interface.
Parameters
stream- Stream containing events and/or Message structs
Returns
Stream of event maps with :type key
Examples
# Events pass through unchanged
[%{type: :text_delta, text: "Hi"}]
|> Stream.into([])
|> EventAdapter.to_events()
|> Enum.to_list()
# => [%{type: :text_delta, text: "Hi"}]
# Messages get normalized
[%Message{type: :assistant, data: %{...}}]
|> Stream.into([])
|> EventAdapter.to_events()
|> Enum.to_list()
# => [%{type: :message, data: %{...}, timestamp: ~U[...]}]
@spec tools_only(Enumerable.t()) :: Enumerable.t()
Filters stream to only tool-related events.
Includes:
:tool_use_start- Tool call initialization:tool_input_delta- Tool input JSON chunks
Parameters
stream- Stream of events
Returns
Stream containing only tool events
Examples
stream
|> EventAdapter.tools_only()
|> Stream.each(fn
%{type: :tool_use_start, name: name} ->
IO.puts("Tool: #{name}")
%{type: :tool_input_delta, json: json} ->
IO.write(json)
end)
|> Stream.run()