Jido.Agent.Server (Jido v1.0.0)
This module is under active development and may undergo changes.
A fault-tolerant, distributed agent server implementation that manages stateful agents with built-in pub/sub capabilities and dynamic supervision.
The Jido.Agent.Server module provides a robust framework for managing long-running agent processes with the following key features:
- Automatic pub/sub event distribution via configurable PubSub adapter
- Dynamic supervision of child processes
- Configurable queue size limits and backpressure handling
- Graceful shutdown and cleanup of resources
- Comprehensive state management and transition handling
- Registry-based process naming for distributed operations
Usage
To start an agent server:
{:ok, pid} = Jido.Agent.Server.start_link([
  agent: MyAgent.new(),
  pubsub: MyApp.PubSub,
  name: "my_agent",
  max_queue_size: 5_000
])
The server can then receive commands via the cmd/4 function:
Jido.Agent.Server.cmd(pid, :some_action, %{arg: "value"}, [])
Configuration Options
- :agent - Required. The agent struct or module implementing new/0
- :pubsub - Required. The PubSub module for event distribution
- :name - Optional. The registration name (defaults to agent.id)
- :topic - Optional. The PubSub topic (defaults to generated from agent.id)
- :max_queue_size - Optional. Maximum pending signals (defaults to 10,000)
- :registry - Optional. The Registry for process registration (defaults to Jido.AgentRegistry)
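As a rough illustration of how the required and optional keys above might be resolved, here is a minimal sketch. `OptionsSketch` and its `build/1` function are hypothetical and not part of Jido, the `"agent:"` topic prefix is invented because the real topic format is not documented here, and the case of passing a module implementing new/0 is left out for brevity.

```elixir
defmodule OptionsSketch do
  # Hypothetical helper, not part of Jido. Defaults mirror the option list above.
  @default_max_queue_size 10_000

  def build(opts) do
    with {:ok, agent} <- fetch_agent(opts),
         {:ok, pubsub} <- fetch_pubsub(opts) do
      {:ok,
       %{
         agent: agent,
         pubsub: pubsub,
         name: Keyword.get(opts, :name, agent.id),
         # Assumption: the real generated topic format is not documented; this prefix is made up.
         topic: Keyword.get(opts, :topic, "agent:" <> agent.id),
         max_queue_size: Keyword.get(opts, :max_queue_size, @default_max_queue_size),
         registry: Keyword.get(opts, :registry, Jido.AgentRegistry)
       }}
    end
  end

  # Only agent structs/maps with a string id are accepted in this sketch.
  defp fetch_agent(opts) do
    case Keyword.get(opts, :agent) do
      %{id: id} = agent when is_binary(id) -> {:ok, agent}
      _ -> {:error, :invalid_agent}
    end
  end

  defp fetch_pubsub(opts) do
    case Keyword.get(opts, :pubsub) do
      nil -> {:error, :missing_pubsub}
      pubsub -> {:ok, pubsub}
    end
  end
end
```

Calling `OptionsSketch.build(agent: %{id: "a1"}, pubsub: MyApp.PubSub)` in this sketch yields a configuration map whose name falls back to the agent ID and whose queue limit falls back to 10,000.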
Supervision
The server automatically supervises child processes with a DynamicSupervisor using the :one_for_one strategy. Each child process is monitored and can be restarted independently.
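The independent-restart behavior can be seen with a plain DynamicSupervisor from Elixir's standard library. This is a minimal sketch that does not use Jido: `SupervisionSketch` is a hypothetical module, and the children are standard-library `Agent` processes (unrelated to Jido agents) used only as stand-ins.

```elixir
defmodule SupervisionSketch do
  # Hypothetical demo module, not part of Jido.
  def run do
    {:ok, sup} = DynamicSupervisor.start_link(strategy: :one_for_one)

    # Elixir's built-in Agent is used purely as a stand-in child process.
    spec = %{id: :worker, start: {Agent, :start_link, [fn -> 0 end]}, restart: :permanent}
    {:ok, first} = DynamicSupervisor.start_child(sup, spec)
    {:ok, second} = DynamicSupervisor.start_child(sup, spec)

    # Kill one child; the supervisor should replace it without touching its sibling.
    Process.exit(first, :kill)
    pids = await_restart(sup, first)

    %{restarted: length(pids) == 2 and first not in pids, sibling_alive: second in pids}
  end

  # Polls the supervisor until the killed child has been replaced (or attempts run out).
  defp await_restart(sup, dead_pid, attempts \\ 50) do
    pids = for {_, pid, _, _} <- DynamicSupervisor.which_children(sup), is_pid(pid), do: pid

    if (length(pids) == 2 and dead_pid not in pids) or attempts == 0 do
      pids
    else
      Process.sleep(10)
      await_restart(sup, dead_pid, attempts - 1)
    end
  end
end
```

With :one_for_one, only the terminated child is restarted; the second child keeps its original PID throughout.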
State Management
The server maintains its state through the Jido.Agent.Server.State struct and handles transitions via signal processing. State transitions are validated and published as events.
Signal Processing
Commands and events are processed as signals through the Execute module, which handles:
- Action signals (synchronous commands)
- Event signals (asynchronous notifications)
- State transitions
- Queue management
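Queue management with a size limit could look roughly like the following. This is a sketch under stated assumptions, not Jido's implementation: `QueueSketch`, its functions, and the `:queue_full` error reason are all hypothetical names chosen for illustration.

```elixir
defmodule QueueSketch do
  # Hypothetical size-limited signal queue, not Jido's implementation.
  defstruct queue: :queue.new(), max_size: 10_000

  def new(max_size), do: %__MODULE__{max_size: max_size}

  # Rejects new signals once the limit is reached, so callers observe backpressure.
  # The :queue_full reason is an assumption made for this sketch.
  def enqueue(%__MODULE__{queue: q, max_size: max} = state, signal) do
    if :queue.len(q) >= max do
      {:error, :queue_full}
    else
      {:ok, %{state | queue: :queue.in(signal, q)}}
    end
  end

  # Removes the oldest pending signal (FIFO order).
  def dequeue(%__MODULE__{queue: q} = state) do
    case :queue.out(q) do
      {{:value, signal}, rest} -> {:ok, signal, %{state | queue: rest}}
      {:empty, _} -> :empty
    end
  end
end
```

Rejecting at enqueue time keeps memory bounded and pushes the decision about retrying or dropping back to the sender.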
Fault Tolerance
The server implements comprehensive error handling and cleanup:
- Graceful termination with resource cleanup
- Automatic unsubscription from PubSub topics
- Child process shutdown management
- Queue size monitoring and backpressure
See cmd/4 for sending commands and get_state/1 for retrieving current server state.
Functions
@spec cmd(GenServer.server(), module(), map(), keyword()) :: {:ok, Jido.Agent.Server.State.t()} | {:error, term()}
Sends a command to the agent server for processing.
Parameters
- server - The server process identifier
- action - The action module to execute
- args - Optional map of arguments for the action (default: %{})
- opts - Optional keyword list of options (default: [])
Returns
- {:ok, state} - Command processed successfully with updated state
- {:error, reason} - Command failed with reason
Formats the server state for debugging.
Returns a map containing:
- Current state
- Server status
- Agent ID
- Queue size
- Child process information
@spec get_id(GenServer.server()) :: {:ok, String.t()} | {:error, term()}
Gets the agent ID from the server state.
Parameters
- server - The server process identifier
Returns
- {:ok, id} - The agent ID string
- {:error, reason} - Failed to get ID
@spec get_state(GenServer.server()) :: {:ok, Jido.Agent.Server.State.t()} | {:error, term()}
Gets the complete server state.
Parameters
- server - The server process identifier
Returns
- {:ok, state} - The complete server state
- {:error, reason} - Failed to get state
@spec get_status(GenServer.server()) :: {:ok, atom()} | {:error, term()}
Gets the current status from the server state.
Parameters
- server - The server process identifier
Returns
- {:ok, status} - The current server status
- {:error, reason} - Failed to get status
@spec get_supervisor(GenServer.server()) :: {:ok, pid()} | {:error, term()}
Gets the child supervisor PID from the server state.
Parameters
- server - The server process identifier
Returns
- {:ok, pid} - The supervisor PID
- {:error, reason} - Failed to get supervisor
@spec get_topic(GenServer.server()) :: {:ok, String.t()} | {:error, term()}
Gets the PubSub topic from the server state.
Parameters
- server - The server process identifier
Returns
- {:ok, topic} - The PubSub topic string
- {:error, reason} - Failed to get topic
Handles synchronous calls to the server.
Call Patterns
- :get_state - Returns the complete server state
- %Signal{} - Processes a signal synchronously if queue not full
- Other - Returns unhandled call error
Returns
- {:reply, reply, new_state} - Response and updated state
- {:reply, {:error, reason}, state} - Error response
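The :get_state and fallback call patterns follow the usual GenServer shape. The following minimal sketch is not Jido's code: `CallSketch` is a hypothetical module, and the `{:unhandled_call, message}` error term is an assumption, since the exact error returned for unhandled calls is not documented here.

```elixir
defmodule CallSketch do
  # Hypothetical GenServer illustrating the call patterns, not Jido's implementation.
  use GenServer

  def start_link(state), do: GenServer.start_link(__MODULE__, state)

  @impl true
  def init(state), do: {:ok, state}

  @impl true
  def handle_call(:get_state, _from, state) do
    # Reply with the complete state and leave it unchanged.
    {:reply, {:ok, state}, state}
  end

  def handle_call(other, _from, state) do
    # Assumption: the real error term for unhandled calls may differ.
    {:reply, {:error, {:unhandled_call, other}}, state}
  end
end
```

A catch-all clause that replies with an error keeps the caller from timing out on an unknown message while leaving the server state untouched.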
Handles asynchronous casts to the server.
Cast Patterns
- %Signal{} - Processes a signal asynchronously
- Other - Logs unhandled cast
Returns
- {:noreply, new_state} - Updated state after processing
- {:stop, reason, state} - Stops server on error
Handles messages sent to the server process.
Message Patterns
- %Signal{} - Processes non-event signals
- :check_queue_size - Monitors queue size and hibernates if needed
- {:DOWN, ...} - Handles child process termination
- :timeout - Handles timeout messages
- Other - Logs unhandled messages
Returns
- {:noreply, new_state} - Updated state after processing
- {:stop, reason, state} - Stops server on error
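Handling of {:DOWN, ...} messages and unhandled messages can be sketched with standard process monitors. This example is illustrative only: `MonitorSketch` and its `watch/2` and `watched/1` functions are hypothetical, and how Jido actually tracks child processes is not shown in this documentation.

```elixir
defmodule MonitorSketch do
  # Hypothetical GenServer that tracks monitored processes, not Jido's implementation.
  use GenServer
  require Logger

  def start_link(_opts \\ []), do: GenServer.start_link(__MODULE__, %{})
  def watch(server, pid), do: GenServer.call(server, {:watch, pid})
  def watched(server), do: GenServer.call(server, :watched)

  @impl true
  def init(children), do: {:ok, children}

  @impl true
  def handle_call({:watch, pid}, _from, children) do
    # Process.monitor/1 makes this server receive a :DOWN message when pid exits.
    ref = Process.monitor(pid)
    {:reply, :ok, Map.put(children, ref, pid)}
  end

  def handle_call(:watched, _from, children) do
    {:reply, Map.values(children), children}
  end

  @impl true
  def handle_info({:DOWN, ref, :process, _pid, _reason}, children) do
    # Forget the terminated child.
    {:noreply, Map.delete(children, ref)}
  end

  def handle_info(other, children) do
    # Unknown messages are logged rather than crashing the server.
    Logger.debug("unhandled message: #{inspect(other)}")
    {:noreply, children}
  end
end
```

Keying the tracked processes by monitor reference makes the :DOWN clause a single map deletion.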
Initializes the server state with the provided configuration.
Parameters
- %{agent:, pubsub:, topic:, max_queue_size:} - The initialization map containing:
  - agent - The agent struct to manage
  - pubsub - The PubSub module for event distribution
  - topic - Optional topic override (defaults to agent ID based topic)
  - max_queue_size - Maximum pending signals allowed
Returns
- {:ok, state} - Successfully initialized with the server state
- {:stop, reason} - Failed to initialize with error reason
@spec start_link([start_opt()]) :: GenServer.on_start()
Starts a new Agent Server process linked to the current process.
Initializes the server with the given agent and configuration, subscribes to relevant PubSub topics, and starts a DynamicSupervisor for managing child processes.
Parameters
- opts - Keyword list of server options:
  - :agent - Required. Agent struct or module implementing new/0
  - :pubsub - Required. PubSub module for event distribution
  - :name - Optional. Registration name (defaults to agent.id)
  - :topic - Optional. PubSub topic (defaults to generated from agent.id)
  - :max_queue_size - Optional. Maximum pending signals (defaults to 10,000)
  - :registry - Optional. Registry for process registration (defaults to Jido.AgentRegistry)
Returns
- {:ok, pid} - Successfully started server process
- {:error, reason} - Failed to start server
Error Reasons
- :invalid_agent - Agent is nil or invalid
- :missing_pubsub - PubSub module not provided
- :already_started - Server already registered with given name
- Any error from PubSub subscription or DynamicSupervisor start
Examples
# Start with minimal configuration
{:ok, pid} = Jido.Agent.Server.start_link([
  agent: MyAgent.new(),
  pubsub: MyApp.PubSub
])

# Start with full configuration
{:ok, pid} = Jido.Agent.Server.start_link([
  agent: MyAgent.new(),
  pubsub: MyApp.PubSub,
  name: "custom_agent",
  topic: "agents:custom",
  max_queue_size: 5_000,
  registry: MyApp.Registry
])
Runtime Behavior
The server performs these steps during initialization:
- Validates agent and builds configuration
- Starts DynamicSupervisor for child processes
- Subscribes to configured PubSub topic
- Transitions to :idle state
- Emits 'started' event
The process is registered via the configured Registry using the :via tuple pattern for distributed process lookup.
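The :via tuple pattern mentioned above can be illustrated with Elixir's built-in Registry. This sketch does not start a Jido server: `MyApp.SketchRegistry` is a hypothetical registry name, and a standard-library `Agent` stands in for the server process.

```elixir
# MyApp.SketchRegistry is a hypothetical name used only for this example.
{:ok, _} = Registry.start_link(keys: :unique, name: MyApp.SketchRegistry)

# A via tuple tells OTP to register and look up the process through the Registry.
via = {:via, Registry, {MyApp.SketchRegistry, "my_agent"}}

# Elixir's built-in Agent stands in for the server process here.
{:ok, pid} = Agent.start_link(fn -> :idle end, name: via)

# The process can be reached through the via tuple or looked up directly.
:idle = Agent.get(via, & &1)
[{^pid, _value}] = Registry.lookup(MyApp.SketchRegistry, "my_agent")

# Registering a second process under the same name fails.
{:error, {:already_started, ^pid}} = Agent.start_link(fn -> :idle end, name: via)
```

Elixir's built-in Registry is local to a single node, so lookups in this sketch only resolve processes running on the same node as the registry.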
Handles cleanup when the server is terminating.
Performs the following cleanup:
- Emits stopped event
- Stops child supervisor
- Unsubscribes from topics
Parameters
- reason - The reason for termination
- state - The current server state
Returns
- :ok - Cleanup completed (even if some steps failed)
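Cleanup that returns :ok even when individual steps fail could be structured as below. This is a minimal sketch, not Jido's implementation: `TerminateSketch` is hypothetical, the three steps are placeholders that only send messages to a test process, and the middle step raises deliberately to simulate a failure.

```elixir
defmodule TerminateSketch do
  # Hypothetical GenServer showing best-effort cleanup, not Jido's implementation.
  use GenServer

  def start_link(notify), do: GenServer.start_link(__MODULE__, notify)

  @impl true
  def init(notify) do
    # Trapping exits lets terminate/2 run when a linked parent shuts the server down.
    Process.flag(:trap_exit, true)
    {:ok, %{notify: notify}}
  end

  @impl true
  def terminate(reason, state) do
    # Placeholder steps; the second one fails on purpose.
    steps = [
      emit_stopped: fn -> send(state.notify, {:stopped, reason}) end,
      stop_children: fn -> raise "simulated failure" end,
      unsubscribe: fn -> send(state.notify, :unsubscribed) end
    ]

    # Each step runs in isolation so one failure does not block the remaining cleanup.
    Enum.each(steps, fn {_name, step} ->
      try do
        step.()
      rescue
        _error -> :ok
      catch
        _kind, _value -> :ok
      end
    end)

    :ok
  end
end
```

Stopping the server with `GenServer.stop(pid, :normal)` in this sketch still delivers both the stopped notification and the unsubscribe notification, despite the failing step in between.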