This guide covers streaming in the Claude Agent SDK for Elixir, from simple queries to advanced real-time streaming with multi-turn conversations.
Table of Contents
- Overview
- Simple Query Streaming with query/2
- Streaming API
- Event Types
- Real-Time Typewriter Effect
- Multi-Turn Conversations
- Error Handling in Streams
- Best Practices
Overview
The Claude Agent SDK provides two streaming approaches:
- Simple Streaming via query/2 - Returns a lazy Elixir stream of parsed messages as they arrive from the CLI.
- Bidirectional Streaming API - Provides persistent sessions with real-time character-by-character updates. Best for chat interfaces and interactive applications.
Runtime Split
- The common CLI streaming/session lane now runs on cli_subprocess_core through ClaudeAgentSDK.Runtime.CLI. ClaudeAgentSDK.Streaming.Session stays SDK-local as the public session process and preserves the existing stream/subscriber contract.
- The advanced control client family still lives in ClaudeAgentSDK.Client for hooks, permission callbacks, and SDK MCP features.
- Both lanes share the same core-backed subprocess runtime. Use Options.execution_surface to route work over local or SSH execution surfaces.
Schema Ownership
- Zoi is the canonical validation layer for new streaming ingress work.
- ClaudeAgentSDK.Schema.Message validates raw message frames and streaming event families before they are projected into %ClaudeAgentSDK.Message{} or public streaming maps.
- raw_event remains attached to parsed streaming events so forward-compatible fields survive even when the public event projection stays intentionally small.
- The common CLI streaming parser keeps the stricter stream_event wrapper contract (uuid and session_id required), while the SDK-local control lane may surface missing wrapper metadata as nil.
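A small sketch can make the projection idea concrete. This is plain Elixir, not the SDK's parser. Several details are illustrative assumptions:

- the module name EventProjection;
- the few event types it recognizes;
- the assumed wrapper shape: a string-keyed map with "type" => "stream_event", "uuid", "session_id", "parent_tool_use_id", and the API event under "event".

The :strict mode mimics the common CLI lane, where a missing uuid or session_id raises. The :lenient mode mimics the control lane, where missing values become nil.

```elixir
defmodule EventProjection do
  # Hypothetical illustration of the projection described above; it is not
  # the SDK's parser. Input is a string-keyed CLI "stream_event" wrapper.

  # mode is :strict (missing uuid/session_id raise KeyError) or :lenient (nil).
  def project(%{"type" => "stream_event"} = wrapper, mode) when mode in [:strict, :lenient] do
    {uuid, session_id} =
      case mode do
        :strict -> {Map.fetch!(wrapper, "uuid"), Map.fetch!(wrapper, "session_id")}
        :lenient -> {Map.get(wrapper, "uuid"), Map.get(wrapper, "session_id")}
      end

    event = Map.get(wrapper, "event", %{})

    event
    |> project_event()
    |> Map.merge(%{
      uuid: uuid,
      session_id: session_id,
      parent_tool_use_id: Map.get(wrapper, "parent_tool_use_id"),
      # The untouched string-keyed event rides along, so fields the small
      # public projection ignores are still available to callers.
      raw_event: event
    })
  end

  defp project_event(%{"type" => "content_block_delta", "delta" => %{"type" => "text_delta", "text" => text}}),
    do: %{type: :text_delta, text: text}

  defp project_event(%{"type" => "message_stop"}), do: %{type: :message_stop}

  # Unrecognized event types still project to a map instead of crashing.
  defp project_event(%{"type" => type}), do: %{type: :unknown, event_type: type}
  defp project_event(_other), do: %{type: :unknown}
end
```

Keeping the whole raw map, instead of copying selected fields, is what lets new CLI fields pass through without changes to the projection.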
Key Differences
| Feature | query/2 | Streaming API |
|---|---|---|
| Real-time text | Message-level | Yes (character-level) |
| Multi-turn | Via resume/3 | Native session support |
| Event granularity | Message-level | Token-level |
| Resource usage | Lower | Higher (persistent session process) |
| Use case | Scripts, batch | Chat UIs, interactive |
Simple Query Streaming with query/2
The ClaudeAgentSDK.query/2 function returns a lazy stream of Message structs. This is the simplest way to interact with Claude, and it yields messages as the CLI emits them.
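Because the stream is lazy, a consumer can stop as soon as it has what it needs, and later messages are never pulled. The sketch below shows this with a plain Stream standing in for query/2. The simplified message maps and the LazyDemo module are assumptions for illustration, not SDK types.

```elixir
defmodule LazyDemo do
  # Stand-in for query/2: a lazy stream of simplified message maps.
  # Each element is built only when the consumer asks for it, and the
  # consumer process gets a note so we can count how many were produced.
  def fake_query do
    [:system, :assistant, :result, :assistant]
    |> Stream.map(fn type ->
      send(self(), {:produced, type})
      %{type: type}
    end)
  end

  # Stop consuming as soon as the :result message arrives.
  def first_result(stream), do: Enum.find(stream, &(&1.type == :result))

  # Drain the :produced notes from the mailbox and count them.
  def produced_count(count \\ 0) do
    receive do
      {:produced, _type} -> produced_count(count + 1)
    after
      0 -> count
    end
  end
end

IO.inspect(LazyDemo.first_result(LazyDemo.fake_query()))  # %{type: :result}
IO.inspect(LazyDemo.produced_count())                      # 3: the trailing :assistant is never built
```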
Basic Usage
alias ClaudeAgentSDK.{Options, ContentExtractor}
# Simple query with default options
ClaudeAgentSDK.query("Write a haiku about Elixir")
|> Enum.each(fn message ->
case message.type do
:system ->
IO.puts("Session started: #{message.data.session_id}")
:assistant ->
text = ContentExtractor.extract_text(message)
if is_binary(text) and text != "", do: IO.puts("Claude: #{text}")
:result ->
IO.puts("Cost: $#{message.data.total_cost_usd}")
_ ->
:ok
end
end)
With Options
options = %Options{
model: "haiku",
max_turns: 5,
system_prompt: "You are a helpful coding assistant.",
output_format: :stream_json
}
messages = ClaudeAgentSDK.query("Explain pattern matching", options)
|> Enum.to_list()
# Extract just the text responses
text = messages
|> Enum.filter(&(&1.type == :assistant))
|> Enum.map(&ContentExtractor.extract_text/1)
|> Enum.reject(&(&1 in [nil, ""]))
|> Enum.join("\n")
IO.puts(text)
Using OptionBuilder Presets
alias ClaudeAgentSDK.OptionBuilder
# Quick preset with Haiku model
options = OptionBuilder.with_haiku()
# Development preset (verbose, permissive)
options = OptionBuilder.build_development_options()
# Production preset (restrictive, safe)
options = OptionBuilder.build_production_options()
ClaudeAgentSDK.query("Hello!", options) |> Enum.to_list()
Collecting Results
# Collect all messages
messages = ClaudeAgentSDK.query("Hello") |> Enum.to_list()
# Find specific message types
init_message = Enum.find(messages, &(&1.type == :system and &1.subtype == :init))
result = Enum.find(messages, &(&1.type == :result))
# Extract session ID for later resumption
session_id = init_message.data.session_id
# Check for successful completion
success? = match?(%{type: :result, subtype: :success}, result)
Streaming Input (Enumerable Prompts)
You can stream a sequence of user messages into a single query by passing an Enumerable prompt:
prompts = [
%{"type" => "user", "message" => %{"role" => "user", "content" => "Hello"}},
%{"type" => "user", "message" => %{"role" => "user", "content" => "How are you?"}}
]
ClaudeAgentSDK.query(prompts, %Options{})
|> Enum.to_list()
If the internal input-stream worker crashes (or EOF signaling fails), the query now emits an explicit :error_during_execution result message instead of waiting indefinitely.
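Follow-up prompts sometimes come from elsewhere, such as a file, a queue, or user input. In that case a small helper can build the user-message maps lazily. This is a minimal sketch, and it assumes the map shape in the list above is all the CLI needs. PromptStream and user_message/1 are hypothetical names, not SDK functions.

```elixir
defmodule PromptStream do
  # Hypothetical helper: wraps a plain string in the user-message map shape
  # used in the prompt list above.
  def user_message(text) when is_binary(text) do
    %{"type" => "user", "message" => %{"role" => "user", "content" => text}}
  end

  # Lazily turns any enumerable of strings into user messages, trimming
  # whitespace and skipping blank entries. Nothing is built until the
  # consumer pulls elements.
  def from_texts(texts) do
    texts
    |> Stream.map(&String.trim/1)
    |> Stream.reject(&(&1 == ""))
    |> Stream.map(&user_message/1)
  end
end
```

Since a Stream is an Enumerable, PromptStream.from_texts(["Hello", "How are you?"]) can be passed to ClaudeAgentSDK.query/2 in place of the literal list. This assumes the SDK consumes it the same way.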
Execution Surface Routing
Query flows already use the shared core runtime by default. Route them over SSH
or other core-owned surfaces with Options.execution_surface:
opts = %Options{
execution_surface: [
surface_kind: :ssh_exec,
transport_options: [
destination: "claude.example",
ssh_user: "sdk",
port: 22
]
]
}
ClaudeAgentSDK.query("Hello", opts)
|> Enum.to_list()
Custom transport injection has been removed from Query, Client, and the common CLI lane. Add new transport families in cli_subprocess_core, then select them through execution_surface.
Query Streaming Module Configuration
For advanced use cases, you can override the query streaming module globally:
# config/config.exs (below import Config)
config :claude_agent_sdk,
  cli_stream_module: ClaudeAgentSDK.Query.CLIStream
process_module remains as a deprecated fallback key for backward compatibility, but new config should use cli_stream_module.
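The precedence between the two keys is easiest to see in a short sketch. This is not the SDK's code. StreamModuleConfig, resolve_stream_module/1, and the exact fallback order are assumptions based on the description above: cli_stream_module wins, process_module is consulted only when it is absent, and the default comes last.

```elixir
defmodule StreamModuleConfig do
  # Hypothetical illustration of the documented precedence, not SDK code.
  @default ClaudeAgentSDK.Query.CLIStream

  def resolve_stream_module(app \\ :claude_agent_sdk) do
    case Application.fetch_env(app, :cli_stream_module) do
      {:ok, module} ->
        module

      :error ->
        # Deprecated key, consulted only as a fallback before the default.
        Application.get_env(app, :process_module, @default)
    end
  end
end
```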
Streaming API
The Streaming API provides persistent sessions with real-time character-level updates. This is ideal for building chat interfaces.
When no control-only features are required, Streaming.start_session/1 uses the
common Claude provider profile and shared core session runtime. If hooks,
permission callbacks, or SDK MCP servers are configured, the facade switches to
the SDK-local control client family instead.
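The selection rule boils down to a predicate over the configured options. The sketch below uses a plain map in place of %Options{}. The :hooks key matches the hooks option shown later in this guide. :permission_callback and :sdk_mcp_servers are hypothetical key names, standing in for whatever fields actually carry permission callbacks and SDK MCP servers. Treating empty maps or lists as "not configured" is also an assumption.

```elixir
defmodule LaneSelection do
  # Hypothetical sketch of the documented rule, not SDK code.
  # :permission_callback and :sdk_mcp_servers are placeholder key names.
  @control_keys [:hooks, :permission_callback, :sdk_mcp_servers]

  def lane(opts) when is_map(opts) do
    if Enum.any?(@control_keys, &configured?(Map.get(opts, &1))) do
      :control_client
    else
      :core_session
    end
  end

  # nil, empty maps, and empty lists count as "not configured".
  defp configured?(nil), do: false
  defp configured?(value) when value == %{} or value == [], do: false
  defp configured?(_value), do: true
end
```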
Starting a Session
alias ClaudeAgentSDK.{Streaming, Options}
options = %Options{
model: "haiku",
max_turns: 10,
allowed_tools: []
}
{:ok, session} = Streaming.start_session(options)
Sending Messages
# Send a message and receive a stream of events
Streaming.send_message(session, "Hello! What can you do?")
|> Enum.each(fn event ->
case event do
%{type: :text_delta, text: text} ->
IO.write(text)
%{type: :message_stop} ->
IO.puts("") # Newline after response
_ ->
:ok
end
end)
Closing Sessions
Always close sessions when done to release resources:
# Close session
Streaming.close_session(session)
# Or use try/after pattern
{:ok, session} = Streaming.start_session(options)
try do
Streaming.send_message(session, "Hello")
|> Enum.to_list()
after
Streaming.close_session(session)
end
Getting Session ID
{:ok, session_id} = Streaming.get_session_id(session)
IO.puts("Session ID: #{session_id}")
Event Types
The Streaming API emits various event types for fine-grained control.
Each event map also includes streaming metadata: uuid, session_id, parent_tool_use_id, and raw_event (the raw CLI event map with string keys). On the common CLI streaming lane, stream event wrappers require both uuid and session_id (a missing key raises), matching Python SDK behavior; the SDK-local control lane may surface missing wrapper metadata as nil.
Text Streaming Events
# Text delta - partial text as it's generated
%{type: :text_delta, text: "Hello", accumulated: "Hello"}
# Message stop - response complete
%{type: :message_stop, final_text: "Hello, how can I help?"}
Message Lifecycle Events
# Message start - new response beginning
%{type: :message_start, model: "haiku", role: "assistant", usage: %{}}
# Content block lifecycle
%{type: :text_block_start}
%{type: :content_block_stop, final_text: "Complete text"}
# Message delta with metadata
%{type: :message_delta, stop_reason: "end_turn", stop_sequence: nil}
Tool Events
When Claude uses tools, you receive lifecycle events:
# Tool use start
%{type: :tool_use_start, name: "Bash", id: "tool_123"}
# Tool input being streamed
%{type: :tool_input_delta, json: "{\"command\": \"ls\"}"}
# Content block stop fires when the tool-use block is complete
%{type: :content_block_stop, final_text: ""}
Thinking Events (Extended Thinking)
# Thinking start
%{type: :thinking_start}
# Thinking content
%{type: :thinking_delta, thinking: "Let me analyze this..."}
Error Events
# Stream error
%{type: :error, error: :timeout}
%{type: :error, error: :connection_closed}
%{type: :error, error: {:api_error, "Rate limit exceeded"}}
If the CLI emits a JSON frame larger than max_buffer_size (default 1MB), the stream terminates with a CLIJSONDecodeError.
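When deciding what to do with an :error event, it helps to separate transient failures from ones a retry will not fix. The sketch below shows one possible policy. It treats :timeout, :connection_closed, :not_connected, and rate-limit API errors as retryable, and everything else as final. This classification is a judgment call, not something the SDK prescribes, and ErrorPolicy is a hypothetical module.

```elixir
defmodule ErrorPolicy do
  # One possible retry policy for :error event reasons (an assumption,
  # not SDK behavior).
  @transient [:timeout, :connection_closed, :not_connected]

  @spec classify(term()) :: :retry | :fail
  def classify(reason) when reason in @transient, do: :retry

  def classify({:api_error, message}) when is_binary(message) do
    # Rate limits usually clear after waiting; other API errors usually do not.
    if message =~ ~r/rate limit/i, do: :retry, else: :fail
  end

  def classify(_other), do: :fail
end
```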
Subagent Events (parent_tool_use_id)
When Claude uses the Agent tool to spawn subagents, streaming events include a parent_tool_use_id field that identifies which Agent tool call produced the event. This is critical for building hierarchical UIs that route subagent output to the correct panel.
# Main agent events have parent_tool_use_id: nil
%{type: :text_delta, text: "Let me search...", parent_tool_use_id: nil}
# Subagent events have parent_tool_use_id set to the Agent tool call ID
%{type: :text_delta, text: "Found 3 files", parent_tool_use_id: "toolu_01ABC123"}
%{type: :message_stop, parent_tool_use_id: "toolu_01ABC123"}
Use cases:
- Route main agent output to the primary chat panel
- Display subagent output in nested/collapsible panels
- Track which subagent produced which response
- Build hierarchical streaming UIs for multi-agent workflows
Example: Routing by source
Streaming.send_message(session, "Use the Agent tool to find .ex files")
|> Enum.each(fn event ->
  # Map.get/2 tolerates events that carry no parent_tool_use_id key
  label =
    case Map.get(event, :parent_tool_use_id) do
      nil -> "[MAIN]"
      id -> "[SUB:#{String.slice(id, 0, 8)}]"
    end

  case event do
    %{type: :text_delta, text: text} ->
      IO.puts("#{label} #{text}")

    %{type: :message_stop} ->
      IO.puts("#{label} Complete")

    _ ->
      :ok
  end
end)
See examples/streaming_tools/subagent_streaming.exs for a complete working example.
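UIs that render each agent in its own panel can apply the same routing to collect text per source instead of printing it. The sketch below is minimal and works on plain event maps. PanelBuffers is a hypothetical name, and :main is an arbitrary key for events whose parent_tool_use_id is nil.

```elixir
defmodule PanelBuffers do
  # Hypothetical helper: accumulates text_delta chunks per source.
  # Events with parent_tool_use_id: nil (or without the key) go to :main.
  def collect(events) do
    events
    |> Enum.reduce(%{}, fn
      %{type: :text_delta, text: text} = event, acc ->
        key = Map.get(event, :parent_tool_use_id) || :main
        # Prepend chunks (cheap) and reverse/join once at the end.
        Map.update(acc, key, [text], &[text | &1])

      _other, acc ->
        acc
    end)
    |> Map.new(fn {key, chunks} -> {key, chunks |> Enum.reverse() |> Enum.join()} end)
  end
end
```

Interleaved main and subagent deltas end up in separate, correctly ordered strings, keyed by the Agent tool call ID.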
Complete Event Handling Pattern
Streaming.send_message(session, prompt)
|> Enum.reduce_while(%{text: "", tools: [], error: nil}, fn event, acc ->
case event do
%{type: :text_delta, text: chunk} ->
IO.write(chunk)
{:cont, %{acc | text: acc.text <> chunk}}
%{type: :tool_use_start, name: name, id: id} ->
IO.puts("\n[Using tool: #{name}]")
{:cont, %{acc | tools: [{name, id} | acc.tools]}}
%{type: :content_block_stop} ->
IO.puts("[Content block complete]")
{:cont, acc}
%{type: :message_stop} ->
IO.puts("")
{:halt, acc}
%{type: :error, error: reason} ->
{:halt, %{acc | error: reason}}
_ ->
{:cont, acc}
end
end)
Real-Time Typewriter Effect
Creating a typewriter effect for chat interfaces:
Basic Typewriter
alias ClaudeAgentSDK.{Streaming, Options}
defmodule Typewriter do
def chat(prompt) do
options = %Options{model: "haiku", max_turns: 1, allowed_tools: []}
{:ok, session} = Streaming.start_session(options)
try do
IO.write("Claude: ")
Streaming.send_message(session, prompt)
|> Enum.each(fn
%{type: :text_delta, text: text} ->
IO.write(text)
# Optional: add delay for visible effect
Process.sleep(10)
%{type: :message_stop} ->
IO.puts("\n")
_ ->
:ok
end)
after
Streaming.close_session(session)
end
end
end
Typewriter.chat("Tell me a short story")
With Character Count and Progress
defmodule TypewriterWithStats do
  alias ClaudeAgentSDK.Streaming

  def stream_response(session, prompt) do
    IO.write("Claude: ")

    text =
      Streaming.send_message(session, prompt)
      |> Enum.reduce("", fn
        %{type: :text_delta, text: chunk}, acc ->
          IO.write(chunk)
          acc <> chunk

        %{type: :message_stop}, acc ->
          IO.puts("")
          acc

        _event, acc ->
          acc
      end)

    # Count on the full text: a word can be split across two deltas,
    # so counting words per chunk would over-count.
    stats = %{chars: String.length(text), words: text |> String.split() |> length()}
    IO.puts("Stats: #{stats.chars} characters, #{stats.words} words")
    stats
  end
end
LiveView Integration Pattern
For Phoenix LiveView applications:
defmodule MyAppWeb.ChatLive do
  use Phoenix.LiveView
  alias ClaudeAgentSDK.{Streaming, Options}

  # render/1 (or a colocated template) is omitted for brevity

  def mount(_params, _session, socket) do
    {:ok, assign(socket, messages: [], streaming: false, current_text: "")}
  end

  def handle_event("send_message", %{"message" => text}, socket) do
    # Start streaming in a background task that forwards events to this LiveView
    parent = self()

    Task.start(fn ->
      options = %Options{model: "haiku", max_turns: 1}
      {:ok, session} = Streaming.start_session(options)

      try do
        Streaming.send_message(session, text)
        |> Enum.each(fn event ->
          send(parent, {:stream_event, event})
        end)
      after
        Streaming.close_session(session)
      end
    end)

    {:noreply, assign(socket, streaming: true, current_text: "")}
  end

  def handle_info({:stream_event, event}, socket) do
    case event do
      %{type: :text_delta, text: chunk} ->
        new_text = socket.assigns.current_text <> chunk
        {:noreply, assign(socket, current_text: new_text)}

      %{type: :message_stop} ->
        messages = socket.assigns.messages ++ [socket.assigns.current_text]
        {:noreply, assign(socket, messages: messages, streaming: false, current_text: "")}

      # Without this clause a failed request would leave streaming: true forever
      %{type: :error, error: reason} ->
        {:noreply,
         socket
         |> assign(streaming: false)
         |> put_flash(:error, "Claude error: #{inspect(reason)}")}

      _ ->
        {:noreply, socket}
    end
  end
end
Multi-Turn Conversations
The Streaming API maintains context across messages within a session.
Basic Multi-Turn
alias ClaudeAgentSDK.{Streaming, Options}
options = %Options{model: "haiku", max_turns: 5}
{:ok, session} = Streaming.start_session(options)
try do
# First message
IO.puts("You: My name is Alice")
IO.write("Claude: ")
Streaming.send_message(session, "My name is Alice")
|> Enum.each(fn
%{type: :text_delta, text: t} -> IO.write(t)
%{type: :message_stop} -> IO.puts("\n")
_ -> :ok
end)
# Follow-up - Claude remembers context
IO.puts("You: What's my name?")
IO.write("Claude: ")
Streaming.send_message(session, "What's my name?")
|> Enum.each(fn
%{type: :text_delta, text: t} -> IO.write(t)
%{type: :message_stop} -> IO.puts("\n")
_ -> :ok
end)
after
Streaming.close_session(session)
end
Interactive Chat Loop
defmodule InteractiveChat do
  alias ClaudeAgentSDK.{Streaming, Options}

  def start do
    options = %Options{
      model: "haiku",
      max_turns: 50,
      system_prompt: "You are a helpful assistant. Be concise."
    }

    {:ok, session} = Streaming.start_session(options)
    IO.puts("Chat started. Type 'quit' to exit.\n")
    chat_loop(session)
  end

  defp chat_loop(session) do
    IO.write("You: ")

    # IO.gets/1 returns :eof or {:error, reason} when input ends; treat both as "quit"
    input =
      case IO.gets("") do
        line when is_binary(line) -> String.trim(line)
        _eof_or_error -> "quit"
      end

    case input do
      "quit" ->
        Streaming.close_session(session)
        IO.puts("Goodbye!")

      "" ->
        chat_loop(session)

      message ->
        IO.write("Claude: ")

        Streaming.send_message(session, message)
        |> Enum.each(fn
          %{type: :text_delta, text: text} -> IO.write(text)
          %{type: :message_stop} -> IO.puts("\n")
          %{type: :error, error: reason} -> IO.puts("\n[Error: #{inspect(reason)}]")
          _ -> :ok
        end)

        chat_loop(session)
    end
  end
end

InteractiveChat.start()
Resuming Sessions with query/2
For non-streaming session resumption:
alias ClaudeAgentSDK.{Options, Session}

options = %Options{model: "haiku"}

# Initial query
messages = ClaudeAgentSDK.query("My name is Bob", options) |> Enum.to_list()
session_id = Session.extract_session_id(messages)

# Later: resume the conversation
resumed =
  ClaudeAgentSDK.resume(session_id, "What's my name?", options)
  |> Enum.to_list()
Error Handling in Streams
When a streaming session is backed by the control client, startup and send failures surface immediately as stream events (%{type: :error, error: reason}). Callers therefore no longer need to wait for the generic 5-minute stream timeout to detect initialization failures.
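Because failures arrive as ordinary events, it is also worth handling a stream that ends without ever producing :message_stop. The sketch below reports that case separately instead of treating the partial text as success. StreamResult is a hypothetical helper that operates on plain event maps shaped like the ones in this guide.

```elixir
defmodule StreamResult do
  # Hypothetical helper over plain event maps. Returns {:ok, text},
  # {:error, reason, partial_text}, or {:error, :incomplete, partial_text}
  # when the stream ends without :message_stop.
  def collect(events) do
    result =
      Enum.reduce_while(events, {:incomplete, ""}, fn
        %{type: :text_delta, text: chunk}, {:incomplete, text} ->
          {:cont, {:incomplete, text <> chunk}}

        %{type: :message_stop}, {:incomplete, text} ->
          {:halt, {:ok, text}}

        %{type: :error, error: reason}, {:incomplete, text} ->
          {:halt, {:error, reason, text}}

        _other, acc ->
          {:cont, acc}
      end)

    case result do
      {:incomplete, partial} -> {:error, :incomplete, partial}
      done -> done
    end
  end
end
```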
Handling Stream Errors
alias ClaudeAgentSDK.{Streaming, Options}
{:ok, session} = Streaming.start_session(%Options{})
result = Streaming.send_message(session, prompt)
|> Enum.reduce_while({:ok, ""}, fn event, {status, text} ->
case event do
%{type: :text_delta, text: chunk} ->
{:cont, {status, text <> chunk}}
%{type: :message_stop} ->
{:halt, {:ok, text}}
%{type: :error, error: reason} ->
{:halt, {:error, reason}}
_ ->
{:cont, {status, text}}
end
end)
case result do
{:ok, response} ->
IO.puts("Response: #{response}")
{:error, :timeout} ->
IO.puts("Request timed out")
{:error, :connection_closed} ->
IO.puts("Connection was closed")
{:error, :not_connected} ->
IO.puts("Transport not connected")
{:error, reason} ->
IO.puts("Error: #{inspect(reason)}")
end
Detecting Assistant Errors
# In aggregated message mode
messages = ClaudeAgentSDK.query(prompt, options) |> Enum.to_list()
# Check for assistant errors
assistant_error = Enum.find_value(messages, fn
%{type: :assistant, data: %{error: err}} when not is_nil(err) -> err
_ -> nil
end)
case assistant_error do
nil -> :ok
:rate_limit -> IO.puts("Rate limited, please retry")
:authentication_failed -> IO.puts("Authentication issue")
error -> IO.puts("Error: #{inspect(error)}")
end
Assistant errors and rate-limit events are separate signals:
- Assistant errors appear on :assistant messages in message.data.error.
- CLI rate-limit status changes appear as :rate_limit_event messages in the stream.
- Surfacing :rate_limit_event is intentionally CLI-faithful behavior; the current Python SDK instead skips unknown message types for forward compatibility.
Enum.each(messages, fn
%{type: :rate_limit_event, data: %{rate_limit_info: info}} ->
IO.puts("Rate limit status: #{info.status}")
_ ->
:ok
end)
Handling Connection Issues
defmodule ResilientChat do
alias ClaudeAgentSDK.Streaming
@max_retries 3
def send_with_retry(session, message, retries \\ 0) do
result = collect_response(session, message)
case result do
{:ok, text} ->
{:ok, text}
{:error, reason} when reason in [:connection_closed, :not_connected] and retries < @max_retries ->
IO.puts("[Retrying... attempt #{retries + 1}]")
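# Exponential backoff: 1s, 2s, 4s. Retrying assumes the session is still alive;
# if its process has exited, start a new session instead.
Process.sleep(1000 * Integer.pow(2, retries))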
send_with_retry(session, message, retries + 1)
{:error, reason} ->
{:error, reason}
end
end
defp collect_response(session, message) do
Streaming.send_message(session, message)
|> Enum.reduce_while({:ok, ""}, fn event, {_, text} ->
case event do
%{type: :text_delta, text: chunk} ->
{:cont, {:ok, text <> chunk}}
%{type: :message_stop} ->
{:halt, {:ok, text}}
%{type: :error, error: reason} ->
{:halt, {:error, reason}}
_ ->
{:cont, {:ok, text}}
end
end)
end
end
Timeout Handling
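alias ClaudeAgentSDK.{Options, Streaming}

# Set custom timeout in options
options = %Options{
  model: "haiku",
  timeout_ms: 120_000 # 2 minutes
}

# Or use Task.yield for a client-side timeout
task =
  Task.async(fn ->
    Streaming.send_message(session, prompt) |> Enum.to_list()
  end)

case Task.yield(task, 30_000) || Task.shutdown(task) do
  {:ok, events} ->
    # process_events/1 stands for your own event handling
    process_events(events)

  {:exit, reason} ->
    # Only observable when the caller traps exits, since Task.async links the task
    IO.puts("Request task crashed: #{inspect(reason)}")

  nil ->
    IO.puts("Request timed out after 30 seconds")
end
Best Practices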
1. Always Close Sessions
# Use try/after pattern
{:ok, session} = Streaming.start_session(options)
try do
Streaming.send_message(session, prompt)
|> Enum.to_list()
after
Streaming.close_session(session)
end
2. Use Appropriate Streaming Mode
# For simple queries without real-time needs
messages = ClaudeAgentSDK.query(prompt, options) |> Enum.to_list()
# For chat UIs needing real-time updates
{:ok, session} = Streaming.start_session(options)
Streaming.send_message(session, prompt) |> Enum.each(&IO.inspect/1)
3. Handle All Event Types
# Be explicit about handling events (handle_text/1 and the other handlers are your own functions)
require Logger

Streaming.send_message(session, prompt)
|> Enum.each(fn
  %{type: :text_delta, text: t} -> handle_text(t)
  %{type: :tool_use_start, name: n} -> handle_tool_start(n)
  %{type: :message_stop} -> handle_complete()
  %{type: :error, error: e} -> handle_error(e)
  event -> Logger.debug("Unhandled event: #{inspect(event)}")
end)
4. Limit Memory Usage with Large Streams
# Process events lazily instead of materializing the whole event list;
# only the accumulated text is kept in memory
Streaming.send_message(session, prompt)
|> Stream.filter(&match?(%{type: :text_delta}, &1))
|> Stream.map(& &1.text)
|> Enum.reduce("", &(&2 <> &1))
5. Use Appropriate Options for Your Use Case
# For quick responses
options = %Options{
model: "haiku",
max_turns: 1,
allowed_tools: [] # Disable tools for faster response
}
# For complex tasks
options = %Options{
model: "sonnet",
max_turns: 10,
permission_mode: :accept_edits,
timeout_ms: 300_000 # 5 minutes
}
6. Monitor Session State
require Logger

# Get session ID for logging/debugging
{:ok, session_id} = Streaming.get_session_id(session)
Logger.info("Starting chat in session #{session_id}")
7. Consider Using OptionBuilder
alias ClaudeAgentSDK.OptionBuilder
# Environment-appropriate defaults
options = OptionBuilder.for_environment()
# Add specific overrides
options = OptionBuilder.merge(:development, %{max_turns: 10})
8. Handle Partial Message Mode
# Enable partial messages for streaming events
options = %Options{
include_partial_messages: true,
preferred_transport: :auto
}
# Now you'll receive text_delta events
9. Use Streaming with Hooks
alias ClaudeAgentSDK.{Options, Streaming}
alias ClaudeAgentSDK.Hooks.{Matcher, Output}

# Log every tool call and allow it to proceed
callback = fn input, _id, _ctx ->
  IO.puts("[Tool: #{input["tool_name"]}]")
  Output.allow()
end

options = %Options{
  hooks: %{
    pre_tool_use: [Matcher.new("*", [callback])]
  }
}

{:ok, session} = Streaming.start_session(options)
10. Subscriber Lifecycle Monitoring
Client and Streaming.Session automatically monitor subscriber processes and prune dead subscribers. You do not need to manually unsubscribe when a subscriber process terminates — the SDK handles cleanup via Process.monitor/1. This prevents message sends to terminated processes in production.
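The pruning technique itself is plain OTP:

- monitor each subscriber with Process.monitor/1;
- drop the subscriber when the matching :DOWN message arrives.

Below is a stripped-down sketch of that pattern. It is not the SDK's implementation, and SubscriberRegistry and its functions are hypothetical names.

```elixir
defmodule SubscriberRegistry do
  # Hypothetical sketch of monitor-based subscriber pruning (not SDK code).
  use GenServer

  def start_link, do: GenServer.start_link(__MODULE__, :ok)
  def subscribe(server, pid), do: GenServer.call(server, {:subscribe, pid})
  def subscribers(server), do: GenServer.call(server, :subscribers)
  def broadcast(server, msg), do: GenServer.cast(server, {:broadcast, msg})

  # State: monitor reference => subscriber pid
  @impl true
  def init(:ok), do: {:ok, %{}}

  @impl true
  def handle_call({:subscribe, pid}, _from, subs) do
    ref = Process.monitor(pid)
    {:reply, :ok, Map.put(subs, ref, pid)}
  end

  def handle_call(:subscribers, _from, subs), do: {:reply, Map.values(subs), subs}

  @impl true
  def handle_cast({:broadcast, msg}, subs) do
    Enum.each(subs, fn {_ref, pid} -> send(pid, msg) end)
    {:noreply, subs}
  end

  # The monitored process exited: forget it so nothing is sent to it again.
  @impl true
  def handle_info({:DOWN, ref, :process, _pid, _reason}, subs) do
    {:noreply, Map.delete(subs, ref)}
  end
end
```

Keying the state by monitor reference means the :DOWN handler can remove the right entry with a single Map.delete/2.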
11. Clean Shutdown Pattern
defmodule ChatManager do
use GenServer
alias ClaudeAgentSDK.Streaming
def start_link(opts) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end
def init(_opts) do
Process.flag(:trap_exit, true)
{:ok, %{session: nil}}
end
def terminate(_reason, %{session: session}) when not is_nil(session) do
Streaming.close_session(session)
end
def terminate(_reason, _state), do: :ok
end
Summary
The Claude Agent SDK provides flexible streaming options:
- query/2 for simple, message-level streaming
- Streaming API for real-time, interactive applications
Key patterns:
- Use Streaming.start_session/1 for persistent sessions
- Handle events with pattern matching for clean code
- Always close sessions in after blocks
- Use Enum.reduce_while/3 for early termination on errors
- Consider include_partial_messages: true for streaming events
For more examples, see the examples/streaming_tools/ directory in the SDK source.