ClaudeAgentSDK.Streaming.EventAdapter (claude_agent_sdk v0.15.0)

Copy Markdown View Source

Utilities for working with heterogeneous streaming event/message streams.

When using streaming with control features (hooks, SDK MCP, permissions), the stream may contain both streaming events (from EventParser) and Message structs (from control protocol). These helpers normalize and filter such mixed streams.

Examples

# Normalize mixed stream to consistent event maps
stream
|> EventAdapter.to_events()
|> Stream.each(fn %{type: type} ->
  # All items now have :type key
end)

# Extract only text content
final_text = stream
  |> EventAdapter.text_only()
  |> EventAdapter.accumulate_text()
  |> Enum.to_list()
  |> List.last()

# Filter to tool events only
stream
|> EventAdapter.tools_only()
|> Stream.each(fn tool_event ->
  # Handle tool calls
end)

Event vs Message

  • Events: Maps with :type key (from EventParser)

    • %{type: :text_delta, text: "..."}
    • %{type: :tool_use_start, name: "bash"}
  • Messages: Structs (from Message module)

    • %Message{type: :assistant, data: %{...}}
    • %Message{type: :result, subtype: :success}

This module normalizes both into a consistent event map format.

Summary

Functions

Accumulates text from stream into progressively built strings.

Filters stream to only text-related events.

Normalizes a mixed stream to consistent event maps.

Filters stream to only tool-related events.

Functions

accumulate_text(stream)

@spec accumulate_text(Enumerable.t()) :: Enumerable.t()

Accumulates text from stream into progressively built strings.

Takes text_delta events and builds up the complete text incrementally. Each element in the output stream contains the accumulated text up to that point.

Parameters

  • stream - Stream of events

Returns

Stream of accumulated text strings (one per event, building up)

Examples

# Get final complete text
final_text = stream
  |> EventAdapter.accumulate_text()
  |> Enum.to_list()
  |> List.last()

# Watch text build up in real-time
stream
|> EventAdapter.accumulate_text()
|> Stream.each(fn accumulated ->
  IO.write("\r#{accumulated}")
end)
|> Stream.run()

text_only(stream)

@spec text_only(Enumerable.t()) :: Enumerable.t()

Filters stream to only text-related events.

Includes:

  • :text_delta - Text content chunks
  • :text_block_start - Text block initialization

Parameters

  • stream - Stream of events

Returns

Stream containing only text events

Examples

stream
|> EventAdapter.text_only()
|> Stream.each(fn %{type: :text_delta, text: text} ->
  IO.write(text)
end)
|> Stream.run()

to_events(stream)

@spec to_events(Enumerable.t()) :: Enumerable.t()

Normalizes a mixed stream to consistent event maps.

Converts Message structs to event maps while passing through existing event maps unchanged. This creates a uniform stream interface.

Parameters

  • stream - Stream containing events and/or Message structs

Returns

Stream of event maps with :type key

Examples

# Events pass through unchanged
[%{type: :text_delta, text: "Hi"}]
|> Stream.into([])
|> EventAdapter.to_events()
|> Enum.to_list()
# => [%{type: :text_delta, text: "Hi"}]

# Messages get normalized
[%Message{type: :assistant, data: %{...}}]
|> Stream.into([])
|> EventAdapter.to_events()
|> Enum.to_list()
# => [%{type: :message, data: %{...}, timestamp: ~U[...]}]

tools_only(stream)

@spec tools_only(Enumerable.t()) :: Enumerable.t()

Filters stream to only tool-related events.

Includes:

  • :tool_use_start - Tool call initialization
  • :tool_input_delta - Tool input JSON chunks

Parameters

  • stream - Stream of events

Returns

Stream containing only tool events

Examples

stream
|> EventAdapter.tools_only()
|> Stream.each(fn
  %{type: :tool_use_start, name: name} ->
    IO.puts("Tool: #{name}")
  %{type: :tool_input_delta, json: json} ->
    IO.write(json)
end)
|> Stream.run()