Stream SubAgent responses token-by-token in a Phoenix LiveView chat interface.
Overview
SubAgent.chat/3 supports streaming via the on_chunk callback. In a LiveView, spawn a Task that runs chat/3, stream chunks to the LiveView process via send/2, and push them to the browser with push_event/3.
LiveView Module
defmodule MyAppWeb.ChatLive do
use MyAppWeb, :live_view
alias PtcRunner.SubAgent
@agent SubAgent.new(
prompt: "placeholder",
output: :text,
system_prompt: "You are a helpful assistant."
)
@impl true
def mount(_params, _session, socket) do
{:ok,
assign(socket,
chat_messages: [],
streaming: false,
current_response: ""
)
|> stream(:messages, [])}
end
@impl true
def handle_event("send_message", %{"chat" => %{"message" => message}}, socket) do
# Add user message to the display stream
user_msg = %{id: System.unique_integer([:positive]), role: :user, content: message}
lv_pid = self()
chat_messages = socket.assigns.chat_messages
Task.start(fn ->
result =
SubAgent.chat(@agent, message,
llm: my_llm(),
messages: chat_messages,
on_chunk: fn %{delta: delta} -> send(lv_pid, {:chunk, delta}) end
)
case result do
{:ok, _reply, updated_messages} ->
send(lv_pid, {:chat_done, updated_messages})
{:error, reason} ->
send(lv_pid, {:chat_error, reason})
end
end)
{:noreply,
socket
|> stream_insert(:messages, user_msg)
|> assign(streaming: true, current_response: "")}
end
@impl true
def handle_info({:chunk, delta}, socket) do
current = socket.assigns.current_response <> delta
{:noreply,
socket
|> assign(current_response: current)
|> push_event("stream-chunk", %{delta: delta})}
end
@impl true
def handle_info({:chat_done, updated_messages}, socket) do
assistant_msg = %{
id: System.unique_integer([:positive]),
role: :assistant,
content: socket.assigns.current_response
}
{:noreply,
socket
|> assign(streaming: false, current_response: "", chat_messages: updated_messages)
|> stream_insert(:messages, assistant_msg)
|> push_event("stream-done", %{})}
end
@impl true
def handle_info({:chat_error, reason}, socket) do
error_msg = %{
id: System.unique_integer([:positive]),
role: :assistant,
content: "Error: #{inspect(reason)}"
}
{:noreply,
socket
|> assign(streaming: false, current_response: "")
|> stream_insert(:messages, error_msg)}
end
defp my_llm do
PtcRunner.LLM.callback("openrouter:anthropic/claude-haiku-4.5")
end
endJavaScript Hooks
Three hooks handle the streaming UI: StreamChat appends chunks to the DOM, ScrollBottom keeps the chat scrolled down, and ChatForm clears the input on submit.
// assets/js/hooks.js
export const StreamChat = {
mounted() {
this.handleEvent("stream-chunk", ({ delta }) => {
const target = this.el.querySelector("[data-stream-target]")
if (target) target.textContent += delta
})
this.handleEvent("stream-done", () => {
// Streaming container will be removed on re-render
})
}
}
export const ScrollBottom = {
mounted() {
this.scrollToBottom()
this.observer = new MutationObserver(() => this.scrollToBottom())
this.observer.observe(this.el, { childList: true, subtree: true })
},
scrollToBottom() {
this.el.scrollTop = this.el.scrollHeight
},
destroyed() {
if (this.observer) this.observer.disconnect()
}
}
export const ChatForm = {
mounted() {
this.el.addEventListener("submit", () => {
const input = this.el.querySelector("input[type=text]")
setTimeout(() => { input.value = ""; input.focus() }, 0)
})
}
}Register the hooks in your app.js:
import { StreamChat, ScrollBottom, ChatForm } from "./hooks"
let liveSocket = new LiveSocket("/live", Socket, {
hooks: { StreamChat, ScrollBottom, ChatForm },
// ...
})Template
<div
id="messages"
phx-update="stream"
phx-hook="ScrollBottom"
class="flex-1 overflow-y-auto p-4"
>
<div :for={{dom_id, msg} <- @streams.messages} id={dom_id} class={msg.role}>
<strong><%= msg.role %>:</strong> <%= msg.content %>
</div>
</div>
<div :if={@streaming} id="stream-container" phx-hook="StreamChat">
<span data-stream-target></span>
</div>
<form phx-submit="send_message" phx-hook="ChatForm">
<input type="text" name="chat[message]" placeholder="Type a message..." />
<button type="submit" disabled={@streaming}>Send</button>
</form>Alternative: No JavaScript (Assign-Based)
If you prefer to avoid JS hooks, update an assign on each chunk and let LiveView re-render:
def handle_info({:chunk, delta}, socket) do
current = socket.assigns.current_response <> delta
{:noreply, assign(socket, current_response: current)}
end<p :if={@streaming}><%= @current_response %></p>Warning: This re-renders the LiveView on every single token. When the LLM streams fast, this can lock up the browser. For production use, buffer chunks and flush on a timer:
def handle_info({:chunk, delta}, socket) do
buffer = (socket.assigns[:chunk_buffer] || "") <> delta
# Schedule a flush if not already pending
unless socket.assigns[:flush_pending] do
Process.send_after(self(), :flush_chunks, 50)
end
{:noreply, assign(socket, chunk_buffer: buffer, flush_pending: true)}
end
def handle_info(:flush_chunks, socket) do
current = socket.assigns.current_response <> (socket.assigns[:chunk_buffer] || "")
{:noreply, assign(socket, current_response: current, chunk_buffer: "", flush_pending: false)}
endThe JS hook approach above is generally more efficient and recommended.
Notes
on_chunkfires per-token in text-only mode (no tools). With tools, it fires once with the full final answer after all tool calls complete.- The
messagesreturned bychat/3include the system prompt. On the next call,chat/3automatically strips system messages before forwarding to the LLM (which regenerates the system prompt from the agent struct). - For production use, consider adding a timeout mechanism and more granular error handling in the Task.
See Also
PtcRunner.SubAgent.chat/3- API reference- Getting Started - SubAgent basics
- LLM Setup - Provider configuration and streaming