View Source Jido.Agent.Runtime (Jido v1.0.0-rc.4)

A GenServer implementation for managing agent state and operations in a distributed system.

The Runtime module provides a robust framework for managing agent lifecycle, state transitions, and command processing. It handles both synchronous and asynchronous operations while maintaining fault tolerance and providing comprehensive logging and telemetry.

Features

  • State machine-based lifecycle management
  • Asynchronous command processing with queuing
  • PubSub-based event broadcasting
  • Comprehensive error handling and recovery
  • Telemetry integration for monitoring
  • Distributed registration via Registry

States

Runtimes follow a state machine pattern with these states:

  • :initializing - Initial startup state
  • :idle - Ready to accept commands
  • :running - Actively processing commands
  • :paused - Suspended command processing (queues new commands)
  • :planning - Planning but not executing actions

Command Types

The worker processes two main types of commands:

  1. Act commands: Asynchronous actions that modify agent state
  2. Management commands: Synchronous operations that control worker behavior

Usage

Start a worker under a supervisor:

children = [
  {Jido.Agent.Runtime,
    agent: MyAgent.new(),
    pubsub: MyApp.PubSub,
    topic: "custom.topic",
    max_queue_size: 1000  # Optional, defaults to 10000
  }
]
Supervisor.start_link(children, strategy: :one_for_one)

Send commands to the worker:

# Asynchronous action
Runtime.act(worker_pid, %{command: :move, destination: :kitchen})

# Synchronous management
{:ok, new_state} = Runtime.manage(worker_pid, :pause)

Events

The worker emits these events on its PubSub topic:

  • jido.agent.started - Runtime initialization complete
  • jido.agent.state_changed - Runtime state transitions
  • jido.agent.cmd_completed - Action execution completed
  • jido.agent.queue_overflow - Queue size exceeded max_queue_size

Error Handling

The worker implements several error handling mechanisms:

  • Command validation and queueing
  • State transition validation
  • Automatic command retries (configurable)
  • Dead letter handling for failed commands
  • Queue size limits with overflow protection

Summary

Types

Management commands that can be sent to the worker.

Functions

Returns a child specification for starting the worker under a supervisor.

Sends a synchronous action command to the worker.

Sends an asynchronous command to the worker.

Gets the topic for an agent runtime process.

Lists all child processes running under this runtime's supervisor.

Sends a synchronous management command to the worker.

Starts a worker process linked to the current process.

Starts a new child process under the runtime's DynamicSupervisor.

Terminates a child process running under this runtime's supervisor.

Types

command()

@type command() :: :replan | :pause | :resume | :reset | :stop

Management commands that can be sent to the worker.

  • :replan - Trigger replanning of current actions
  • :pause - Suspend command processing
  • :resume - Resume command processing
  • :reset - Reset to initial state
  • :stop - Gracefully stop the worker

Functions

child_spec(init_arg)

@spec child_spec(keyword()) :: Supervisor.child_spec()

Returns a child specification for starting the worker under a supervisor.

Options

Accepts same options as start_link/1 plus:

  • :id - Optional supervisor child id (defaults to module name)

Examples

children = [
  {Runtime, agent: agent, pubsub: pubsub, id: :worker_1}
]
Supervisor.start_link(children, strategy: :one_for_one)

cmd(server, command \\ :default, args \\ %{}, opts \\ [])

@spec cmd(GenServer.server(), atom(), map(), keyword()) ::
  {:ok, Jido.Agent.Runtime.State.t()} | {:error, term()}

Sends a synchronous action command to the worker.

This function blocks until the action is completed and returns the result.

Parameters

  • server - Runtime pid or name
  • command - Action command as atom
  • args - Map of additional arguments for the command
  • opts - Optional keyword list of options:
    • :apply_state - Whether to apply results to agent state (default: true)

Returns

  • {:ok, new_state} - Action completed successfully with new state
  • {:error, reason} - Action failed

Examples

iex> Runtime.cmd(worker, :move, %{destination: :kitchen})
{:ok, %State{}}

iex> Runtime.cmd(worker, :recharge, %{}, apply_state: false)
{:ok, %State{}}

cmd_async(server, command \\ :default, args \\ %{}, opts \\ [])

@spec cmd_async(GenServer.server(), atom(), map(), keyword()) ::
  :ok | {:error, term()}

Sends an asynchronous command to the worker.

The command is processed based on the worker's current state:

  • If :running or :idle - Executed immediately
  • If :paused - Queued for later execution
  • Otherwise - Returns error

Parameters

  • server - Runtime pid or name
  • command - Action command as atom
  • args - Map of additional arguments for the command
  • opts - Optional keyword list of options:
    • :apply_state - Whether to apply results to agent state (default: true)

Returns

  • :ok - Command accepted for processing
  • {:error, reason} - Command rejected

Examples

iex> Runtime.cmd_async(worker, :move, %{destination: :kitchen})
:ok

iex> Runtime.cmd_async(worker, :recharge, %{}, apply_state: false)
:ok

get_topic(server)

@spec get_topic(GenServer.server()) :: {:ok, String.t()} | {:error, term()}

Gets the topic for an agent runtime process.

Parameters

  • server - Runtime pid or name

Returns

  • {:ok, topic} - The topic string for the agent
  • {:error, reason} - Failed to get topic

Examples

iex> Runtime.get_topic(worker)
{:ok, "jido.agent.my_agent_1"}

list_processes(server)

@spec list_processes(GenServer.server()) ::
  {:ok, [{pid(), DynamicSupervisor.child_spec()}]} | {:error, term()}

Lists all child processes running under this runtime's supervisor.

Parameters

  • server - Runtime pid or name

Returns

  • {:ok, [{pid(), child_spec()}]} - List of child processes and their specs
  • {:error, reason} - Failed to list processes

Examples

iex> Runtime.list_processes(worker)
{:ok, [{#PID<0.234.0>, {MyWorker, :start_link, []}}, ...]}

manage(server, command, args \\ nil)

@spec manage(GenServer.server(), command(), term()) ::
  {:ok, Jido.Agent.Runtime.State.t()} | {:error, term()}

Sends a synchronous management command to the worker.

Parameters

  • server - Runtime pid or name
  • command - Management command (see @type command)
  • args - Optional arguments for the command

Returns

  • {:ok, state} - Command processed successfully
  • {:error, reason} - Command failed

Examples

iex> Runtime.manage(worker, :pause)
{:ok, %State{status: :paused}}

iex> Runtime.manage(worker, :resume)
{:ok, %State{status: :running}}

start_link(opts)

@spec start_link(keyword()) :: GenServer.on_start()

Starts a worker process linked to the current process.

Options

  • :agent - Agent struct or module implementing agent behavior (required)
  • :pubsub - PubSub module for event broadcasting (required)
  • :topic - Custom topic for events (optional, defaults to agent.id)
  • :name - Registration name (optional, defaults to agent.id)
  • :max_queue_size - Maximum queue size for commands (optional, defaults to 10000)

Returns

  • {:ok, pid} - Successfully started worker
  • {:error, reason} - Failed to start worker

Examples

iex> Runtime.start_link(agent: MyAgent.new(), pubsub: MyApp.PubSub)
{:ok, #PID<0.123.0>}

iex> Runtime.start_link(
...>   agent: MyAgent.new(),
...>   pubsub: MyApp.PubSub,
...>   topic: "custom.topic",
...>   name: "worker_1"
...> )
{:ok, #PID<0.124.0>}

start_process(server, child_spec)

@spec start_process(GenServer.server(), DynamicSupervisor.child_spec()) ::
  DynamicSupervisor.on_start_child()

Starts a new child process under the runtime's DynamicSupervisor.

Parameters

  • server - Runtime pid or name
  • child_spec - Child specification for the process to start

Returns

  • {:ok, pid} - Successfully started child process
  • {:error, reason} - Failed to start child process

Examples

iex> Runtime.start_process(worker, {MyWorker, arg: :value})
{:ok, #PID<0.234.0>}

terminate_process(server, child_pid)

@spec terminate_process(GenServer.server(), pid()) :: :ok | {:error, term()}

Terminates a child process running under this runtime's supervisor.

Parameters

  • server - Runtime pid or name
  • child_pid - PID of the child process to terminate

Returns

  • :ok - Successfully terminated process
  • {:error, reason} - Failed to terminate process

Examples

iex> Runtime.terminate_process(worker, child_pid)
:ok