View Source Jido.Agent.Server (Jido v1.0.0)

This module is under active development and may undergo changes.

A fault-tolerant, distributed agent server implementation that manages stateful agents with built-in pub/sub capabilities and dynamic supervision.

The Jido.Agent.Server provides a robust framework for managing long-running agent processes with the following key features:

  • Automatic pub/sub event distribution via configurable PubSub adapter
  • Dynamic supervision of child processes
  • Configurable queue size limits and backpressure handling
  • Graceful shutdown and cleanup of resources
  • Comprehensive state management and transition handling
  • Registry-based process naming for distributed operations

Usage

To start an agent server:

{:ok, pid} = Jido.Agent.Server.start_link([
  agent: MyAgent.new(),
  pubsub: MyApp.PubSub,
  name: "my_agent",
  max_queue_size: 5_000
])

The server can then receive commands via the cmd/4 function:

Jido.Agent.Server.cmd(pid, :some_action, %{arg: "value"}, [])

Configuration Options

  • :agent - Required. The agent struct or module implementing new/0
  • :pubsub - Required. The PubSub module for event distribution
  • :name - Optional. The registration name (defaults to agent.id)
  • :topic - Optional. The PubSub topic (defaults to generated from agent.id)
  • :max_queue_size - Optional. Maximum pending signals (defaults to 10,000)
  • :registry - Optional. The Registry for process registration (defaults to Jido.AgentRegistry)

Supervision

The server automatically supervises child processes using a :one_for_one DynamicSupervisor strategy. Each child process is monitored and can be restarted independently.

State Management

The server maintains its state through the Jido.Agent.Server.State struct and handles transitions via signal processing. State transitions are validated and published as events.

Signal Processing

Commands and events are processed as signals through the Execute module, which handles:

  • Action signals (synchronous commands)
  • Event signals (asynchronous notifications)
  • State transitions
  • Queue management

Fault Tolerance

The server implements comprehensive error handling and cleanup:

  • Graceful termination with resource cleanup
  • Automatic unsubscription from PubSub topics
  • Child process shutdown management
  • Queue size monitoring and backpressure

See cmd/4 for sending commands and get_state/1 for retrieving current server state.

Summary

Functions

Sends a command to the agent server for processing.

Formats the server state for debugging.

Gets the agent ID from the server state.

Gets the complete server state.

Gets the current status from the server state.

Gets the child supervisor PID from the server state.

Gets the PubSub topic from the server state.

Handles synchronous calls to the server.

Handles asynchronous casts to the server.

Handles messages sent to the server process.

Initializes the server state with the provided configuration.

Starts a new Agent Server process linked to the current process.

Handles cleanup when the server is terminating.

Types

start_opt()

@type start_opt() ::
  {:agent, struct() | module()}
  | {:pubsub, module()}
  | {:name, String.t() | atom()}
  | {:topic, String.t()}
  | {:max_queue_size, pos_integer()}
  | {:registry, module()}

Functions

cmd(server, action, args \\ %{}, opts \\ [])

@spec cmd(GenServer.server(), module(), map(), keyword()) ::
  {:ok, Jido.Agent.Server.State.t()} | {:error, term()}

Sends a command to the agent server for processing.

Parameters

  • server - The server process identifier
  • action - The action module to execute
  • args - Optional map of arguments for the action (default: %{})
  • opts - Optional keyword list of options (default: [])

Returns

  • {:ok, state} - Command processed successfully with updated state
  • {:error, reason} - Command failed with reason

dbug(_, _ \\ [], _ \\ [])

(macro)

error(_, _ \\ [], _ \\ [])

(macro)

format_status(reason, list)

Formats the server state for debugging.

Returns a map containing:

  • Current state
  • Server status
  • Agent ID
  • Queue size
  • Child process information

get_id(server)

@spec get_id(GenServer.server()) :: {:ok, String.t()} | {:error, term()}

Gets the agent ID from the server state.

Parameters

  • server - The server process identifier

Returns

  • {:ok, id} - The agent ID string
  • {:error, reason} - Failed to get ID

get_state(server)

@spec get_state(GenServer.server()) ::
  {:ok, Jido.Agent.Server.State.t()} | {:error, term()}

Gets the complete server state.

Parameters

  • server - The server process identifier

Returns

  • {:ok, state} - The complete server state
  • {:error, reason} - Failed to get state

get_status(server)

@spec get_status(GenServer.server()) :: {:ok, atom()} | {:error, term()}

Gets the current status from the server state.

Parameters

  • server - The server process identifier

Returns

  • {:ok, status} - The current server status
  • {:error, reason} - Failed to get status

get_supervisor(server)

@spec get_supervisor(GenServer.server()) :: {:ok, pid()} | {:error, term()}

Gets the child supervisor PID from the server state.

Parameters

  • server - The server process identifier

Returns

  • {:ok, pid} - The supervisor PID
  • {:error, reason} - Failed to get supervisor

get_topic(server)

@spec get_topic(GenServer.server()) :: {:ok, String.t()} | {:error, term()}

Gets the PubSub topic from the server state.

Parameters

  • server - The server process identifier

Returns

  • {:ok, topic} - The PubSub topic string
  • {:error, reason} - Failed to get topic

handle_call(msg, from, state)

Handles synchronous calls to the server.

Call Patterns

  • :get_state - Returns the complete server state
  • %Signal{} - Processes a signal synchronously if queue not full
  • Other - Returns unhandled call error

Returns

  • {:reply, reply, new_state} - Response and updated state
  • {:reply, {:error, reason}, state} - Error response

handle_cast(msg, state)

Handles asynchronous casts to the server.

Cast Patterns

  • %Signal{} - Processes a signal asynchronously
  • Other - Logs unhandled cast

Returns

  • {:noreply, new_state} - Updated state after processing
  • {:stop, reason, state} - Stops server on error

handle_info(msg, state)

Handles messages sent to the server process.

Message Patterns

  • %Signal{} - Processes non-event signals
  • :check_queue_size - Monitors queue size and hibernates if needed
  • {:DOWN, ...} - Handles child process termination
  • :timeout - Handles timeout messages
  • Other - Logs unhandled messages

Returns

  • {:noreply, new_state} - Updated state after processing
  • {:stop, reason, state} - Stops server on error

init(map)

Initializes the server state with the provided configuration.

Parameters

  • %{agent:, pubsub:, topic:, max_queue_size:} - The initialization map containing:
    • agent - The agent struct to manage
    • pubsub - The PubSub module for event distribution
    • topic - Optional topic override (defaults to agent ID based topic)
    • max_queue_size - Maximum pending signals allowed

Returns

  • {:ok, state} - Successfully initialized with the server state
  • {:stop, reason} - Failed to initialize with error reason

start_link(opts)

@spec start_link([start_opt()]) :: GenServer.on_start()

Starts a new Agent Server process linked to the current process.

Initializes the server with the given agent and configuration, subscribes to relevant PubSub topics, and starts a DynamicSupervisor for managing child processes.

Parameters

  • opts - Keyword list of server options:
    • :agent - Required. Agent struct or module implementing new/0
    • :pubsub - Required. PubSub module for event distribution
    • :name - Optional. Registration name (defaults to agent.id)
    • :topic - Optional. PubSub topic (defaults to generated from agent.id)
    • :max_queue_size - Optional. Maximum pending signals (defaults to 10,000)
    • :registry - Optional. Registry for process registration (defaults to Jido.AgentRegistry)

Returns

  • {:ok, pid} - Successfully started server process
  • {:error, reason} - Failed to start server

Error Reasons

  • :invalid_agent - Agent is nil or invalid
  • :missing_pubsub - PubSub module not provided
  • :already_started - Server already registered with given name
  • Any error from PubSub subscription or DynamicSupervisor start

Examples

# Start with minimal configuration
{:ok, pid} = Jido.Agent.Server.start_link([
  agent: MyAgent.new(),
  pubsub: MyApp.PubSub
])

# Start with full configuration
{:ok, pid} = Jido.Agent.Server.start_link([
  agent: MyAgent.new(),
  pubsub: MyApp.PubSub,
  name: "custom_agent",
  topic: "agents:custom",
  max_queue_size: 5_000,
  registry: MyApp.Registry
])

Runtime Behavior

The server performs these steps during initialization:

  1. Validates agent and builds configuration
  2. Starts DynamicSupervisor for child processes
  3. Subscribes to configured PubSub topic
  4. Transitions to :idle state
  5. Emits 'started' event

The process is registered via the configured Registry using the :via tuple pattern for distributed process lookup.

terminate(reason, state)

Handles cleanup when the server is terminating.

Performs the following cleanup:

  • Emits stopped event
  • Stops child supervisor
  • Unsubscribes from topics

Parameters

  • reason - The reason for termination
  • state - The current server state

Returns

  • :ok - Cleanup completed (even if some steps failed)