ClaudeCodeSDK.Streaming.EventParser (claude_code_sdk v0.2.2)

Parses streaming events from Claude CLI's --include-partial-messages output.

Handles the following Server-Sent Events (SSE) event types from the Anthropic Messages API:

  • message_start - Message initialization with metadata
  • content_block_start - Content block (text/tool_use/thinking) starts
  • content_block_delta - Incremental content updates (the core streaming event)
    • text_delta - Incremental text chunks
    • input_json_delta - Tool input JSON being built
    • thinking_delta - Extended thinking content (Sonnet 4.5+)
  • content_block_stop - Content block complete
  • message_delta - Message-level metadata updates
  • message_stop - Message complete

Types

accumulated_text()

@type accumulated_text() :: String.t()

event()

@type event() :: map()

Functions

parse_buffer(buffer, accumulated_text)

@spec parse_buffer(String.t(), accumulated_text()) ::
  {:ok, [event()], String.t(), accumulated_text()}

Parses a buffer of newline-delimited JSON and returns parsed events.

Handles partial JSON lines by returning the unparsed remainder.

Parameters

  • buffer - String buffer containing newline-delimited JSON
  • accumulated_text - Current accumulated text for the message

Returns

{:ok, events, remaining_buffer, new_accumulated_text}

Examples

buffer = ~s({"type":"message_start","message":{"model":"sonnet"}}\n{"type":"content_block_delta","delta":{"type":"text_delta","text":"Hi"}}\npartial)
{:ok, events, remaining, accumulated} = EventParser.parse_buffer(buffer, "")

# events will contain both parsed events
# remaining will be "partial" (incomplete JSON line)
# accumulated will be "Hi" (from text_delta)
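
When output arrives in arbitrary chunks, the remainder returned by one call is typically prepended to the next chunk before parsing again. The following is a minimal sketch of that loop, not part of the SDK: StreamLoopSketch, feed/2, and fake_parse_buffer/2 are hypothetical names, and fake_parse_buffer/2 is a stand-in that only mimics the return shape of parse_buffer/2 so the sketch runs without the library.

```elixir
defmodule StreamLoopSketch do
  # Hypothetical helper: folds raw chunks through a function that has the
  # same shape as EventParser.parse_buffer/2.
  def feed(chunks, parse_fun) do
    Enum.reduce(chunks, {[], "", ""}, fn chunk, {events, remaining, accumulated} ->
      # Prepend the unparsed remainder from the previous call to the new chunk.
      {:ok, new_events, remaining, accumulated} =
        parse_fun.(remaining <> chunk, accumulated)

      {events ++ new_events, remaining, accumulated}
    end)
  end

  # Stand-in parser (an assumption, NOT the SDK implementation). It treats
  # every complete line as a text_delta whose text is the line itself.
  def fake_parse_buffer(buffer, accumulated) do
    # Everything after the last newline is an incomplete line.
    {lines, [remaining]} = buffer |> String.split("\n") |> Enum.split(-1)

    {events, accumulated} =
      lines
      |> Enum.reject(&(&1 == ""))
      |> Enum.map_reduce(accumulated, fn line, acc ->
        acc = acc <> line
        {%{type: :text_delta, text: line, accumulated: acc}, acc}
      end)

    {:ok, events, remaining, accumulated}
  end
end

chunks = ["Hel", "lo\nWor", "ld\npart"]

{events, remaining, accumulated} =
  StreamLoopSketch.feed(chunks, &StreamLoopSketch.fake_parse_buffer/2)

IO.inspect(length(events))
IO.inspect(remaining)
IO.inspect(accumulated)
```

With the SDK available, the same loop should accept &ClaudeCodeSDK.Streaming.EventParser.parse_buffer/2 in place of the stand-in, since feed/2 relies only on the documented return tuple.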

parse_event(event, accumulated_text)

@spec parse_event(map(), accumulated_text()) :: {:ok, [event()], accumulated_text()}

Parses a single streaming event from Claude CLI output.

Returns {:ok, events, new_accumulated_text} where:

  • events - List of parsed event maps (may be empty for unknown events)
  • new_accumulated_text - Updated accumulated text for current message

Parameters

  • event - Raw event: the decoded JSON map from CLI stdout
  • accumulated_text - Current accumulated text for this message

Event Types Returned

Text Streaming (primary use case)

  • %{type: :text_delta, text: "...", accumulated: "..."} - Text chunk plus the full text so far

Message Lifecycle

  • %{type: :message_start, model: "..."} - Message begins
  • %{type: :message_stop, final_text: "..."} - Message complete

Content Blocks

  • %{type: :text_block_start} - Text content block starts
  • %{type: :content_block_stop, final_text: "..."} - Block complete
  • %{type: :tool_use_start, name: "...", id: "..."} - Tool call starts
  • %{type: :thinking_start} - Thinking block starts (Sonnet 4.5+)

Tool & Thinking

  • %{type: :tool_input_delta, json: "..."} - Partial tool input JSON
  • %{type: :thinking_delta, thinking: "..."} - Thinking content chunk

Metadata

  • %{type: :message_delta, stop_reason: "...", stop_sequence: "..."} - Final metadata
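
Because every returned event is a plain map tagged with an atom :type, a consumer can dispatch on it with function-head pattern matching. The sketch below is one possible way to fold a list of events into a summary. EventConsumerSketch and summarize/1 are hypothetical names, the sample events are hand-written to match the shapes listed above, and concatenating the json fragments is an assumption about how partial tool input is meant to be combined.

```elixir
defmodule EventConsumerSketch do
  # Hypothetical consumer: reduces parsed events into a small summary map.
  def summarize(events) do
    Enum.reduce(events, %{text: "", tool_json: "", stop_reason: nil}, &handle/2)
  end

  # The latest text_delta already carries the full text so far.
  defp handle(%{type: :text_delta, accumulated: acc}, state),
    do: %{state | text: acc}

  # Assumption: partial tool input fragments are concatenated in order.
  defp handle(%{type: :tool_input_delta, json: json}, state),
    do: %{state | tool_json: state.tool_json <> json}

  defp handle(%{type: :message_delta, stop_reason: reason}, state),
    do: %{state | stop_reason: reason}

  # Lifecycle and unrecognized events leave the summary unchanged.
  defp handle(_other, state), do: state
end

# Hand-written sample events following the documented shapes.
events = [
  %{type: :message_start, model: "claude-sonnet-4-5"},
  %{type: :text_delta, text: "Hel", accumulated: "Hel"},
  %{type: :text_delta, text: "lo", accumulated: "Hello"},
  %{type: :message_delta, stop_reason: "end_turn", stop_sequence: nil},
  %{type: :message_stop, final_text: "Hello"}
]

summary = EventConsumerSketch.summarize(events)
IO.inspect(summary)
```

The catch-all clause keeps the consumer tolerant of event types it does not handle.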

Examples

# Text delta (most common event)
{:ok, [%{type: :text_delta, text: "Hello", accumulated: "Hello"}], "Hello"} =
  EventParser.parse_event(%{"type" => "content_block_delta", "delta" => %{"type" => "text_delta", "text" => "Hello"}}, "")

# Message start
{:ok, [%{type: :message_start, model: "claude-sonnet-4-5"}], ""} =
  EventParser.parse_event(%{"type" => "message_start", "message" => %{"model" => "claude-sonnet-4-5"}}, "")

# Unknown event (ignored gracefully)
{:ok, [], "existing text"} =
  EventParser.parse_event(%{"type" => "unknown_event"}, "existing text")