Jido.Agent.Runtime (Jido v1.0.0-rc.4)
A GenServer implementation for managing agent state and operations in a distributed system.
The Runtime module provides a robust framework for managing agent lifecycle, state transitions, and command processing. It handles both synchronous and asynchronous operations while maintaining fault tolerance and providing comprehensive logging and telemetry.
Features
- State machine-based lifecycle management
- Asynchronous command processing with queuing
- PubSub-based event broadcasting
- Comprehensive error handling and recovery
- Telemetry integration for monitoring
- Distributed registration via Registry
States
Runtimes follow a state machine pattern with these states:
- :initializing - Initial startup state
- :idle - Ready to accept commands
- :running - Actively processing commands
- :paused - Suspended command processing (queues new commands)
- :planning - Planning but not executing actions
Command Types
The worker processes two main types of commands:
- Action commands: Actions that modify agent state, sent synchronously with cmd/4 or asynchronously with cmd_async/4
- Management commands: Synchronous operations that control worker behavior, sent with manage/3
Usage
Start a worker under a supervisor:
    children = [
      {Jido.Agent.Runtime,
        agent: MyAgent.new(),
        pubsub: MyApp.PubSub,
        topic: "custom.topic",
        max_queue_size: 1000 # Optional, defaults to 10000
      }
    ]

    Supervisor.start_link(children, strategy: :one_for_one)
Send commands to the worker:
    # Asynchronous action
    :ok = Runtime.cmd_async(worker_pid, :move, %{destination: :kitchen})

    # Synchronous management
    {:ok, new_state} = Runtime.manage(worker_pid, :pause)
Events
The worker emits these events on its PubSub topic:
- jido.agent.started - Runtime initialization complete
- jido.agent.state_changed - Runtime state transitions
- jido.agent.cmd_completed - Action execution completed
- jido.agent.queue_overflow - Queue size exceeded max_queue_size
Error Handling
The worker implements several error handling mechanisms:
- Command validation and queueing
- State transition validation
- Automatic command retries (configurable)
- Dead letter handling for failed commands
- Queue size limits with overflow protection
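The queue size limit can be illustrated with a small standalone sketch. This is not Jido's implementation: the module name BoundedQueueSketch, its functions, and the choice to reject (rather than drop or replace) a command on overflow are all assumptions made for illustration. Only the default limit of 10000 and the queue_overflow name come from this page.

```elixir
defmodule BoundedQueueSketch do
  # Hypothetical module, not part of Jido: a FIFO command queue with a size cap.
  defstruct queue: :queue.new(), size: 0, max_size: 10_000

  # Builds an empty queue; 10_000 mirrors the documented max_queue_size default.
  def new(max_size \\ 10_000) when is_integer(max_size) and max_size > 0 do
    %__MODULE__{max_size: max_size}
  end

  # Assumption: a full queue rejects the new command instead of dropping an old one.
  def enqueue(%__MODULE__{size: size, max_size: max}, _command) when size >= max do
    {:error, :queue_overflow}
  end

  def enqueue(%__MODULE__{queue: q, size: size} = state, command) do
    {:ok, %{state | queue: :queue.in(command, q), size: size + 1}}
  end

  # Pops the oldest command, or returns :empty when nothing is queued.
  def dequeue(%__MODULE__{queue: q, size: size} = state) do
    case :queue.out(q) do
      {{:value, command}, rest} -> {:ok, command, %{state | queue: rest, size: size - 1}}
      {:empty, _} -> :empty
    end
  end
end
```

Tracking the size next to the queue keeps the overflow check constant-time, since :queue.len/1 walks the whole queue.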
Summary
Functions
- child_spec/1 - Returns a child specification for starting the worker under a supervisor.
- cmd/4 - Sends a synchronous action command to the worker.
- cmd_async/4 - Sends an asynchronous command to the worker.
- get_topic/1 - Gets the topic for an agent runtime process.
- list_processes/1 - Lists all child processes running under this runtime's supervisor.
- manage/3 - Sends a synchronous management command to the worker.
- start_link/1 - Starts a worker process linked to the current process.
- start_process/2 - Starts a new child process under the runtime's DynamicSupervisor.
- terminate_process/2 - Terminates a child process running under this runtime's supervisor.
Types
@type command() :: :replan | :pause | :resume | :reset | :stop
Management commands that can be sent to the worker.
- :replan - Trigger replanning of current actions
- :pause - Suspend command processing
- :resume - Resume command processing
- :reset - Reset to initial state
- :stop - Gracefully stop the worker
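As a rough illustration of command validation against this type, the sketch below checks an atom against the five documented management commands before it would be sent. The module CommandSketch and the {:error, {:invalid_command, term}} shape are hypothetical; the page does not say how the Runtime itself reports an unknown command.

```elixir
defmodule CommandSketch do
  # Hypothetical helper, not part of Jido.
  # The list mirrors the documented command() type.
  @commands [:replan, :pause, :resume, :reset, :stop]

  # Returns the command unchanged when it is one of the documented atoms.
  def validate(command) when command in @commands, do: {:ok, command}

  # Assumption: anything else is reported as a tagged error tuple.
  def validate(other), do: {:error, {:invalid_command, other}}
end
```

A caller could pattern match on the result before calling manage/3, for example `with {:ok, cmd} <- CommandSketch.validate(input), do: Runtime.manage(worker, cmd)`.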
Functions
@spec child_spec(keyword()) :: Supervisor.child_spec()
Returns a child specification for starting the worker under a supervisor.
Options
Accepts the same options as start_link/1, plus:
- :id - Optional supervisor child id (defaults to module name)
Examples
    children = [
      {Runtime, agent: agent, pubsub: pubsub, id: :worker_1}
    ]

    Supervisor.start_link(children, strategy: :one_for_one)
@spec cmd(GenServer.server(), atom(), map(), keyword()) :: {:ok, Jido.Agent.Runtime.State.t()} | {:error, term()}
Sends a synchronous action command to the worker.
This function blocks until the action is completed and returns the result.
Parameters
- server - Runtime pid or name
- command - Action command as atom
- args - Map of additional arguments for the command
- opts - Optional keyword list of options:
  - :apply_state - Whether to apply results to agent state (default: true)
Returns
- {:ok, new_state} - Action completed successfully with new state
- {:error, reason} - Action failed
Examples
    iex> Runtime.cmd(worker, :move, %{destination: :kitchen})
    {:ok, %State{}}

    iex> Runtime.cmd(worker, :recharge, %{}, apply_state: false)
    {:ok, %State{}}
@spec cmd_async(GenServer.server(), atom(), map(), keyword()) :: :ok | {:error, term()}
Sends an asynchronous command to the worker.
The command is processed based on the worker's current state:
- If :running or :idle - Executed immediately
- If :paused - Queued for later execution
- Otherwise - Returns error
Parameters
- server - Runtime pid or name
- command - Action command as atom
- args - Map of additional arguments for the command
- opts - Optional keyword list of options:
  - :apply_state - Whether to apply results to agent state (default: true)
Returns
- :ok - Command accepted for processing
- {:error, reason} - Command rejected
Examples
    iex> Runtime.cmd_async(worker, :move, %{destination: :kitchen})
    :ok

    iex> Runtime.cmd_async(worker, :recharge, %{}, apply_state: false)
    :ok
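The state-dependent handling described for cmd_async can be sketched as a pure routing function. The module DispatchSketch, the return atoms :execute and :enqueue, and the error tuple shape are hypothetical names chosen for this illustration. Only the three rules (run when :running or :idle, queue when :paused, error otherwise) come from the description above.

```elixir
defmodule DispatchSketch do
  # Hypothetical helper, not part of Jido: decides what to do with an
  # asynchronous command based on the runtime's current status.
  @spec route(atom()) :: :execute | :enqueue | {:error, {:invalid_state, atom()}}
  def route(status) when status in [:running, :idle], do: :execute
  def route(:paused), do: :enqueue

  # Any other status (for example :initializing or :planning) rejects the command.
  def route(status), do: {:error, {:invalid_state, status}}
end
```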
@spec get_topic(GenServer.server()) :: {:ok, String.t()} | {:error, term()}
Gets the topic for an agent runtime process.
Parameters
- server - Runtime pid or name
Returns
- {:ok, topic} - The topic string for the agent
- {:error, reason} - Failed to get topic
Examples
    iex> Runtime.get_topic(worker)
    {:ok, "jido.agent.my_agent_1"}
@spec list_processes(GenServer.server()) :: {:ok, [{pid(), DynamicSupervisor.child_spec()}]} | {:error, term()}
Lists all child processes running under this runtime's supervisor.
Parameters
- server - Runtime pid or name
Returns
- {:ok, [{pid(), child_spec()}]} - List of child processes and their specs
- {:error, reason} - Failed to list processes
Examples
    iex> Runtime.list_processes(worker)
    {:ok, [{#PID<0.234.0>, {MyWorker, :start_link, []}}, ...]}
@spec manage(GenServer.server(), command(), term()) :: {:ok, Jido.Agent.Runtime.State.t()} | {:error, term()}
Sends a synchronous management command to the worker.
Parameters
- server - Runtime pid or name
- command - Management command (see the command() type)
- args - Optional arguments for the command
Returns
- {:ok, state} - Command processed successfully
- {:error, reason} - Command failed
Examples
    iex> Runtime.manage(worker, :pause)
    {:ok, %State{status: :paused}}

    iex> Runtime.manage(worker, :resume)
    {:ok, %State{status: :running}}
@spec start_link(keyword()) :: GenServer.on_start()
Starts a worker process linked to the current process.
Options
- :agent - Agent struct or module implementing agent behavior (required)
- :pubsub - PubSub module for event broadcasting (required)
- :topic - Custom topic for events (optional, defaults to agent.id)
- :name - Registration name (optional, defaults to agent.id)
- :max_queue_size - Maximum queue size for commands (optional, defaults to 10000)
Returns
- {:ok, pid} - Successfully started worker
- {:error, reason} - Failed to start worker
Examples
    iex> Runtime.start_link(agent: MyAgent.new(), pubsub: MyApp.PubSub)
    {:ok, #PID<0.123.0>}

    iex> Runtime.start_link(
    ...>   agent: MyAgent.new(),
    ...>   pubsub: MyApp.PubSub,
    ...>   topic: "custom.topic",
    ...>   name: "worker_1"
    ...> )
    {:ok, #PID<0.124.0>}
@spec start_process(GenServer.server(), DynamicSupervisor.child_spec()) :: DynamicSupervisor.on_start_child()
Starts a new child process under the runtime's DynamicSupervisor.
Parameters
- server - Runtime pid or name
- child_spec - Child specification for the process to start
Returns
- {:ok, pid} - Successfully started child process
- {:error, reason} - Failed to start child process
Examples
    iex> Runtime.start_process(worker, {MyWorker, arg: :value})
    {:ok, #PID<0.234.0>}
@spec terminate_process(GenServer.server(), pid()) :: :ok | {:error, term()}
Terminates a child process running under this runtime's supervisor.
Parameters
- server - Runtime pid or name
- child_pid - PID of the child process to terminate
Returns
- :ok - Successfully terminated process
- {:error, reason} - Failed to terminate process
Examples
    iex> Runtime.terminate_process(worker, child_pid)
    :ok
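For readers unfamiliar with DynamicSupervisor, the sketch below shows the standard-library calls that start_process/2, list_processes/1 and terminate_process/2 plausibly build on. That delegation is an assumption and is not confirmed by this page. The example uses a standalone DynamicSupervisor and a plain Agent as the child instead of a Runtime, so it runs without Jido.

```elixir
# A standalone DynamicSupervisor stands in for the one the runtime owns.
{:ok, sup} = DynamicSupervisor.start_link(strategy: :one_for_one)

# Start a child from a child spec, comparable to start_process/2.
{:ok, child} = DynamicSupervisor.start_child(sup, {Agent, fn -> 0 end})

# Enumerate running children, comparable to list_processes/1.
# Each entry has the shape {:undefined, pid, type, modules}.
children = DynamicSupervisor.which_children(sup)

# Terminate by pid, comparable to terminate_process/2.
:ok = DynamicSupervisor.terminate_child(sup, child)
```

Note that which_children/1 returns a different tuple shape from the {pid, child_spec} pairs documented for list_processes/1, so the Runtime presumably reshapes or tracks that information itself.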