ClaudeCode.Stream (ClaudeCode v0.16.0)

Stream utilities for handling Claude Code responses.

This module provides functions to create and manipulate streams of messages from Claude Code sessions. It enables real-time processing of Claude's responses without waiting for the complete result.

Example

session
|> ClaudeCode.stream("Write a story")
|> ClaudeCode.Stream.text_content()
|> Stream.each(&IO.write/1)
|> Stream.run()

Summary

Functions

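  • collect(stream) - Consumes the stream and returns a structured summary of the conversation.
  • content_deltas(stream) - Extracts all content deltas from a partial message stream.
  • create(session, prompt, opts \\ []) - Creates a stream of messages from a Claude Code query.
  • filter_event_type(stream, event_type) - Filters the stream, keeping only stream events of the given event type.
  • filter_type(stream, type) - Filters a message stream by message type.
  • final_text(stream) - Returns only the final result text, consuming the stream.
  • on_tool_use(stream, callback) - Invokes a callback whenever a tool is used, without filtering the stream.
  • tap(stream, fun) - Applies a side-effect function to each message without filtering.
  • text_content(stream) - Extracts text content from a message stream.
  • text_deltas(stream) - Extracts text deltas from a partial message stream.
  • thinking_content(stream) - Extracts thinking content from a message stream.
  • thinking_deltas(stream) - Extracts thinking deltas from a partial message stream.
  • tool_results_by_name(stream, tool_name) - Extracts tool results for a specific tool name from a message stream.
  • tool_uses(stream) - Extracts tool use blocks from a message stream.
  • until_result(stream) - Takes messages until a result is received.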

Functions

collect(stream)

@spec collect(Enumerable.t()) :: %{
  text: String.t(),
  tool_calls: [
    {ClaudeCode.Content.ToolUseBlock.t(),
     ClaudeCode.Content.ToolResultBlock.t() | nil}
  ],
  thinking: String.t(),
  result: String.t() | nil,
  is_error: boolean()
}

Consumes the stream and returns a structured summary of the conversation.

Returns a map containing:

  • text - All text content concatenated
  • tool_calls - List of {tool_use, tool_result} tuples pairing each tool invocation with its result. If a tool use has no matching result, the result will be nil.
  • thinking - All thinking content concatenated
  • result - The final result text
  • is_error - Whether the result was an error

Examples

summary =
  session
  |> ClaudeCode.stream("Create a hello.txt file")
  |> ClaudeCode.Stream.collect()

IO.puts("Claude said: #{summary.text}")
IO.puts("Tool calls: #{length(summary.tool_calls)}")
IO.puts("Final result: #{summary.result}")

# Process each tool call with its result
Enum.each(summary.tool_calls, fn {tool_use, tool_result} ->
  IO.puts("Tool: #{tool_use.name}")
  if tool_result, do: IO.puts("Result: #{tool_result.content}")
end)
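
The documented shape of the returned map lends itself to pattern matching. The following is a minimal sketch, assuming a hand-built map with the keys listed above. The SummaryReport module, its describe/1 function, and the sample data are hypothetical and not part of ClaudeCode; plain maps stand in for the ToolUseBlock and ToolResultBlock structs.

```elixir
defmodule SummaryReport do
  # Hypothetical helper, not part of ClaudeCode: turns a collect/1-style
  # summary map into a one-line status string.
  def describe(%{is_error: true, result: result}) do
    "error: #{result || "no result"}"
  end

  def describe(%{is_error: false, tool_calls: tool_calls, result: result}) do
    # A tool call whose second tuple element is nil has no matching result.
    pending = Enum.count(tool_calls, fn {_tool_use, tool_result} -> is_nil(tool_result) end)

    "ok: #{length(tool_calls)} tool call(s), #{pending} without a result, " <>
      "result: #{result || "none"}"
  end
end

# Sample data shaped like the documented return value (an assumption for illustration).
sample_summary = %{
  text: "Created the file.",
  tool_calls: [
    {%{name: "Write"}, %{content: "ok"}},
    {%{name: "Read"}, nil}
  ],
  thinking: "",
  result: "Done",
  is_error: false
}

IO.puts(SummaryReport.describe(sample_summary))
```

Matching on is_error first keeps the error path separate from the success path, so callers do not have to inspect result to decide which case they are in.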

content_deltas(stream)

@spec content_deltas(Enumerable.t()) :: Enumerable.t()

Extracts all content deltas from a partial message stream.

Returns a stream of delta maps, useful for tracking both text and tool use input as it arrives. Each element contains:

  • type: :text_delta, :input_json_delta, or :thinking_delta
  • index: The content block index
  • Content-specific fields (text, partial_json, or thinking)

Examples

ClaudeCode.stream(session, "Create a file", include_partial_messages: true)
|> ClaudeCode.Stream.content_deltas()
|> Enum.each(fn delta ->
  case delta.type do
    :text_delta -> IO.write(delta.text)
    :input_json_delta -> handle_tool_json(delta.partial_json)
    _ -> :ok
  end
end)
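
Because every delta carries the index of its content block, the pieces can be reassembled per block. The sketch below assumes hand-built delta maps with the fields listed above; the DeltaAccumulator module and the sample deltas are hypothetical and not part of ClaudeCode.

```elixir
defmodule DeltaAccumulator do
  # Hypothetical helper, not part of ClaudeCode: concatenates delta payloads
  # per content block index.
  def accumulate(deltas) do
    Enum.reduce(deltas, %{}, fn delta, acc ->
      piece = payload(delta)
      Map.update(acc, delta.index, piece, &(&1 <> piece))
    end)
  end

  # Each delta type stores its payload under a different key.
  defp payload(%{type: :text_delta, text: text}), do: text
  defp payload(%{type: :input_json_delta, partial_json: json}), do: json
  defp payload(%{type: :thinking_delta, thinking: thinking}), do: thinking
end

# Sample deltas (illustrative data, not captured from a real session).
sample_deltas = [
  %{type: :text_delta, index: 0, text: "Creating "},
  %{type: :text_delta, index: 0, text: "the file."},
  %{type: :input_json_delta, index: 1, partial_json: "{\"path\": "},
  %{type: :input_json_delta, index: 1, partial_json: "\"hello.txt\"}"}
]

IO.inspect(DeltaAccumulator.accumulate(sample_deltas))
```

Tool input arrives as JSON fragments, so a fragment is generally not parseable on its own; accumulating per index and decoding once the block is complete avoids that problem.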

create(session, prompt, opts \\ [])

@spec create(pid(), String.t(), keyword()) :: Enumerable.t()

Creates a stream of messages from a Claude Code query.

This is the primary function for creating message streams. It returns a Stream that emits messages as they arrive from the CLI.

Options

  • :timeout - Maximum time to wait for each message, in milliseconds (default: 60_000)
  • :filter - Message type filter (:all, :assistant, :tool_use, :result)

Examples

# Stream all messages
ClaudeCode.Stream.create(session, "Hello")
|> Enum.each(&IO.inspect/1)

# Stream only assistant messages
ClaudeCode.Stream.create(session, "Hello", filter: :assistant)
|> Enum.map(& &1.message.content)
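
To illustrate what a per-message timeout means for a lazy stream, here is a minimal sketch built on Stream.resource/3. It is a hypothetical model, not ClaudeCode's implementation: the MailboxStreamSketch module and the {ref, ...} message protocol are assumptions made for the example.

```elixir
defmodule MailboxStreamSketch do
  # Hypothetical model, not ClaudeCode's implementation: lazily emits messages
  # that were sent to the calling process and tagged with `ref`.
  def stream(ref, timeout \\ 60_000) do
    Stream.resource(
      fn -> ref end,
      fn ref ->
        receive do
          {^ref, {:message, message}} -> {[message], ref}
          {^ref, :done} -> {:halt, ref}
        after
          # The timeout bounds each individual wait, not the stream as a whole.
          timeout -> raise "no message received within #{timeout} ms"
        end
      end,
      fn _ref -> :ok end
    )
  end
end

ref = make_ref()
send(self(), {ref, {:message, "first"}})
send(self(), {ref, {:message, "second"}})
send(self(), {ref, :done})

IO.inspect(Enum.to_list(MailboxStreamSketch.stream(ref, 100)))
```

In this model a long-running query never times out as long as consecutive messages arrive less than the timeout apart, which is the behavior the option description suggests.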

filter_event_type(stream, event_type)

Filters the stream, keeping only stream events of the given event type.

Valid event types: :message_start, :content_block_start, :content_block_delta, :content_block_stop, :message_delta, :message_stop

Examples

# Only content block deltas
stream
|> ClaudeCode.Stream.filter_event_type(:content_block_delta)
|> Enum.each(&process_delta/1)

filter_type(stream, type)

@spec filter_type(Enumerable.t(), atom()) :: Enumerable.t()

Filters a message stream by message type.

Examples

# Only assistant messages
stream |> ClaudeCode.Stream.filter_type(:assistant)

# Only result messages
stream |> ClaudeCode.Stream.filter_type(:result)

final_text(stream)

@spec final_text(Enumerable.t()) :: String.t() | nil

Returns only the final result text, consuming the stream.

This covers the most common use case: you want Claude's answer and have no need to process intermediate messages.

Examples

# Simple query
result =
  session
  |> ClaudeCode.stream("What is 2 + 2?")
  |> ClaudeCode.Stream.final_text()

# => "2 + 2 equals 4."

# With error handling
case ClaudeCode.Stream.final_text(stream) do
  nil -> IO.puts("No result received")
  text -> IO.puts(text)
end

on_tool_use(stream, callback)

@spec on_tool_use(Enumerable.t(), (ClaudeCode.Content.ToolUseBlock.t() -> any())) ::
  Enumerable.t()

Invokes a callback whenever a tool is used, without filtering the stream.

This is useful for progress indicators, logging, or triggering side effects when Claude uses tools. The callback receives each ToolUseBlock.

Examples

# Progress indicator for tool usage
stream
|> ClaudeCode.Stream.on_tool_use(fn tool ->
  IO.puts("Using tool: #{tool.name}")
end)
|> ClaudeCode.Stream.final_text()

# Send tool events to a LiveView process
stream
|> ClaudeCode.Stream.on_tool_use(fn tool ->
  send(liveview_pid, {:tool_started, tool.name, tool.input})
end)
|> Enum.to_list()

# Track tool usage
stream
|> ClaudeCode.Stream.on_tool_use(fn tool ->
  Agent.update(tracker, fn tools -> [tool | tools] end)
end)
|> ClaudeCode.Stream.collect()

tap(stream, fun)

@spec tap(Enumerable.t(), (ClaudeCode.Message.t() -> any())) :: Enumerable.t()

Applies a side-effect function to each message without filtering.

This is useful for logging, monitoring, or sending events while still allowing the stream to continue unchanged. Unlike Stream.each/2, this is designed for observation within a pipeline.

Examples

# Logging all messages
stream
|> ClaudeCode.Stream.tap(fn msg -> Logger.debug("Got: #{inspect(msg)}") end)
|> ClaudeCode.Stream.text_content()
|> Enum.join()

# Progress notifications
stream
|> ClaudeCode.Stream.tap(&send(progress_pid, {:message, &1}))
|> ClaudeCode.Stream.final_text()
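
The pass-through behavior described above can be modeled in a few lines. This is a hypothetical stand-in, not ClaudeCode's implementation: the TapSketch module and the sample messages are assumptions made for illustration.

```elixir
defmodule TapSketch do
  # Hypothetical stand-in, not ClaudeCode's implementation: runs `fun` for its
  # side effect and passes every element through unchanged.
  def tap(stream, fun) do
    Stream.map(stream, fn message ->
      fun.(message)
      message
    end)
  end
end

sample_messages = [%{type: :assistant}, %{type: :result}]
parent = self()

passed_through =
  sample_messages
  |> TapSketch.tap(fn message -> send(parent, {:seen, message.type}) end)
  |> Enum.to_list()

# The downstream consumer sees exactly what entered the pipeline.
IO.inspect(passed_through == sample_messages)
```

The return value of the callback is discarded, so an observer cannot accidentally alter or drop messages.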

text_content(stream)

@spec text_content(Enumerable.t()) :: Enumerable.t()

Extracts text content from a message stream.

Filters the stream to only emit text content from assistant messages, making it easy to collect the textual response.

Examples

text =
  session
  |> ClaudeCode.stream("Tell me about Elixir")
  |> ClaudeCode.Stream.text_content()
  |> Enum.join()
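
As a rough model of this kind of extraction, the sketch below pulls text out of assistant messages with Stream.flat_map/2. It is not ClaudeCode's implementation: the TextExtractSketch module and the plain-map message and block shapes are assumptions made for illustration (the real library uses structs).

```elixir
defmodule TextExtractSketch do
  # Hypothetical model, not ClaudeCode's implementation. Assumes assistant
  # messages look like %{type: :assistant, message: %{content: [blocks]}}
  # and text blocks look like %{type: :text, text: "..."}.
  def text_content(stream) do
    Stream.flat_map(stream, fn
      %{type: :assistant, message: %{content: blocks}} ->
        # Blocks that do not match the text pattern are skipped.
        for %{type: :text, text: text} <- blocks, do: text

      _other ->
        []
    end)
  end
end

sample_messages = [
  %{type: :assistant, message: %{content: [%{type: :text, text: "Elixir is "}]}},
  %{
    type: :assistant,
    message: %{content: [%{type: :tool_use, name: "Read"}, %{type: :text, text: "functional."}]}
  },
  %{type: :result, result: "Elixir is functional."}
]

IO.puts(sample_messages |> TextExtractSketch.text_content() |> Enum.join())
```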

text_deltas(stream)

@spec text_deltas(Enumerable.t()) :: Enumerable.t()

Extracts text deltas from a partial message stream.

This enables incremental streaming of Claude's responses, emitting each text chunk as it arrives. Use with the include_partial_messages: true option.

Examples

# Real-time character streaming for LiveView
ClaudeCode.stream(session, "Tell a story", include_partial_messages: true)
|> ClaudeCode.Stream.text_deltas()
|> Enum.each(fn chunk ->
  Phoenix.PubSub.broadcast(MyApp.PubSub, "chat:123", {:text_chunk, chunk})
end)

# Simple console output
session
|> ClaudeCode.stream("Hello", include_partial_messages: true)
|> ClaudeCode.Stream.text_deltas()
|> Enum.each(&IO.write/1)
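
Text chunks can end anywhere, including mid-word or mid-line. When a consumer needs whole lines, the chunks can be re-buffered downstream. The following is a minimal sketch using Stream.chunk_while/4 from the Elixir standard library; the LineBuffer module is hypothetical and not part of ClaudeCode, and a plain list stands in for the delta stream.

```elixir
defmodule LineBuffer do
  # Hypothetical helper, not part of ClaudeCode: regroups arbitrary text
  # chunks into complete lines, emitting each line as soon as it is finished.
  def lines(chunks) do
    chunks
    |> Stream.chunk_while(
      "",
      fn chunk, buffer ->
        # Everything before the last newline is complete; the remainder is
        # kept as the buffer for the next chunk.
        {complete, [rest]} = Enum.split(String.split(buffer <> chunk, "\n"), -1)
        {:cont, complete, rest}
      end,
      fn
        "" -> {:cont, ""}
        # Flush a trailing line that was never terminated by a newline.
        rest -> {:cont, [rest], ""}
      end
    )
    |> Stream.flat_map(& &1)
  end
end

["Hel", "lo\nWor", "ld\n", "tail"]
|> LineBuffer.lines()
|> Enum.each(&IO.puts/1)
```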

thinking_content(stream)

@spec thinking_content(Enumerable.t()) :: Enumerable.t()

Extracts thinking content from a message stream.

Filters the stream to only emit thinking content from assistant messages, making it easy to collect Claude's extended reasoning.

Examples

thinking =
  session
  |> ClaudeCode.stream("Complex problem")
  |> ClaudeCode.Stream.thinking_content()
  |> Enum.join()

thinking_deltas(stream)

@spec thinking_deltas(Enumerable.t()) :: Enumerable.t()

Extracts thinking deltas from a partial message stream.

This enables streaming of Claude's extended reasoning as it arrives. Use with the include_partial_messages: true option.

Examples

# Stream thinking content in real-time
session
|> ClaudeCode.stream("Complex problem", include_partial_messages: true)
|> ClaudeCode.Stream.thinking_deltas()
|> Enum.each(&IO.write/1)

tool_results_by_name(stream, tool_name)

@spec tool_results_by_name(Enumerable.t(), String.t()) :: Enumerable.t()

Extracts tool results for a specific tool name from a message stream.

Since tool results reference tool uses by ID (not name), this function tracks tool use blocks and matches their IDs with subsequent tool results.

Examples

# Get all Read tool results
session
|> ClaudeCode.stream("Read some files")
|> ClaudeCode.Stream.tool_results_by_name("Read")
|> Enum.each(&IO.inspect/1)

# Get Bash command outputs
session
|> ClaudeCode.stream("Run some commands")
|> ClaudeCode.Stream.tool_results_by_name("Bash")
|> Enum.map(& &1.content)
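
The ID matching described above can be sketched with Stream.transform/3, carrying the set of matching tool use IDs as state. This is a simplified, hypothetical model and not ClaudeCode's implementation: the ToolResultMatchSketch module, the flat stream of blocks, and the plain-map block shapes (including the tool_use_id key) are assumptions made for illustration.

```elixir
defmodule ToolResultMatchSketch do
  # Hypothetical model, not ClaudeCode's implementation. Assumes a flat stream
  # of content blocks shaped like
  #   %{type: :tool_use, id: id, name: name} and
  #   %{type: :tool_result, tool_use_id: id, content: content}.
  def results_by_name(blocks, tool_name) do
    Stream.transform(blocks, MapSet.new(), fn
      # Remember the ID of every use of the requested tool; emit nothing.
      %{type: :tool_use, id: id, name: ^tool_name}, ids ->
        {[], MapSet.put(ids, id)}

      # Emit a result only if it refers to a remembered ID.
      %{type: :tool_result, tool_use_id: id} = result, ids ->
        if MapSet.member?(ids, id), do: {[result], ids}, else: {[], ids}

      _other, ids ->
        {[], ids}
    end)
  end
end

sample_blocks = [
  %{type: :tool_use, id: "t1", name: "Read"},
  %{type: :tool_use, id: "t2", name: "Bash"},
  %{type: :tool_result, tool_use_id: "t1", content: "file contents"},
  %{type: :tool_result, tool_use_id: "t2", content: "command output"}
]

sample_blocks
|> ToolResultMatchSketch.results_by_name("Read")
|> Enum.each(&IO.inspect/1)
```

Because the state is built as the stream is consumed, this approach only finds results that arrive after their tool use, which matches the "subsequent tool results" wording above.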

tool_uses(stream)

@spec tool_uses(Enumerable.t()) :: Enumerable.t()

Extracts tool use blocks from a message stream.

Filters the stream to only emit tool use content blocks, making it easy to react to tool usage in real-time.

Examples

session
|> ClaudeCode.stream("Create some files")
|> ClaudeCode.Stream.tool_uses()
|> Enum.each(&handle_tool_use/1)

until_result(stream)

@spec until_result(Enumerable.t()) :: Enumerable.t()

Takes messages until a result is received.

This is useful when you want to process messages but stop as soon as the final result arrives.

Examples

messages =
  session
  |> ClaudeCode.stream("Quick task")
  |> ClaudeCode.Stream.until_result()
  |> Enum.to_list()