PubSub & Presence
Copy MarkdownThis document covers real-time event streaming and viewer tracking in Sagents.
Overview
Sagents uses Phoenix.PubSub for real-time event broadcasting, enabling:
- Multiple LiveView clients watching the same conversation
- Real-time streaming of LLM responses
- TODO list and state synchronization
- Debug event streams for development tools
PubSub Configuration
Setup
- Ensure Phoenix.PubSub is started in your application:
# application.ex
children = [
{Phoenix.PubSub, name: MyApp.PubSub},
# ... other children
]- Pass PubSub to AgentServer:
AgentServer.start_link(
agent: agent,
pubsub: {Phoenix.PubSub, MyApp.PubSub}
)Topics
Each agent broadcasts on two topics:
| Topic | Purpose |
|---|---|
"agent_server:#{agent_id}" | Main events (status, messages, todos) |
"agent_server:debug:#{agent_id}" | Debug events (state snapshots, middleware actions) |
Subscribing to Events
Main Events
# In your LiveView or GenServer
def mount(%{"id" => id}, _session, socket) do
if connected?(socket) do
AgentServer.subscribe("conversation-#{id}")
end
{:ok, socket}
end
def handle_info({:agent, event}, socket) do
# Handle the event
{:noreply, handle_agent_event(event, socket)}
endDebug Events
# For development/debugging tools
AgentServer.subscribe_debug("conversation-123")
def handle_info({:agent, {:debug, debug_event}}, socket) do
case debug_event do
{:agent_state_update, state} ->
# Full state snapshot
{:noreply, assign(socket, debug_state: state)}
{:middleware_action, module, data} ->
# Middleware-specific event
{:noreply, log_middleware_action(socket, module, data)}
end
endEvent Reference
Status Events
{:agent, {:status_changed, status, data}}| Status | Data | Description |
|---|---|---|
:idle | nil | Agent ready, not executing |
:running | nil | Execution in progress |
:interrupted | %InterruptData{} | Waiting for HITL approval |
:cancelled | nil | User cancelled execution |
:error | reason | Execution failed |
Example handler:
def handle_agent_event({:status_changed, status, data}, socket) do
case {status, data} do
{:running, nil} ->
socket
|> assign(status: :running)
|> assign(streaming_content: "")
{:idle, nil} ->
assign(socket, status: :idle)
{:interrupted, interrupt_data} ->
socket
|> assign(status: :interrupted)
|> assign(interrupt: interrupt_data)
|> push_event("show_approval_modal", %{})
{:error, reason} ->
socket
|> assign(status: :error)
|> put_flash(:error, "Agent error: #{inspect(reason)}")
{:cancelled, nil} ->
assign(socket, status: :cancelled)
end
endLLM Message Events
# Streaming tokens (during generation)
{:agent, {:llm_deltas, [%MessageDelta{}, ...]}}
# Complete message (after generation)
{:agent, {:llm_message, %Message{}}}
# Token usage info
{:agent, {:llm_token_usage, %TokenUsage{}}}Example streaming handler:
def handle_agent_event({:llm_deltas, deltas}, socket) do
# Accumulate streaming content
new_content =
Enum.reduce(deltas, socket.assigns.streaming_content, fn delta, acc ->
acc <> (delta.content || "")
end)
socket
|> assign(streaming_content: new_content)
|> push_event("scroll_to_bottom", %{})
end
def handle_agent_event({:llm_message, message}, socket) do
# Complete message received - add to history
socket
|> update(:messages, &(&1 ++ [message]))
|> assign(streaming_content: "")
end
def handle_agent_event({:llm_token_usage, usage}, socket) do
# Update token counts for display
assign(socket, token_usage: usage)
endTODO Events
{:agent, {:todos_updated, [%Todo{}, ...]}}The TODO list is a complete snapshot (not a diff):
def handle_agent_event({:todos_updated, todos}, socket) do
# Replace entire TODO list
assign(socket, todos: todos)
endTool Events
# Tool call identified (LLM requested a tool)
{:agent, {:tool_call_identified, tool_info}}
# tool_info contains: %{call_id, name, display_text, arguments}
# Tool execution started
{:agent, {:tool_execution_started, tool_info}}
# tool_info contains: %{call_id, name, display_text, arguments}
# Tool execution completed successfully
{:agent, {:tool_execution_completed, call_id, tool_result}}
# call_id - matches the started event
# tool_result - %ToolResult{} struct with response
# Tool execution failed
{:agent, {:tool_execution_failed, call_id, error}}
# call_id - matches the started event
# error - error reason or messageShutdown Event
{:agent, {:agent_shutdown, %{reason: reason, metadata: map}}}Reasons:
:inactivity- Timeout expired:no_viewers- No presence after completion:manual- Explicitly stopped:crash- Process crashed (rare)
def handle_agent_event({:agent_shutdown, %{reason: reason}}, socket) do
case reason do
:inactivity ->
# Agent timed out - can restart on user action
socket
|> assign(agent_status: :inactive)
|> put_flash(:info, "Conversation paused due to inactivity")
:no_viewers ->
# Normal cleanup after completion
assign(socket, agent_status: :inactive)
_ ->
assign(socket, agent_status: :stopped)
end
endDisplay Message Events
# Message was persisted to database
{:agent, {:display_message_saved, display_message}}Useful for syncing UI with database state.
Debug Events
Debug events provide detailed insight into agent internals.
Note: While you can subscribe to debug events directly, the
sagents_live_debuggerpackage already uses these events to provide a powerful real-time debugging dashboard. It visualizes agent state, middleware actions, and sub-agent hierarchies out of the box. See the Live Debugger documentation for setup instructions.
State Updates
{:agent, {:debug, {:agent_state_update, %State{}}}}Full state snapshot after each execution step.
Middleware Actions
{:agent, {:debug, {:middleware_action, module, data}}}Each middleware can emit custom debug events:
| Middleware | Event | Data |
|---|---|---|
Summarization | :summarization_started | Token count info |
Summarization | :summarization_completed | Message count reduction |
PatchToolCalls | :dangling_tool_calls_patched | Patch count |
ConversationTitle | :title_generated | Generated title |
HumanInTheLoop | :interrupt_triggered | Tool calls requiring approval |
Example debug handler:
def handle_info({:agent, {:debug, event}}, socket) do
case event do
{:agent_state_update, state} ->
{:noreply, update(socket, :debug_log, &[{:state, state} | &1])}
{:middleware_action, Sagents.Middleware.Summarization, {:summarization_completed, info}} ->
{:noreply, update(socket, :debug_log, &[{:summarization, info} | &1])}
{:middleware_action, module, data} ->
{:noreply, update(socket, :debug_log, &[{module, data} | &1])}
end
endPublishing Custom Events
From Middleware
Middleware can publish events using the AgentServer API:
defmodule MyMiddleware do
@behaviour Sagents.Middleware
def after_model(state, _config) do
# Publish custom event
AgentServer.publish_event_from(
state.agent_id,
{:my_custom_event, %{data: "something"}}
)
# Publish debug event (separate topic)
AgentServer.publish_debug_event_from(
state.agent_id,
{:middleware_action, __MODULE__, {:custom_action, "details"}}
)
{:ok, state}
end
endEvent Structure
Events are always wrapped in {:agent, event}:
# What you publish
AgentServer.publish_event_from(agent_id, {:my_event, data})
# What subscribers receive
{:agent, {:my_event, data}}Phoenix.Presence Integration
Overview
Presence tracking enables:
- Knowing how many clients are viewing a conversation
- Automatic shutdown when all viewers leave
- Displaying who else is viewing
Setup
- Create a Presence module:
defmodule MyApp.Presence do
use Phoenix.Presence,
otp_app: :my_app,
pubsub_server: MyApp.PubSub
end- Start it in your application:
children = [
MyApp.Presence,
# ...
]- Configure AgentServer:
AgentServer.start_link(
agent: agent,
pubsub: {Phoenix.PubSub, MyApp.PubSub},
presence_tracking: [
enabled: true,
presence_module: MyApp.Presence,
topic: "conversation:#{conversation_id}",
grace_period: 5_000 # 5 seconds before shutdown
]
)Tracking Viewers
In your LiveView:
def mount(%{"id" => conversation_id}, _session, socket) do
topic = "conversation:#{conversation_id}"
if connected?(socket) do
# Subscribe to agent events
AgentServer.subscribe("conversation-#{conversation_id}")
# Track this viewer
{:ok, _} = MyApp.Presence.track(
self(),
topic,
socket.assigns.current_user.id,
%{
user_name: socket.assigns.current_user.name,
joined_at: DateTime.utc_now()
}
)
# Subscribe to presence changes
Phoenix.PubSub.subscribe(MyApp.PubSub, topic)
end
{:ok, assign(socket, topic: topic)}
end
# Handle presence changes
def handle_info(%Phoenix.Socket.Broadcast{event: "presence_diff", payload: diff}, socket) do
presence = MyApp.Presence.list(socket.assigns.topic)
{:noreply, assign(socket, viewers: presence)}
endDisplaying Viewers
def render(assigns) do
~H"""
<div class="viewers">
<%= for {user_id, %{metas: metas}} <- @viewers do %>
<span class="viewer" title={hd(metas).user_name}>
<%= String.first(hd(metas).user_name) %>
</span>
<% end %>
</div>
"""
endHow Presence Affects Shutdown
- Agent completes execution (status:
:idle) - AgentServer checks presence count
- If no viewers: start grace period timer
- During grace period: if viewer joins, cancel shutdown
- After grace period: save state and terminate
Execution completes → Check presence → No viewers → Grace period → Shutdown
↓
Has viewers → Stay runningMultiple Subscribers
Multiple LiveView processes can subscribe to the same agent:
# Tab 1
AgentServer.subscribe("conversation-123")
# Tab 2 (same or different user)
AgentServer.subscribe("conversation-123")
# Both receive all eventsThis enables:
- Multiple browser tabs with same conversation
- Shared conversations between users
- Admin dashboards monitoring user conversations
Event Ordering
Events are delivered in order per agent, but:
- No ordering guarantee across different agents
- Deltas may batch multiple tokens
- State updates are eventual (not transactional)
For consistency:
- Use
{:llm_message, msg}for final message content (not accumulated deltas) - Use
{:todos_updated, todos}snapshots (not diffs) - Handle
{:agent_shutdown, _}to detect agent going away
Best Practices
1. Always Check Connection
def mount(_params, _session, socket) do
if connected?(socket) do
# Only subscribe when actually connected
AgentServer.subscribe(agent_id)
end
{:ok, socket}
end2. Handle Agent Not Running
def mount(%{"id" => id}, _session, socket) do
case Coordinator.start_conversation_session(id) do
{:ok, session} ->
AgentServer.subscribe(session.agent_id)
{:ok, assign(socket, agent_id: session.agent_id)}
{:error, :not_found} ->
{:ok, redirect(socket, to: ~p"/conversations")}
end
end3. Explicit Unsubscription
Subscriptions are automatically cleaned up when the LiveView process terminates. However, you can explicitly unsubscribe when your application needs to disconnect for other reasons:
@impl true
def handle_event("close_conversation", _params, socket) do
AgentServer.unsubscribe(socket.assigns.agent_id)
# ... other logic ...
{:noreply, socket}
end