Nous.AgentServer (nous v0.13.3)

View Source

GenServer wrapper for Nous agents with PubSub integration.

This server:

  • Wraps a Nous agent (standard or ReAct)
  • Links to parent process (dies when parent dies)
  • Subscribes to PubSub for incoming messages
  • Publishes responses back via PubSub
  • Maintains conversation context for multi-turn conversations

Context-Based State

Uses Nous.Agent.Context to maintain conversation state:

  • Messages accumulate across turns
  • Tool calls are tracked
  • Usage is aggregated
  • Callbacks forward to PubSub

Usage with LiveView

defmodule MyAppWeb.ChatLive do
  use MyAppWeb, :live_view

  def mount(_params, _session, socket) do
    # Start agent linked to this LiveView
    {:ok, agent_pid} = AgentServer.start_link(
      session_id: socket.assigns.session_id,
      agent_config: %{
        model: "lmstudio:qwen3-vl-4b-thinking-mlx",
        instructions: "You are a helpful assistant",
        tools: []
      }
    )

    # Subscribe to responses
    Phoenix.PubSub.subscribe(MyApp.PubSub, "agent:#{socket.assigns.session_id}")

    {:ok, assign(socket, agent_pid: agent_pid, messages: [])}
  end

  def handle_event("send_message", %{"message" => msg}, socket) do
    AgentServer.send_message(socket.assigns.agent_pid, msg)
    {:noreply, socket}
  end

  # Receive streaming deltas
  def handle_info({:agent_delta, text}, socket) do
    # Append text to current response
    {:noreply, update(socket, :current_response, &(&1 <> text))}
  end

  # Receive complete response
  def handle_info({:agent_complete, result}, socket) do
    messages = socket.assigns.messages ++ [%{role: :assistant, content: result.output}]
    {:noreply, assign(socket, messages: messages, current_response: "")}
  end

  # Receive tool calls
  def handle_info({:tool_call, call}, socket) do
    # Show tool call in UI
    {:noreply, socket}
  end
end

Message Interruption

Calling send_message/2 while the agent is already processing a request automatically cancels the in-flight execution and starts a new one. The server uses an :atomics-based flag so the running task can detect cancellation without message-passing overhead. Both the interrupted user message and the new one are preserved in the conversation context, so no input is lost.

PubSub Events

Subscribers on the "agent:<session_id>" topic receive the following messages:

MessageDescription
{:agent_status, :thinking}A new run is about to start
{:agent_status, :started}The LLM provider acknowledged the call
{:agent_delta, text}A streaming text chunk
{:tool_call, call}A tool invocation is in progress
{:tool_result, result}A tool returned its result
{:agent_response, output}The final text output of the run
{:agent_complete, result}The full result struct (output + context + usage)
{:agent_error, message}An error occurred during execution
{:agent_cancelled, reason}The execution was cancelled

Lifecycle

Each server starts an inactivity timer (default 5 minutes, configurable via :inactivity_timeout). The timer resets on every send_message/2 call. When the timer fires, the server terminates with :normal.

If a :persistence backend is configured, the conversation context is automatically saved after each successful agent run and restored on start_link/1.

Summary

Functions

Cancel the current agent execution.

Returns a specification to start this module under a supervisor.

Clear conversation context and start fresh.

Get conversation context.

Get conversation history (messages only).

Load a previously saved context from the persistence backend.

Manually save the current context to the persistence backend.

Send a message to the agent.

Start an AgentServer linked to the calling process.

Types

agent_config()

@type agent_config() :: %{
  model: String.t(),
  instructions: String.t(),
  tools: list(),
  type: :standard | :react,
  model_settings: map()
}

state()

@type state() :: %{
  session_id: String.t(),
  agent: Nous.Agent.t(),
  context: Nous.Agent.Context.t(),
  pubsub: module() | nil,
  topic: String.t(),
  agent_type: :standard | :react,
  current_task: Task.t() | nil,
  cancelled_ref: :atomics.atomics_ref()
}

Functions

cancel_execution(pid)

@spec cancel_execution(pid()) :: {:ok, :cancelled} | {:ok, :no_execution}

Cancel the current agent execution.

Returns {:ok, :cancelled} when an execution was running and has been stopped, or {:ok, :no_execution} when there was nothing to cancel.

The server will:

  • Set the atomics cancellation flag so the task exits at the next check
  • Shut down the running task gracefully (5 s timeout)
  • Broadcast {:agent_cancelled, reason} to PubSub subscribers
  • Reset the flag for future executions

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

clear_history(pid)

@spec clear_history(pid()) :: :ok

Clear conversation context and start fresh.

Resets messages and tool-call history but preserves the configured dependencies (:deps) and system prompt.

get_context(pid)

@spec get_context(pid()) :: Nous.Agent.Context.t()

Get conversation context.

get_history(pid)

@spec get_history(pid()) :: list()

Get conversation history (messages only).

load_context(pid, session_id)

@spec load_context(pid(), String.t()) :: :ok | {:error, term()}

Load a previously saved context from the persistence backend.

Replaces the current context with the loaded one. Patches any dangling tool calls that may have been interrupted mid-execution.

Returns :ok on success, {:error, :no_persistence} if no backend is configured, or {:error, reason} on failure.

save_context(pid)

@spec save_context(pid()) :: :ok | {:error, term()}

Manually save the current context to the persistence backend.

Returns :ok on success, {:error, :no_persistence} if no backend is configured, or {:error, reason} on failure.

send_message(pid, message)

@spec send_message(pid(), String.t()) :: :ok

Send a message to the agent.

If a previous execution is still running, it is automatically cancelled before the new message is processed. The interrupted message and the new one both remain in the conversation context. Returns immediately — the agent run happens asynchronously and results are broadcast via PubSub.

start_link(opts)

@spec start_link(keyword()) :: GenServer.on_start()

Start an AgentServer linked to the calling process.

Options

  • :session_id - Unique session identifier (required)
  • :agent_config - Agent configuration map (required)
  • :pubsub - PubSub module (default: MyApp.PubSub)
  • :name - Optional GenServer name (e.g., a Registry via tuple)
  • :inactivity_timeout - Inactivity timeout in ms (default: 5 minutes). Set to :infinity to disable.
  • :persistence - Persistence backend module (e.g., Nous.Persistence.ETS). When set, context is auto-saved after each response and restored on init.

Agent Config

  • :model - Model string (e.g., "openai:gpt-4")
  • :instructions - System instructions
  • :tools - List of tool functions
  • :type - :standard or :react (default: :standard)
  • :model_settings - Model settings map
  • :deps - Initial dependencies for tools