ClaudeCode.Stream (ClaudeCode v0.16.0)
View SourceStream utilities for handling Claude Code responses.
This module provides functions to create and manipulate streams of messages from Claude Code sessions. It enables real-time processing of Claude's responses without waiting for the complete result.
Example
session
|> ClaudeCode.stream("Write a story")
|> ClaudeCode.Stream.text_content()
|> Stream.each(&IO.write/1)
|> Stream.run()
Summary
Functions
Consumes the stream and returns a structured summary of the conversation.
Extracts all content deltas from a partial message stream.
Creates a stream of messages from a Claude Code query.
Filters stream to only stream events of a specific event type.
Filters a message stream by message type.
Returns only the final result text, consuming the stream.
Invokes a callback whenever a tool is used, without filtering the stream.
Applies a side-effect function to each message without filtering.
Extracts text content from a message stream.
Extracts text deltas from a partial message stream.
Extracts thinking content from a message stream.
Extracts thinking deltas from a partial message stream.
Extracts tool results for a specific tool name from a message stream.
Extracts tool use blocks from a message stream.
Takes messages until a result is received.
Functions
@spec collect(Enumerable.t()) :: %{ text: String.t(), tool_calls: [ {ClaudeCode.Content.ToolUseBlock.t(), ClaudeCode.Content.ToolResultBlock.t() | nil} ], thinking: String.t(), result: String.t() | nil, is_error: boolean() }
Consumes the stream and returns a structured summary of the conversation.
Returns a map containing:
text- All text content concatenatedtool_calls- List of{tool_use, tool_result}tuples pairing each tool invocation with its result. If a tool use has no matching result, the result will benil.thinking- All thinking content concatenatedresult- The final result textis_error- Whether the result was an error
Examples
summary = session
|> ClaudeCode.stream("Create a hello.txt file")
|> ClaudeCode.Stream.collect()
IO.puts("Claude said: #{summary.text}")
IO.puts("Tool calls: #{length(summary.tool_calls)}")
IO.puts("Final result: #{summary.result}")
# Process each tool call with its result
Enum.each(summary.tool_calls, fn {tool_use, tool_result} ->
IO.puts("Tool: #{tool_use.name}")
if tool_result, do: IO.puts("Result: #{tool_result.content}")
end)
@spec content_deltas(Enumerable.t()) :: Enumerable.t()
Extracts all content deltas from a partial message stream.
Returns a stream of delta maps, useful for tracking both text and tool use input as it arrives. Each element contains:
type::text_delta,:input_json_delta, or:thinking_deltaindex: The content block index- Content-specific fields (
text,partial_json, orthinking)
Examples
ClaudeCode.stream(session, "Create a file", include_partial_messages: true)
|> ClaudeCode.Stream.content_deltas()
|> Enum.each(fn delta ->
case delta.type do
:text_delta -> IO.write(delta.text)
:input_json_delta -> handle_tool_json(delta.partial_json)
_ -> :ok
end
end)
@spec create(pid(), String.t(), keyword()) :: Enumerable.t()
Creates a stream of messages from a Claude Code query.
This is the primary function for creating message streams. It returns a Stream that emits messages as they arrive from the CLI.
Options
:timeout- Maximum time to wait for each message (default: 60_000ms):filter- Message type filter (:all, :assistant, :tool_use, :result)
Examples
# Stream all messages
ClaudeCode.Stream.create(session, "Hello")
|> Enum.each(&IO.inspect/1)
# Stream only assistant messages
ClaudeCode.Stream.create(session, "Hello", filter: :assistant)
|> Enum.map(& &1.message.content)
@spec filter_event_type( Enumerable.t(), ClaudeCode.Message.PartialAssistantMessage.event_type() ) :: Enumerable.t()
Filters stream to only stream events of a specific event type.
Valid event types: :message_start, :content_block_start,
:content_block_delta, :content_block_stop, :message_delta, :message_stop
Examples
# Only content block deltas
stream
|> ClaudeCode.Stream.filter_event_type(:content_block_delta)
|> Enum.each(&process_delta/1)
@spec filter_type(Enumerable.t(), atom()) :: Enumerable.t()
Filters a message stream by message type.
Examples
# Only assistant messages
stream |> ClaudeCode.Stream.filter_type(:assistant)
# Only result messages
stream |> ClaudeCode.Stream.filter_type(:result)
@spec final_text(Enumerable.t()) :: String.t() | nil
Returns only the final result text, consuming the stream.
This is the most common use case - when you just want Claude's answer without processing intermediate messages.
Examples
# Simple query
result = session
|> ClaudeCode.stream("What is 2 + 2?")
|> ClaudeCode.Stream.final_text()
# => "2 + 2 equals 4."
# With error handling
case ClaudeCode.Stream.final_text(stream) do
nil -> IO.puts("No result received")
text -> IO.puts(text)
end
@spec on_tool_use(Enumerable.t(), (ClaudeCode.Content.ToolUseBlock.t() -> any())) :: Enumerable.t()
Invokes a callback whenever a tool is used, without filtering the stream.
This is useful for progress indicators, logging, or triggering side effects
when Claude uses tools. The callback receives each ToolUseBlock.
Examples
# Progress indicator for tool usage
stream
|> ClaudeCode.Stream.on_tool_use(fn tool ->
IO.puts("Using tool: #{tool.name}")
end)
|> ClaudeCode.Stream.final_text()
# Send tool events to a LiveView process
stream
|> ClaudeCode.Stream.on_tool_use(fn tool ->
send(liveview_pid, {:tool_started, tool.name, tool.input})
end)
|> Enum.to_list()
# Track tool usage
stream
|> ClaudeCode.Stream.on_tool_use(&Agent.update(tracker, fn tools -> [&1 | tools] end))
|> ClaudeCode.Stream.collect()
@spec tap(Enumerable.t(), (ClaudeCode.Message.t() -> any())) :: Enumerable.t()
Applies a side-effect function to each message without filtering.
This is useful for logging, monitoring, or sending events while still
allowing the stream to continue unchanged. Unlike Stream.each/2, this
is designed for observation within a pipeline.
Examples
# Logging all messages
stream
|> ClaudeCode.Stream.tap(fn msg -> Logger.debug("Got: #{inspect(msg)}") end)
|> ClaudeCode.Stream.text_content()
|> Enum.join()
# Progress notifications
stream
|> ClaudeCode.Stream.tap(&send(progress_pid, {:message, &1}))
|> ClaudeCode.Stream.final_text()
@spec text_content(Enumerable.t()) :: Enumerable.t()
Extracts text content from a message stream.
Filters the stream to only emit text content from assistant messages, making it easy to collect the textual response.
Examples
text = session
|> ClaudeCode.stream("Tell me about Elixir")
|> ClaudeCode.Stream.text_content()
|> Enum.join()
@spec text_deltas(Enumerable.t()) :: Enumerable.t()
Extracts text deltas from a partial message stream.
This enables character-by-character streaming from Claude's responses.
Use with include_partial_messages: true option.
Examples
# Real-time character streaming for LiveView
ClaudeCode.stream(session, "Tell a story", include_partial_messages: true)
|> ClaudeCode.Stream.text_deltas()
|> Enum.each(fn chunk ->
Phoenix.PubSub.broadcast(MyApp.PubSub, "chat:123", {:text_chunk, chunk})
end)
# Simple console output
session
|> ClaudeCode.stream("Hello", include_partial_messages: true)
|> ClaudeCode.Stream.text_deltas()
|> Enum.each(&IO.write/1)
@spec thinking_content(Enumerable.t()) :: Enumerable.t()
Extracts thinking content from a message stream.
Filters the stream to only emit thinking content from assistant messages, making it easy to collect Claude's extended reasoning.
Examples
thinking = session
|> ClaudeCode.stream("Complex problem")
|> ClaudeCode.Stream.thinking_content()
|> Enum.join()
@spec thinking_deltas(Enumerable.t()) :: Enumerable.t()
Extracts thinking deltas from a partial message stream.
This enables streaming of Claude's extended reasoning as it arrives.
Use with include_partial_messages: true option.
Examples
# Stream thinking content in real-time
session
|> ClaudeCode.stream("Complex problem", include_partial_messages: true)
|> ClaudeCode.Stream.thinking_deltas()
|> Enum.each(&IO.write/1)
@spec tool_results_by_name(Enumerable.t(), String.t()) :: Enumerable.t()
Extracts tool results for a specific tool name from a message stream.
Since tool results reference tool uses by ID (not name), this function tracks tool use blocks and matches their IDs with subsequent tool results.
Examples
# Get all Read tool results
session
|> ClaudeCode.stream("Read some files")
|> ClaudeCode.Stream.tool_results_by_name("Read")
|> Enum.each(&IO.inspect/1)
# Get Bash command outputs
session
|> ClaudeCode.stream("Run some commands")
|> ClaudeCode.Stream.tool_results_by_name("Bash")
|> Enum.map(& &1.content)
@spec tool_uses(Enumerable.t()) :: Enumerable.t()
Extracts tool use blocks from a message stream.
Filters the stream to only emit tool use content blocks, making it easy to react to tool usage in real-time.
Examples
session
|> ClaudeCode.stream("Create some files")
|> ClaudeCode.Stream.tool_uses()
|> Enum.each(&handle_tool_use/1)
@spec until_result(Enumerable.t()) :: Enumerable.t()
Takes messages until a result is received.
This is useful when you want to process messages but stop as soon as the final result arrives.
Examples
messages = session
|> ClaudeCode.stream("Quick task")
|> ClaudeCode.Stream.until_result()
|> Enum.to_list()