PubSub & Presence


This document covers real-time event streaming and viewer tracking in Sagents.

Overview

Sagents uses Phoenix.PubSub for real-time event broadcasting, enabling:

  • Multiple LiveView clients watching the same conversation
  • Real-time streaming of LLM responses
  • TODO list and state synchronization
  • Debug event streams for development tools

PubSub Configuration

Setup

  1. Ensure Phoenix.PubSub is started in your application:
# application.ex
children = [
  {Phoenix.PubSub, name: MyApp.PubSub},
  # ... other children
]
  2. Pass PubSub to AgentServer:
AgentServer.start_link(
  agent: agent,
  pubsub: {Phoenix.PubSub, MyApp.PubSub}
)

Topics

Each agent broadcasts on two topics:

| Topic | Purpose |
| --- | --- |
| "agent_server:#{agent_id}" | Main events (status, messages, todos) |
| "agent_server:debug:#{agent_id}" | Debug events (state snapshots, middleware actions) |

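As a small illustration of the naming scheme in the table above, the helper below builds both topic strings from an agent ID. The `TopicNames` module is hypothetical and not part of Sagents. Since `AgentServer.subscribe/1` and `AgentServer.subscribe_debug/1` take the agent ID directly, application code would likely only need these strings for logging or for inspecting PubSub traffic.

```elixir
defmodule TopicNames do
  @moduledoc "Hypothetical helper (not part of Sagents) mirroring the topic table."

  # Topic carrying main events: status, messages, todos.
  def main(agent_id) when is_binary(agent_id), do: "agent_server:#{agent_id}"

  # Topic carrying debug events: state snapshots, middleware actions.
  def debug(agent_id) when is_binary(agent_id), do: "agent_server:debug:#{agent_id}"
end

IO.inspect(TopicNames.main("conversation-123"))
IO.inspect(TopicNames.debug("conversation-123"))
```
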
Subscribing to Events

Main Events

# In your LiveView or GenServer
def mount(%{"id" => id}, _session, socket) do
  if connected?(socket) do
    AgentServer.subscribe("conversation-#{id}")
  end
  {:ok, socket}
end

def handle_info({:agent, event}, socket) do
  # Handle the event
  {:noreply, handle_agent_event(event, socket)}
end

Debug Events

# For development/debugging tools
AgentServer.subscribe_debug("conversation-123")

def handle_info({:agent, {:debug, debug_event}}, socket) do
  case debug_event do
    {:agent_state_update, state} ->
      # Full state snapshot
      {:noreply, assign(socket, debug_state: state)}

    {:middleware_action, module, data} ->
      # Middleware-specific event
      {:noreply, log_middleware_action(socket, module, data)}
  end
end

Event Reference

Status Events

{:agent, {:status_changed, status, data}}

| Status | Data | Description |
| --- | --- | --- |
| :idle | nil | Agent ready, not executing |
| :running | nil | Execution in progress |
| :interrupted | %InterruptData{} | Waiting for HITL approval |
| :cancelled | nil | User cancelled execution |
| :error | reason | Execution failed |

Example handler:

def handle_agent_event({:status_changed, status, data}, socket) do
  case {status, data} do
    {:running, nil} ->
      socket
      |> assign(status: :running)
      |> assign(streaming_content: "")

    {:idle, nil} ->
      assign(socket, status: :idle)

    {:interrupted, interrupt_data} ->
      socket
      |> assign(status: :interrupted)
      |> assign(interrupt: interrupt_data)
      |> push_event("show_approval_modal", %{})

    {:error, reason} ->
      socket
      |> assign(status: :error)
      |> put_flash(:error, "Agent error: #{inspect(reason)}")

    {:cancelled, nil} ->
      assign(socket, status: :cancelled)
  end
end

LLM Message Events

# Streaming tokens (during generation)
{:agent, {:llm_deltas, [%MessageDelta{}, ...]}}

# Complete message (after generation)
{:agent, {:llm_message, %Message{}}}

# Token usage info
{:agent, {:llm_token_usage, %TokenUsage{}}}

Example streaming handler:

def handle_agent_event({:llm_deltas, deltas}, socket) do
  # Accumulate streaming content
  new_content =
    Enum.reduce(deltas, socket.assigns.streaming_content, fn delta, acc ->
      acc <> (delta.content || "")
    end)

  socket
  |> assign(streaming_content: new_content)
  |> push_event("scroll_to_bottom", %{})
end

def handle_agent_event({:llm_message, message}, socket) do
  # Complete message received - add to history
  socket
  |> update(:messages, &(&1 ++ [message]))
  |> assign(streaming_content: "")
end

def handle_agent_event({:llm_token_usage, usage}, socket) do
  # Update token counts for display
  assign(socket, token_usage: usage)
end

TODO Events

{:agent, {:todos_updated, [%Todo{}, ...]}}

The TODO list is a complete snapshot (not a diff):

def handle_agent_event({:todos_updated, todos}, socket) do
  # Replace entire TODO list
  assign(socket, todos: todos)
end

Tool Events

# Tool call identified (LLM requested a tool)
{:agent, {:tool_call_identified, tool_info}}
# tool_info contains: %{call_id, name, display_text, arguments}

# Tool execution started
{:agent, {:tool_execution_started, tool_info}}
# tool_info contains: %{call_id, name, display_text, arguments}

# Tool execution completed successfully
{:agent, {:tool_execution_completed, call_id, tool_result}}
# call_id - matches the started event
# tool_result - %ToolResult{} struct with response

# Tool execution failed
{:agent, {:tool_execution_failed, call_id, error}}
# call_id - matches the started event
# error - error reason or message
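
The tool events can be folded into one record per tool call, which is convenient for rendering a row per call in the UI. The sketch below does this with a plain map instead of a LiveView socket. `ToolCallTracker` and its `:status` values are illustrative names, not part of Sagents; it assumes the `call_id` in `:tool_call_identified` is the same one used by the later events, and the `:ok_result` atom stands in for a real %ToolResult{} struct.

```elixir
defmodule ToolCallTracker do
  @moduledoc """
  Hypothetical helper (not part of Sagents): folds tool events into a map
  keyed by call_id.
  """

  # The LLM requested a tool; record it before execution starts.
  def apply_event(calls, {:tool_call_identified, %{call_id: id} = info}) do
    Map.put(calls, id, %{name: info.name, status: :identified, result: nil})
  end

  # Execution began; create the entry if the identified event was missed.
  def apply_event(calls, {:tool_execution_started, %{call_id: id} = info}) do
    entry = Map.get(calls, id, %{name: info.name, result: nil})
    Map.put(calls, id, Map.put(entry, :status, :running))
  end

  def apply_event(calls, {:tool_execution_completed, id, result}) do
    finish(calls, id, :completed, result)
  end

  def apply_event(calls, {:tool_execution_failed, id, error}) do
    finish(calls, id, :failed, error)
  end

  # Anything that is not a tool event leaves the map unchanged.
  def apply_event(calls, _other), do: calls

  defp finish(calls, id, status, result) do
    entry = Map.get(calls, id, %{name: nil})
    Map.put(calls, id, Map.merge(entry, %{status: status, result: result}))
  end
end

info = %{call_id: "c1", name: "search", display_text: "Searching", arguments: %{}}

events = [
  {:tool_call_identified, info},
  {:tool_execution_started, info},
  # :ok_result is a placeholder for a %ToolResult{} struct
  {:tool_execution_completed, "c1", :ok_result}
]

calls = Enum.reduce(events, %{}, fn event, acc -> ToolCallTracker.apply_event(acc, event) end)
IO.inspect(calls)
```

In a LiveView, the same clauses could live in `handle_agent_event/2` and update an assign instead of returning a bare map.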

Shutdown Event

{:agent, {:agent_shutdown, %{reason: reason, metadata: map}}}

Reasons:

  • :inactivity - Timeout expired
  • :no_viewers - No presence after completion
  • :manual - Explicitly stopped
  • :crash - Process crashed (rare)

Example handler:

def handle_agent_event({:agent_shutdown, %{reason: reason}}, socket) do
  case reason do
    :inactivity ->
      # Agent timed out - can restart on user action
      socket
      |> assign(agent_status: :inactive)
      |> put_flash(:info, "Conversation paused due to inactivity")

    :no_viewers ->
      # Normal cleanup after completion
      assign(socket, agent_status: :inactive)

    _ ->
      assign(socket, agent_status: :stopped)
  end
end

Display Message Events

# Message was persisted to database
{:agent, {:display_message_saved, display_message}}

Useful for syncing UI with database state.

Debug Events

Debug events provide detailed insight into agent internals.

Note: While you can subscribe to debug events directly, the sagents_live_debugger package already uses these events to provide a powerful real-time debugging dashboard. It visualizes agent state, middleware actions, and sub-agent hierarchies out of the box. See the Live Debugger documentation for setup instructions.

State Updates

{:agent, {:debug, {:agent_state_update, %State{}}}}

Full state snapshot after each execution step.

Middleware Actions

{:agent, {:debug, {:middleware_action, module, data}}}

Each middleware can emit custom debug events:

| Middleware | Event | Data |
| --- | --- | --- |
| Summarization | :summarization_started | Token count info |
| Summarization | :summarization_completed | Message count reduction |
| PatchToolCalls | :dangling_tool_calls_patched | Patch count |
| ConversationTitle | :title_generated | Generated title |
| HumanInTheLoop | :interrupt_triggered | Tool calls requiring approval |

Example debug handler:

def handle_info({:agent, {:debug, event}}, socket) do
  case event do
    {:agent_state_update, state} ->
      {:noreply, update(socket, :debug_log, &[{:state, state} | &1])}

    {:middleware_action, Sagents.Middleware.Summarization, {:summarization_completed, info}} ->
      {:noreply, update(socket, :debug_log, &[{:summarization, info} | &1])}

    {:middleware_action, module, data} ->
      {:noreply, update(socket, :debug_log, &[{module, data} | &1])}
  end
end

Publishing Custom Events

From Middleware

Middleware can publish events using the AgentServer API:

defmodule MyMiddleware do
  @behaviour Sagents.Middleware

  def after_model(state, _config) do
    # Publish custom event
    AgentServer.publish_event_from(
      state.agent_id,
      {:my_custom_event, %{data: "something"}}
    )

    # Publish debug event (separate topic)
    AgentServer.publish_debug_event_from(
      state.agent_id,
      {:middleware_action, __MODULE__, {:custom_action, "details"}}
    )

    {:ok, state}
  end
end

Event Structure

Events are always wrapped in {:agent, event}:

# What you publish
AgentServer.publish_event_from(agent_id, {:my_event, data})

# What subscribers receive
{:agent, {:my_event, data}}
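
Because custom events arrive in the same {:agent, event} wrapper as built-in ones, a subscriber can match on the inner tuple. The sketch below uses a plain function and a map in place of `handle_info/2` and a socket; `EventRouter` is a hypothetical name. The catch-all clause matters once custom events are in play: without it, an event the subscriber does not recognize raises a FunctionClauseError.

```elixir
defmodule EventRouter do
  @moduledoc "Hypothetical subscriber-side dispatch (not part of Sagents)."

  # Custom event published by middleware, as a subscriber receives it.
  def handle({:agent, {:my_event, data}}, state) do
    Map.put(state, :last_custom, data)
  end

  # Any other agent event: leave the state unchanged.
  def handle({:agent, _event}, state), do: state
end

state = EventRouter.handle({:agent, {:my_event, %{data: "something"}}}, %{})
IO.inspect(state)
```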

Phoenix.Presence Integration

Overview

Presence tracking enables:

  • Knowing how many clients are viewing a conversation
  • Automatic shutdown when all viewers leave
  • Displaying who else is viewing

Setup

  1. Create a Presence module:
defmodule MyApp.Presence do
  use Phoenix.Presence,
    otp_app: :my_app,
    pubsub_server: MyApp.PubSub
end
  2. Start it in your application:
children = [
  MyApp.Presence,
  # ...
]
  3. Configure AgentServer:
AgentServer.start_link(
  agent: agent,
  pubsub: {Phoenix.PubSub, MyApp.PubSub},
  presence_tracking: [
    enabled: true,
    presence_module: MyApp.Presence,
    topic: "conversation:#{conversation_id}",
    grace_period: 5_000  # 5 seconds before shutdown
  ]
)

Tracking Viewers

In your LiveView:

def mount(%{"id" => conversation_id}, _session, socket) do
  topic = "conversation:#{conversation_id}"

  if connected?(socket) do
    # Subscribe to agent events
    AgentServer.subscribe("conversation-#{conversation_id}")

    # Track this viewer
    {:ok, _} = MyApp.Presence.track(
      self(),
      topic,
      socket.assigns.current_user.id,
      %{
        user_name: socket.assigns.current_user.name,
        joined_at: DateTime.utc_now()
      }
    )

    # Subscribe to presence changes
    Phoenix.PubSub.subscribe(MyApp.PubSub, topic)
  end

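  # Assign :viewers up front so render/1 can read @viewers on first render
  {:ok, assign(socket, topic: topic, viewers: MyApp.Presence.list(topic))}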
end

# Handle presence changes
def handle_info(%Phoenix.Socket.Broadcast{event: "presence_diff", payload: _diff}, socket) do
  # The diff is ignored here; re-list the topic to get the full viewer map
  presence = MyApp.Presence.list(socket.assigns.topic)
  {:noreply, assign(socket, viewers: presence)}
end

Displaying Viewers

def render(assigns) do
  ~H"""
  <div class="viewers">
    <%= for {_user_id, %{metas: metas}} <- @viewers do %>
      <span class="viewer" title={hd(metas).user_name}>
        <%= String.first(hd(metas).user_name) %>
      </span>
    <% end %>
  </div>
  """
end
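
The map returned by `MyApp.Presence.list/1` has one entry per tracked key, and each entry holds a list of metas, one per tracked process (for example, one per open tab for the same user). The sketch below condenses a map of that shape into a sorted list of names with tab counts. `ViewerList` is a hypothetical helper, and the sample data is hand-written rather than produced by a running Presence server.

```elixir
defmodule ViewerList do
  @moduledoc "Hypothetical helper (not part of Sagents or Phoenix)."

  # Takes a map shaped like the result of Presence.list/1 and returns
  # one {user_name, tab_count} tuple per user, sorted by name.
  def summarize(presences) when is_map(presences) do
    presences
    |> Enum.map(fn {_key, %{metas: [first | _] = metas}} ->
      {first.user_name, length(metas)}
    end)
    |> Enum.sort()
  end
end

# Hand-written sample in the shape Presence.list/1 returns.
presences = %{
  "1" => %{metas: [%{user_name: "Ada", phx_ref: "a"}, %{user_name: "Ada", phx_ref: "b"}]},
  "2" => %{metas: [%{user_name: "Grace", phx_ref: "c"}]}
}

IO.inspect(ViewerList.summarize(presences))
```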

How Presence Affects Shutdown

  1. Agent completes execution (status: :idle)
  2. AgentServer checks presence count
  3. If no viewers: start grace period timer
  4. During grace period: if viewer joins, cancel shutdown
  5. After grace period: save state and terminate
Execution completes → Check presence → No viewers → Grace period → Shutdown
                           ↓
                      Has viewers → Stay running
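
The grace-period steps above can be sketched with a small GenServer and `Process.send_after/3`. This is not the AgentServer implementation: `GraceShutdown`, its function names, and the token used to ignore stale timer messages are all assumptions made for illustration, and the viewer count is passed in by the caller instead of being read from Presence.

```elixir
defmodule GraceShutdown do
  @moduledoc """
  Hypothetical stand-in (not the AgentServer implementation) showing a
  cancellable grace-period timer.
  """
  use GenServer

  def start_link(grace_ms), do: GenServer.start_link(__MODULE__, grace_ms)
  def execution_completed(pid, viewers), do: GenServer.cast(pid, {:completed, viewers})
  def viewer_joined(pid), do: GenServer.cast(pid, :viewer_joined)

  @impl true
  def init(grace_ms), do: {:ok, %{grace_ms: grace_ms, timer: nil, token: nil}}

  @impl true
  # Steps 2-3: execution finished with no viewers, so start the grace period.
  def handle_cast({:completed, 0}, state) do
    state = cancel(state)
    token = make_ref()
    timer = Process.send_after(self(), {:grace_expired, token}, state.grace_ms)
    {:noreply, %{state | timer: timer, token: token}}
  end

  # Viewers are present: stay running.
  def handle_cast({:completed, _viewers}, state), do: {:noreply, state}

  # Step 4: a viewer joined during the grace period, so cancel the shutdown.
  def handle_cast(:viewer_joined, state), do: {:noreply, cancel(state)}

  @impl true
  # Step 5: the current timer fired. A real server would save state here.
  def handle_info({:grace_expired, token}, %{token: token} = state) do
    {:stop, :normal, state}
  end

  # A timer that was cancelled after its message was already queued.
  def handle_info({:grace_expired, _stale}, state), do: {:noreply, state}

  defp cancel(%{timer: nil} = state), do: state

  defp cancel(%{timer: timer} = state) do
    Process.cancel_timer(timer)
    %{state | timer: nil, token: nil}
  end
end

{:ok, pid} = GraceShutdown.start_link(50)
ref = Process.monitor(pid)
GraceShutdown.execution_completed(pid, 0)

receive do
  {:DOWN, ^ref, :process, ^pid, reason} -> IO.inspect(reason, label: "stopped")
after
  1_000 -> IO.puts("still running")
end
```

The token guards against a race in which the timer fires just before it is cancelled: the stale message is still delivered, but it no longer matches the stored token and is ignored.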

Multiple Subscribers

Multiple LiveView processes can subscribe to the same agent:

# Tab 1
AgentServer.subscribe("conversation-123")

# Tab 2 (same or different user)
AgentServer.subscribe("conversation-123")

# Both receive all events

This enables:

  • Multiple browser tabs with same conversation
  • Shared conversations between users
  • Admin dashboards monitoring user conversations
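
The fan-out behaviour can be tried without Phoenix by using `Registry` from the Elixir standard library as a stand-in for Phoenix.PubSub. In the sketch below, two processes play the role of browser tabs: both register under the same topic and both receive the single event that is dispatched. `DemoPubSub` and the tab names are made up for the example; Sagents itself broadcasts through Phoenix.PubSub.

```elixir
# Registry with duplicate keys lets many processes register under one topic.
{:ok, _} = Registry.start_link(keys: :duplicate, name: DemoPubSub)
topic = "agent_server:conversation-123"
parent = self()

# Two "tabs" subscribe to the same topic and report what they receive.
for tab <- [:tab1, :tab2] do
  spawn_link(fn ->
    {:ok, _} = Registry.register(DemoPubSub, topic, nil)
    send(parent, {:subscribed, tab})

    receive do
      {:agent, event} -> send(parent, {:received, tab, event})
    end
  end)
end

# Wait until both tabs are subscribed before broadcasting.
for tab <- [:tab1, :tab2] do
  receive do
    {:subscribed, ^tab} -> :ok
  end
end

# Send one event to every process registered under the topic.
Registry.dispatch(DemoPubSub, topic, fn entries ->
  for {pid, _value} <- entries, do: send(pid, {:agent, {:status_changed, :running, nil}})
end)

received =
  for tab <- [:tab1, :tab2] do
    receive do
      {:received, ^tab, event} -> {tab, event}
    after
      1_000 -> {tab, :timeout}
    end
  end

IO.inspect(received)
```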

Event Ordering

Events are delivered in order per agent, but:

  • No ordering guarantee across different agents
  • Deltas may batch multiple tokens
  • State updates are eventually consistent (not transactional)

For consistency:

  • Use {:llm_message, msg} for final message content (not accumulated deltas)
  • Use {:todos_updated, todos} snapshots (not diffs)
  • Handle {:agent_shutdown, _} to detect agent going away
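
These recommendations can be combined in a single reducer over incoming events. The sketch below keeps accumulated deltas only as a temporary preview, lets the final message replace that preview, swaps TODO snapshots wholesale, and records a shutdown. `ConversationView` and its field names are hypothetical, and plain maps stand in for the %MessageDelta{}, %Message{} and %Todo{} structs that real events carry.

```elixir
defmodule ConversationView do
  @moduledoc """
  Hypothetical view-state reducer (not part of Sagents). Deltas feed a
  temporary preview; the final message is the authoritative content.
  """

  def new, do: %{messages: [], streaming_content: "", todos: [], agent: :available}

  # Deltas only update the in-progress preview.
  def apply_event(view, {:llm_deltas, deltas}) do
    text = Enum.map_join(deltas, "", fn delta -> delta.content || "" end)
    %{view | streaming_content: view.streaming_content <> text}
  end

  # The complete message wins: store it and drop the preview.
  def apply_event(view, {:llm_message, message}) do
    %{view | messages: view.messages ++ [message], streaming_content: ""}
  end

  # Snapshots replace the previous list wholesale.
  def apply_event(view, {:todos_updated, todos}), do: %{view | todos: todos}

  # The agent went away; a UI could offer to restart it.
  def apply_event(view, {:agent_shutdown, _details}), do: %{view | agent: :stopped}

  # Events this view does not care about.
  def apply_event(view, _other), do: view
end

events = [
  {:llm_deltas, [%{content: "Hel"}, %{content: nil}]},
  {:llm_deltas, [%{content: "lo"}]},
  {:llm_message, %{role: :assistant, content: "Hello"}},
  {:todos_updated, [%{content: "Reply to user", status: :completed}]},
  {:agent_shutdown, %{reason: :no_viewers, metadata: %{}}}
]

view = Enum.reduce(events, ConversationView.new(), fn e, acc -> ConversationView.apply_event(acc, e) end)
IO.inspect(view)
```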

Best Practices

1. Always Check Connection

def mount(%{"id" => id}, _session, socket) do
  if connected?(socket) do
    # Only subscribe when actually connected
    AgentServer.subscribe("conversation-#{id}")
  end
  {:ok, socket}
end

2. Handle Agent Not Running

def mount(%{"id" => id}, _session, socket) do
  case Coordinator.start_conversation_session(id) do
    {:ok, session} ->
      # Subscribe only on the connected mount (see practice 1)
      if connected?(socket), do: AgentServer.subscribe(session.agent_id)
      {:ok, assign(socket, agent_id: session.agent_id)}

    {:error, :not_found} ->
      {:ok, redirect(socket, to: ~p"/conversations")}
  end
end

3. Explicit Unsubscription

Subscriptions are automatically cleaned up when the LiveView process terminates. However, you can explicitly unsubscribe when your application needs to disconnect for other reasons:

@impl true
def handle_event("close_conversation", _params, socket) do
  AgentServer.unsubscribe(socket.assigns.agent_id)
  # ... other logic ...
  {:noreply, socket}
end