This guide covers streaming in the Claude Agent SDK for Elixir, from simple queries to advanced real-time streaming with multi-turn conversations.
Table of Contents
- Overview
- Simple Query Streaming with query/2
- Streaming API
- Event Types
- Real-Time Typewriter Effect
- Multi-Turn Conversations
- Error Handling in Streams
- Best Practices
Overview
The Claude Agent SDK provides two streaming approaches:
- Simple Streaming via query/2: Returns a lazy Elixir stream of parsed messages as they arrive from the CLI.
- Bidirectional Streaming API: Provides persistent sessions with real-time character-by-character updates. Best for chat interfaces and interactive applications.
Key Differences
| Feature | query/2 | Streaming API |
|---|---|---|
| Real-time text | Message-level | Yes (character-level) |
| Multi-turn | Via resume/3 | Native session support |
| Event granularity | Message-level | Token-level |
| Resource usage | Lower | Session process |
| Use case | Scripts, batch | Chat UIs, interactive |
Simple Query Streaming with query/2
The ClaudeAgentSDK.query/2 function returns a lazy stream of Message structs. This is the simplest way to interact with Claude, and it yields messages as the CLI emits them.
Basic Usage
alias ClaudeAgentSDK.{Options, ContentExtractor}
# Simple query with default options
ClaudeAgentSDK.query("Write a haiku about Elixir")
|> Enum.each(fn message ->
case message.type do
:system ->
IO.puts("Session started: #{message.data.session_id}")
:assistant ->
text = ContentExtractor.extract_text(message)
IO.puts("Claude: #{text}")
:result ->
IO.puts("Cost: $#{message.data.total_cost_usd}")
_ ->
:ok
end
end)
With Options
options = %Options{
model: "haiku",
max_turns: 5,
system_prompt: "You are a helpful coding assistant.",
output_format: :stream_json
}
messages = ClaudeAgentSDK.query("Explain pattern matching", options)
|> Enum.to_list()
# Extract just the text responses
text = messages
|> Enum.filter(&(&1.type == :assistant))
|> Enum.map(&ContentExtractor.extract_text/1)
|> Enum.join("\n")
IO.puts(text)
Using OptionBuilder Presets
alias ClaudeAgentSDK.OptionBuilder
# Quick preset with Haiku model
options = OptionBuilder.with_haiku()
# Development preset (verbose, permissive)
options = OptionBuilder.build_development_options()
# Production preset (restrictive, safe)
options = OptionBuilder.build_production_options()
ClaudeAgentSDK.query("Hello!", options) |> Enum.to_list()
Collecting Results
# Collect all messages
messages = ClaudeAgentSDK.query("Hello") |> Enum.to_list()
# Find specific message types
init_message = Enum.find(messages, &(&1.type == :system and &1.subtype == :init))
result = Enum.find(messages, &(&1.type == :result))
# Extract session ID for later resumption
session_id = init_message.data.session_id
# Check for successful completion
success? = match?(%{type: :result, subtype: :success}, result)
Streaming Input (Enumerable Prompts)
You can stream a sequence of user messages into a single query by passing an Enumerable prompt:
prompts = [
%{"type" => "user", "message" => %{"role" => "user", "content" => "Hello"}},
%{"type" => "user", "message" => %{"role" => "user", "content" => "How are you?"}}
]
ClaudeAgentSDK.query(prompts, %Options{})
|> Enum.to_list()
If the internal input-stream worker crashes (or EOF signaling fails), the query now emits an explicit :error_during_execution result message instead of waiting indefinitely.
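A caller can look for that terminal result once the stream has been collected. The following is a minimal sketch on plain maps that stand in for the SDK's Message structs; the module name QueryOutcome and the function classify/1 are hypothetical, and only the type and subtype fields used elsewhere in this guide are assumed.

```elixir
defmodule QueryOutcome do
  # Hypothetical helper (not part of the SDK): classifies a collected
  # message list by its terminal :result message.
  # Plain maps stand in for the SDK's Message structs.
  def classify(messages) do
    case Enum.find(messages, &match?(%{type: :result}, &1)) do
      nil -> {:error, :no_result}
      %{subtype: :success} -> :ok
      %{subtype: :error_during_execution} -> {:error, :error_during_execution}
      %{subtype: other} -> {:error, other}
      _unknown -> {:error, :unknown_result}
    end
  end
end

messages = [
  %{type: :system, subtype: :init},
  %{type: :result, subtype: :error_during_execution}
]

IO.inspect(QueryOutcome.classify(messages))
```

Matching on the result subtype keeps the failure path explicit instead of relying on the absence of assistant messages.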
Custom Transport Injection
You can inject a transport for query flows (module or {module, opts} tuple):
ClaudeAgentSDK.query("Hello", %Options{}, {ClaudeAgentSDK.Transport.Port, []})
|> Enum.to_list()
You can also defer subprocess startup with lazy mode:
ClaudeAgentSDK.query(
"Hello",
%Options{},
{ClaudeAgentSDK.Transport.Port, [startup_mode: :lazy]}
)
|> Enum.to_list()
In lazy mode, startup errors can happen after start_link succeeds and are delivered as process exits.
Query Streaming Module Configuration
For advanced use cases, you can override the query streaming module globally:
config :claude_agent_sdk,
cli_stream_module: ClaudeAgentSDK.Query.CLIStream
process_module remains as a deprecated fallback key for backward compatibility, but new config should use cli_stream_module.
Streaming API
The Streaming API provides persistent sessions with real-time character-level updates. This is ideal for building chat interfaces.
Starting a Session
alias ClaudeAgentSDK.{Streaming, Options}
options = %Options{
model: "haiku",
max_turns: 10,
allowed_tools: []
}
{:ok, session} = Streaming.start_session(options)
Sending Messages
# Send a message and receive stream of events
Streaming.send_message(session, "Hello! What can you do?")
|> Enum.each(fn event ->
case event do
%{type: :text_delta, text: text} ->
IO.write(text)
%{type: :message_stop} ->
IO.puts("") # Newline after response
_ ->
:ok
end
end)
Closing Sessions
Always close sessions when done to release resources:
# Close session
Streaming.close_session(session)
# Or use try/after pattern
{:ok, session} = Streaming.start_session(options)
try do
Streaming.send_message(session, "Hello")
|> Enum.to_list()
after
Streaming.close_session(session)
end
Getting Session ID
{:ok, session_id} = Streaming.get_session_id(session)
IO.puts("Session ID: #{session_id}")
Event Types
The Streaming API emits various event types for fine-grained control.
Each event map also includes streaming metadata: uuid, session_id, parent_tool_use_id, and raw_event (the raw CLI event map with string keys). Stream event wrappers require both uuid and session_id (missing keys raise) to match Python SDK behavior.
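To make the metadata concrete, here is a small sketch that pulls those four fields out of an event and reports when uuid or session_id is absent. It assumes the metadata is stored under atom keys, like the type and text fields in this guide's examples; the EventMeta module and the sample values are hypothetical.

```elixir
defmodule EventMeta do
  # Hypothetical helper (not part of the SDK). Assumes atom keys.
  @required [:uuid, :session_id]
  @fields [:uuid, :session_id, :parent_tool_use_id, :raw_event]

  # Returns {:ok, metadata} or {:error, {:missing, keys}} when a
  # required key is absent from the event map.
  def extract(event) when is_map(event) do
    case Enum.reject(@required, &Map.has_key?(event, &1)) do
      [] -> {:ok, Map.take(event, @fields)}
      missing -> {:error, {:missing, missing}}
    end
  end
end

event = %{
  type: :text_delta,
  text: "Hi",
  uuid: "u-1",
  session_id: "s-1",
  parent_tool_use_id: nil,
  raw_event: %{"type" => "content_block_delta"}
}

IO.inspect(EventMeta.extract(event))
```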
Text Streaming Events
# Text delta - partial text as it's generated
%{type: :text_delta, text: "Hello", accumulated: "Hello"}
# Message stop - response complete
%{type: :message_stop, final_text: "Hello, how can I help?"}
Message Lifecycle Events
# Message start - new response beginning
%{type: :message_start, model: "claude-3-haiku", role: "assistant", usage: %{}}
# Content block lifecycle
%{type: :content_block_start}
%{type: :content_block_stop, final_text: "Complete text"}
# Message delta with metadata
%{type: :message_delta, stop_reason: "end_turn", stop_sequence: nil}
Tool Events
When Claude uses tools, you receive lifecycle events:
# Tool use start
%{type: :tool_use_start, name: "Bash", id: "tool_123"}
# Tool input being streamed
%{type: :tool_input_delta, json: "{\"command\": \"ls\"}"}
# Tool execution complete
%{type: :tool_complete, tool_name: "Bash", result: "..."}
Thinking Events (Extended Thinking)
# Thinking start
%{type: :thinking_start}
# Thinking content
%{type: :thinking_delta, thinking: "Let me analyze this..."}
# Thinking end
%{type: :thinking_stop}
Error Events
# Stream error
%{type: :error, error: :timeout}
%{type: :error, error: :connection_closed}
%{type: :error, error: {:api_error, "Rate limit exceeded"}}
If the CLI emits a JSON frame larger than max_buffer_size (default 1MB), the stream terminates with a CLIJSONDecodeError.
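One way to consume the error event shapes listed above is to map each reason to a user-facing string. This is only a sketch: the StreamErrors module and its wording are hypothetical, and it covers just the three reasons shown in this section, with a fallback for anything else.

```elixir
defmodule StreamErrors do
  # Hypothetical helper (not part of the SDK): describes an error event.
  def describe(%{type: :error, error: reason}), do: describe_reason(reason)

  defp describe_reason(:timeout), do: "The request timed out"
  defp describe_reason(:connection_closed), do: "The connection was closed"

  defp describe_reason({:api_error, message}) when is_binary(message),
    do: "API error: " <> message

  # Fallback for reasons not listed in this guide.
  defp describe_reason(other), do: "Unexpected error: " <> inspect(other)
end

IO.puts(StreamErrors.describe(%{type: :error, error: {:api_error, "Rate limit exceeded"}}))
```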
Subagent Events (parent_tool_use_id)
When Claude uses the Task tool to spawn subagents, streaming events include a parent_tool_use_id field that identifies which Task tool call produced the event. This is critical for building hierarchical UIs that route subagent output to the correct panel.
# Main agent events have parent_tool_use_id: nil
%{type: :text_delta, text: "Let me search...", parent_tool_use_id: nil}
# Subagent events have parent_tool_use_id set to the Task tool call ID
%{type: :text_delta, text: "Found 3 files", parent_tool_use_id: "toolu_01ABC123"}
%{type: :message_stop, parent_tool_use_id: "toolu_01ABC123"}
Use cases:
- Route main agent output to the primary chat panel
- Display subagent output in nested/collapsible panels
- Track which subagent produced which response
- Build hierarchical streaming UIs for multi-agent workflows
Example: Routing by source
Streaming.send_message(session, "Use the Task tool to find .ex files")
|> Enum.each(fn event ->
label = case event.parent_tool_use_id do
nil -> "[MAIN]"
id -> "[SUB:#{String.slice(id, 0, 8)}]"
end
case event do
%{type: :text_delta, text: text} ->
IO.puts("#{label} #{text}")
%{type: :message_stop} ->
IO.puts("#{label} Complete")
_ ->
:ok
end
end)
See examples/streaming_tools/subagent_streaming.exs for a complete working example.
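When a UI needs one text buffer per panel rather than labelled console output, the same parent_tool_use_id field can serve as a map key. The sketch below runs on a hand-written event list shaped like the samples above; the SourceBuffers module is hypothetical and not part of the SDK.

```elixir
defmodule SourceBuffers do
  # Accumulates text per source. The key is parent_tool_use_id, so the
  # main agent's text is stored under the nil key.
  def collect(events) do
    Enum.reduce(events, %{}, fn
      %{type: :text_delta, text: text} = event, acc ->
        source = Map.get(event, :parent_tool_use_id)
        Map.update(acc, source, text, &(&1 <> text))

      _other, acc ->
        acc
    end)
  end
end

events = [
  %{type: :text_delta, text: "Let me search...", parent_tool_use_id: nil},
  %{type: :text_delta, text: "Found ", parent_tool_use_id: "toolu_01ABC123"},
  %{type: :text_delta, text: "3 files", parent_tool_use_id: "toolu_01ABC123"},
  %{type: :message_stop, parent_tool_use_id: "toolu_01ABC123"}
]

IO.inspect(SourceBuffers.collect(events))
```

Because Enum.reduce/3 works on any enumerable, the same function could be given the stream returned by Streaming.send_message/2 instead of a list.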
Complete Event Handling Pattern
Streaming.send_message(session, prompt)
|> Enum.reduce_while(%{text: "", tools: [], error: nil}, fn event, acc ->
case event do
%{type: :text_delta, text: chunk} ->
IO.write(chunk)
{:cont, %{acc | text: acc.text <> chunk}}
%{type: :tool_use_start, name: name, id: id} ->
IO.puts("\n[Using tool: #{name}]")
{:cont, %{acc | tools: [{name, id} | acc.tools]}}
%{type: :tool_complete, tool_name: name} ->
IO.puts("[Tool #{name} complete]")
{:cont, acc}
%{type: :message_stop} ->
IO.puts("")
{:halt, acc}
%{type: :error, error: reason} ->
{:halt, %{acc | error: reason}}
_ ->
{:cont, acc}
end
end)
Real-Time Typewriter Effect
The examples below build a typewriter effect for chat interfaces by writing each text delta as it arrives.
Basic Typewriter
alias ClaudeAgentSDK.{Streaming, Options}
defmodule Typewriter do
def chat(prompt) do
options = %Options{model: "haiku", max_turns: 1, allowed_tools: []}
{:ok, session} = Streaming.start_session(options)
try do
IO.write("Claude: ")
Streaming.send_message(session, prompt)
|> Enum.each(fn
%{type: :text_delta, text: text} ->
IO.write(text)
# Optional: add delay for visible effect
Process.sleep(10)
%{type: :message_stop} ->
IO.puts("\n")
_ ->
:ok
end)
after
Streaming.close_session(session)
end
end
end
Typewriter.chat("Tell me a short story")
With Character Count and Progress
defmodule TypewriterWithStats do
def stream_response(session, prompt) do
IO.write("Claude: ")
result = Streaming.send_message(session, prompt)
|> Enum.reduce(%{chars: 0, text: ""}, fn event, acc ->
case event do
%{type: :text_delta, text: text} ->
IO.write(text)
%{acc | chars: acc.chars + String.length(text), text: acc.text <> text}
%{type: :message_stop} ->
IO.puts("")
acc
_ ->
acc
end
end)
# Count words on the full text, since a word can be split across two deltas
stats = %{chars: result.chars, words: result.text |> String.split() |> length()}
IO.puts("Stats: #{stats.chars} characters, #{stats.words} words")
stats
end
end
LiveView Integration Pattern
For Phoenix LiveView applications:
defmodule MyAppWeb.ChatLive do
use Phoenix.LiveView
alias ClaudeAgentSDK.{Streaming, Options}
def mount(_params, _session, socket) do
{:ok, assign(socket, messages: [], streaming: false, current_text: "")}
end
def handle_event("send_message", %{"message" => text}, socket) do
# Start streaming in background task
parent = self()
Task.start(fn ->
options = %Options{model: "haiku", max_turns: 1}
{:ok, session} = Streaming.start_session(options)
try do
Streaming.send_message(session, text)
|> Enum.each(fn event ->
send(parent, {:stream_event, event})
end)
after
Streaming.close_session(session)
end
end)
{:noreply, assign(socket, streaming: true, current_text: "")}
end
def handle_info({:stream_event, event}, socket) do
case event do
%{type: :text_delta, text: chunk} ->
new_text = socket.assigns.current_text <> chunk
{:noreply, assign(socket, current_text: new_text)}
%{type: :message_stop} ->
messages = socket.assigns.messages ++ [socket.assigns.current_text]
{:noreply, assign(socket, messages: messages, streaming: false, current_text: "")}
_ ->
{:noreply, socket}
end
end
end
Multi-Turn Conversations
The Streaming API maintains context across messages within a session.
Basic Multi-Turn
alias ClaudeAgentSDK.{Streaming, Options}
options = %Options{model: "haiku", max_turns: 5}
{:ok, session} = Streaming.start_session(options)
try do
# First message
IO.puts("You: My name is Alice")
IO.write("Claude: ")
Streaming.send_message(session, "My name is Alice")
|> Enum.each(fn
%{type: :text_delta, text: t} -> IO.write(t)
%{type: :message_stop} -> IO.puts("\n")
_ -> :ok
end)
# Follow-up - Claude remembers context
IO.puts("You: What's my name?")
IO.write("Claude: ")
Streaming.send_message(session, "What's my name?")
|> Enum.each(fn
%{type: :text_delta, text: t} -> IO.write(t)
%{type: :message_stop} -> IO.puts("\n")
_ -> :ok
end)
after
Streaming.close_session(session)
end
Interactive Chat Loop
defmodule InteractiveChat do
alias ClaudeAgentSDK.{Streaming, Options}
def start do
options = %Options{
model: "haiku",
max_turns: 50,
system_prompt: "You are a helpful assistant. Be concise."
}
{:ok, session} = Streaming.start_session(options)
IO.puts("Chat started. Type 'quit' to exit.\n")
chat_loop(session)
end
defp chat_loop(session) do
IO.write("You: ")
input = IO.gets("") |> String.trim()
case input do
"quit" ->
Streaming.close_session(session)
IO.puts("Goodbye!")
"" ->
chat_loop(session)
message ->
IO.write("Claude: ")
Streaming.send_message(session, message)
|> Enum.each(fn
%{type: :text_delta, text: text} -> IO.write(text)
%{type: :message_stop} -> IO.puts("\n")
%{type: :error, error: reason} -> IO.puts("\n[Error: #{inspect(reason)}]")
_ -> :ok
end)
chat_loop(session)
end
end
end
InteractiveChat.start()
Resuming Sessions with query/2
For non-streaming session resumption:
alias ClaudeAgentSDK.{Options, Session}
# Initial query (options can be any %Options{} struct)
options = %Options{}
messages = ClaudeAgentSDK.query("My name is Bob", options) |> Enum.to_list()
session_id = Session.extract_session_id(messages)
# Later: resume the conversation
resumed = ClaudeAgentSDK.resume(session_id, "What's my name?", options)
|> Enum.to_list()
Error Handling in Streams
Control-client streaming surfaces startup/send failures immediately as stream
events (%{type: :error, error: reason}), so callers no longer need to wait for
the generic 5-minute stream timeout to detect initialization failures.
Handling Stream Errors
alias ClaudeAgentSDK.{Streaming, Options}
{:ok, session} = Streaming.start_session(%Options{})
result = Streaming.send_message(session, prompt)
|> Enum.reduce_while({:ok, ""}, fn event, {status, text} ->
case event do
%{type: :text_delta, text: chunk} ->
{:cont, {status, text <> chunk}}
%{type: :message_stop} ->
{:halt, {:ok, text}}
%{type: :error, error: reason} ->
{:halt, {:error, reason}}
_ ->
{:cont, {status, text}}
end
end)
case result do
{:ok, response} ->
IO.puts("Response: #{response}")
{:error, :timeout} ->
IO.puts("Request timed out")
{:error, :connection_closed} ->
IO.puts("Connection was closed")
{:error, :not_connected} ->
IO.puts("Transport not connected")
{:error, reason} ->
IO.puts("Error: #{inspect(reason)}")
end
Detecting Assistant Errors
# In aggregated message mode
messages = ClaudeAgentSDK.query(prompt, options) |> Enum.to_list()
# Check for assistant errors
assistant_error = Enum.find_value(messages, fn
%{type: :assistant, data: %{error: err}} when not is_nil(err) -> err
_ -> nil
end)
case assistant_error do
nil -> :ok
:rate_limit -> IO.puts("Rate limited, please retry")
:authentication_failed -> IO.puts("Authentication issue")
error -> IO.puts("Error: #{inspect(error)}")
end
Handling Connection Issues
defmodule ResilientChat do
alias ClaudeAgentSDK.{Streaming, Options}
@max_retries 3
def send_with_retry(session, message, retries \\ 0) do
result = collect_response(session, message)
case result do
{:ok, text} ->
{:ok, text}
{:error, reason} when reason in [:connection_closed, :not_connected] and retries < @max_retries ->
IO.puts("[Retrying... attempt #{retries + 1}]")
Process.sleep(1000 * (retries + 1)) # Linear backoff: 1s, 2s, 3s
send_with_retry(session, message, retries + 1)
{:error, reason} ->
{:error, reason}
end
end
defp collect_response(session, message) do
Streaming.send_message(session, message)
|> Enum.reduce_while({:ok, ""}, fn event, {_, text} ->
case event do
%{type: :text_delta, text: chunk} ->
{:cont, {:ok, text <> chunk}}
%{type: :message_stop} ->
{:halt, {:ok, text}}
%{type: :error, error: reason} ->
{:halt, {:error, reason}}
_ ->
{:cont, {:ok, text}}
end
end)
end
end
Timeout Handling
alias ClaudeAgentSDK.Options
# Set custom timeout in options
options = %Options{
model: "haiku",
timeout_ms: 120_000 # 2 minutes
}
# Or use Task.yield for client-side timeout
task = Task.async(fn ->
Streaming.send_message(session, prompt) |> Enum.to_list()
end)
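case Task.yield(task, 30_000) || Task.shutdown(task) do
{:ok, events} ->
process_events(events)
{:exit, reason} ->
# Only reachable when the caller traps exits; otherwise a task crash takes the caller down
IO.puts("Streaming task exited: #{inspect(reason)}")
nil ->
IO.puts("Request timed out after 30 seconds")
end
Best Practices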
1. Always Close Sessions
# Use try/after pattern
{:ok, session} = Streaming.start_session(options)
try do
Streaming.send_message(session, prompt)
|> Enum.to_list()
after
Streaming.close_session(session)
end
2. Use Appropriate Streaming Mode
# For simple queries without real-time needs
messages = ClaudeAgentSDK.query(prompt, options) |> Enum.to_list()
# For chat UIs needing real-time updates
{:ok, session} = Streaming.start_session(options)
Streaming.send_message(session, prompt) |> ...
3. Handle All Event Types
# Be explicit about handling events
Streaming.send_message(session, prompt)
|> Enum.each(fn
%{type: :text_delta, text: t} -> handle_text(t)
%{type: :tool_use_start, name: n} -> handle_tool_start(n)
%{type: :message_stop} -> handle_complete()
%{type: :error, error: e} -> handle_error(e)
event -> Logger.debug("Unhandled event: #{inspect(event)}")
end)
4. Limit Memory Usage with Large Streams
# Process events without collecting all in memory
Streaming.send_message(session, prompt)
|> Stream.filter(&match?(%{type: :text_delta}, &1))
|> Stream.map(& &1.text)
|> Enum.reduce("", &(&2 <> &1))
5. Use Appropriate Options for Your Use Case
# For quick responses
options = %Options{
model: "haiku",
max_turns: 1,
allowed_tools: [] # Disable tools for faster response
}
# For complex tasks
options = %Options{
model: "sonnet",
max_turns: 10,
permission_mode: :accept_edits,
timeout_ms: 300_000 # 5 minutes
}
6. Monitor Session State
# Get session ID for logging/debugging
{:ok, session_id} = Streaming.get_session_id(session)
Logger.info("Starting chat in session #{session_id}")
7. Consider Using OptionBuilder
alias ClaudeAgentSDK.OptionBuilder
# Environment-appropriate defaults
options = OptionBuilder.for_environment()
# Add specific overrides
options = OptionBuilder.merge(:development, %{max_turns: 10})
8. Handle Partial Message Mode
# Enable partial messages for streaming events
options = %Options{
include_partial_messages: true,
preferred_transport: :auto
}
# Now you'll receive text_delta events
9. Use Streaming with Hooks
alias ClaudeAgentSDK.Hooks.{Matcher, Output}
callback = fn input, _id, _ctx ->
IO.puts("[Tool: #{input["tool_name"]}]")
Output.allow()
end
options = %Options{
hooks: %{
pre_tool_use: [Matcher.new("*", [callback])]
}
}
{:ok, session} = Streaming.start_session(options)
10. Subscriber Lifecycle Monitoring
Client and Streaming.Session automatically monitor subscriber processes and prune dead subscribers. You do not need to unsubscribe manually when a subscriber process terminates; the SDK handles cleanup via Process.monitor/1. This prevents messages from being sent to terminated processes in production.
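For readers unfamiliar with the monitor-and-prune technique, the sketch below shows the general idea in a standalone GenServer. It is not the SDK's implementation: the SubscriberRegistry module and its functions are hypothetical, and the only real API it relies on is Process.monitor/1 and the resulting :DOWN message.

```elixir
defmodule SubscriberRegistry do
  use GenServer

  # Hypothetical illustration of monitor-based pruning (not SDK code).
  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, :ok, opts)
  def subscribe(server, pid), do: GenServer.call(server, {:subscribe, pid})
  def subscribers(server), do: GenServer.call(server, :subscribers)

  @impl true
  def init(:ok), do: {:ok, %{}}

  @impl true
  def handle_call({:subscribe, pid}, _from, subs) do
    # Key the subscriber by its monitor reference for O(1) removal.
    ref = Process.monitor(pid)
    {:reply, :ok, Map.put(subs, ref, pid)}
  end

  def handle_call(:subscribers, _from, subs) do
    {:reply, Map.values(subs), subs}
  end

  @impl true
  def handle_info({:DOWN, ref, :process, _pid, _reason}, subs) do
    # The monitored subscriber exited: drop it so no further sends target it.
    {:noreply, Map.delete(subs, ref)}
  end
end
```

Keying the state by monitor reference means the :DOWN handler can remove the entry directly without scanning the subscriber list.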
11. Clean Shutdown Pattern
defmodule ChatManager do
use GenServer
def start_link(opts) do
GenServer.start_link(__MODULE__, opts, name: __MODULE__)
end
def init(_opts) do
Process.flag(:trap_exit, true)
{:ok, %{session: nil}}
end
def terminate(_reason, %{session: session}) when not is_nil(session) do
Streaming.close_session(session)
end
def terminate(_reason, _state), do: :ok
end
Summary
The Claude Agent SDK provides flexible streaming options:
- query/2 for simple, aggregated responses
- Streaming API for real-time, interactive applications
Key patterns:
- Use Streaming.start_session/1 for persistent sessions
- Handle events with pattern matching for clean code
- Always close sessions in after blocks
- Use Enum.reduce_while/3 for early termination on errors
- Consider include_partial_messages: true for streaming events
For more examples, see the examples/streaming_tools/ directory in the SDK source.