Jido.Agent.Server.State (Jido v1.1.0-rc.2)

View Source

Defines the state management structure and transition logic for Agent Servers.

The Server.State module implements a finite state machine (FSM) that governs the lifecycle of agent workers in the Jido system. It ensures type safety and enforces valid state transitions while providing telemetry and logging for observability.

State Machine

The worker can be in one of the following states:

  • :initializing - Initial state when worker is starting up
  • :idle - Server is inactive and ready to accept new commands
  • :planning - Server is planning but not yet executing actions
  • :running - Server is actively executing commands
  • :paused - Server execution is temporarily suspended

State Transitions

Valid state transitions are:

initializing -> idle        (initialization_complete)
idle         -> planning    (plan_initiated)
idle         -> running     (direct_execution)
planning     -> running     (plan_completed)
planning     -> idle        (plan_cancelled)
running      -> paused      (execution_paused)
running      -> idle        (execution_completed)
paused       -> running     (execution_resumed)
paused       -> idle        (execution_cancelled)

Fields

  • :agent - The Agent struct being managed by this worker (required)
  • :pubsub - PubSub module for event broadcasting (required)
  • :topic - PubSub topic for worker events (required)
  • :subscriptions - List of subscribed topics (default: [])
  • :status - Current state of the worker (default: :idle)
  • :pending_signals - Queue of pending signals awaiting execution
  • :max_queue_size - Maximum number of commands that can be queued (default: 10000)
  • :child_supervisor - Dynamic supervisor PID for managing child processes

Example

iex> state = %Server.State{
...>   agent: my_agent,
...>   pubsub: MyApp.PubSub,
...>   topic: "agent.worker.1",
...>   status: :idle
...> }
iex> {:ok, new_state} = Server.State.transition(state, :running)
iex> new_state.status
:running

Summary

Types

Represents the possible states of a worker.

t()

Functions

Checks the current size of the pending signals queue.

Empties the pending queue in the server state.

Dequeues a signal from the state's pending queue.

Enqueues a signal into the state's pending signals queue.

Enqueues a signal at the front of the state's pending signals queue.

Retrieves a stored reply reference for a signal.

Removes a stored reply reference for a signal.

Stores a reply reference for a signal.

Attempts to transition the worker to a new state.

Types

dispatch_config()

log_levels()

@type log_levels() ::
  :debug | :info | :notice | :warning | :error | :critical | :alert | :emergency

modes()

@type modes() :: :auto | :step

status()

@type status() :: :initializing | :idle | :planning | :running | :paused

Represents the possible states of a worker.

  • :initializing - Server is starting up
  • :idle - Server is inactive
  • :planning - Server is planning actions
  • :running - Server is executing actions
  • :paused - Server execution is suspended

t()

@type t() :: %Jido.Agent.Server.State{
  agent: Jido.Agent.t(),
  child_supervisor: pid() | nil,
  current_signal: Jido.Signal.t(),
  current_signal_type: atom(),
  dispatch: dispatch_config(),
  journal: Jido.Signal.Journal.t(),
  log_level: log_levels(),
  max_queue_size: non_neg_integer(),
  mode: modes(),
  opts: keyword(),
  orchestrator_pid: pid() | nil,
  parent_pid: pid() | nil,
  pending_signals: :queue.queue(),
  registry: atom(),
  reply_refs: %{required(String.t()) => GenServer.from()},
  router: Jido.Signal.Router.Router.t(),
  skills: [Jido.Skill.t()],
  status: status()
}

Functions

check_queue_size(state)

@spec check_queue_size(%Jido.Agent.Server.State{
  agent: term(),
  child_supervisor: term(),
  current_signal: term(),
  current_signal_type: term(),
  dispatch: term(),
  journal: term(),
  log_level: term(),
  max_queue_size: term(),
  mode: term(),
  opts: term(),
  orchestrator_pid: term(),
  parent_pid: term(),
  pending_signals: term(),
  registry: term(),
  reply_refs: term(),
  router: term(),
  skills: term(),
  status: term()
}) :: {:ok, non_neg_integer()} | {:error, :queue_overflow}

Checks the current size of the pending signals queue.

Returns the queue size as an integer if within limits, or :queue_overflow error if exceeded.

Parameters

  • state - Current server state

Returns

  • {:ok, size} - Current queue size as integer
  • {:error, :queue_overflow} - Queue size exceeds maximum

Examples

iex> state = %Server.State{pending_signals: queue_with_items, max_queue_size: 100}
iex> Server.State.check_queue_size(state)
{:ok, 5}

iex> state = %Server.State{pending_signals: large_queue, max_queue_size: 10}
iex> Server.State.check_queue_size(state)
{:error, :queue_overflow}

clear_queue(state)

@spec clear_queue(%Jido.Agent.Server.State{
  agent: term(),
  child_supervisor: term(),
  current_signal: term(),
  current_signal_type: term(),
  dispatch: term(),
  journal: term(),
  log_level: term(),
  max_queue_size: term(),
  mode: term(),
  opts: term(),
  orchestrator_pid: term(),
  parent_pid: term(),
  pending_signals: term(),
  registry: term(),
  reply_refs: term(),
  router: term(),
  skills: term(),
  status: term()
}) ::
  {:ok,
   %Jido.Agent.Server.State{
     agent: term(),
     child_supervisor: term(),
     current_signal: term(),
     current_signal_type: term(),
     dispatch: term(),
     journal: term(),
     log_level: term(),
     max_queue_size: term(),
     mode: term(),
     opts: term(),
     orchestrator_pid: term(),
     parent_pid: term(),
     pending_signals: term(),
     registry: term(),
     reply_refs: term(),
     router: term(),
     skills: term(),
     status: term()
   }}

Empties the pending queue in the server state.

Returns a new state with an empty queue.

Parameters

  • state - Current server state

Returns

  • {:ok, new_state} - Queue was successfully emptied

Examples

iex> state = %Server.State{pending_signals: queue_with_items}
iex> Server.State.clear_queue(state)
{:ok, %Server.State{pending_signals: :queue.new()}}

dbug(_, _ \\ [])

(macro)

dequeue(state)

@spec dequeue(%Jido.Agent.Server.State{
  agent: term(),
  child_supervisor: term(),
  current_signal: term(),
  current_signal_type: term(),
  dispatch: term(),
  journal: term(),
  log_level: term(),
  max_queue_size: term(),
  mode: term(),
  opts: term(),
  orchestrator_pid: term(),
  parent_pid: term(),
  pending_signals: term(),
  registry: term(),
  reply_refs: term(),
  router: term(),
  skills: term(),
  status: term()
}) ::
  {:ok, term(),
   %Jido.Agent.Server.State{
     agent: term(),
     child_supervisor: term(),
     current_signal: term(),
     current_signal_type: term(),
     dispatch: term(),
     journal: term(),
     log_level: term(),
     max_queue_size: term(),
     mode: term(),
     opts: term(),
     orchestrator_pid: term(),
     parent_pid: term(),
     pending_signals: term(),
     registry: term(),
     reply_refs: term(),
     router: term(),
     skills: term(),
     status: term()
   }}
  | {:error, :empty_queue}

Dequeues a signal from the state's pending queue.

Returns the next signal and updated state with the signal removed from the queue. Returns error if queue is empty.

Parameters

  • state - Current server state

Returns

  • {:ok, signal, new_state} - Signal was successfully dequeued
  • {:error, :empty_queue} - Queue is empty

Examples

iex> state = %Server.State{pending_signals: queue_with_items}
iex> Server.State.dequeue(state)
{:ok, %Signal{type: "test"}, %Server.State{pending_signals: updated_queue}}

iex> state = %Server.State{pending_signals: :queue.new()}
iex> Server.State.dequeue(state)
{:error, :empty_queue}

enqueue(state, signal)

@spec enqueue(
  %Jido.Agent.Server.State{
    agent: term(),
    child_supervisor: term(),
    current_signal: term(),
    current_signal_type: term(),
    dispatch: term(),
    journal: term(),
    log_level: term(),
    max_queue_size: term(),
    mode: term(),
    opts: term(),
    orchestrator_pid: term(),
    parent_pid: term(),
    pending_signals: term(),
    registry: term(),
    reply_refs: term(),
    router: term(),
    skills: term(),
    status: term()
  },
  Jido.Signal.t()
) ::
  {:ok,
   %Jido.Agent.Server.State{
     agent: term(),
     child_supervisor: term(),
     current_signal: term(),
     current_signal_type: term(),
     dispatch: term(),
     journal: term(),
     log_level: term(),
     max_queue_size: term(),
     mode: term(),
     opts: term(),
     orchestrator_pid: term(),
     parent_pid: term(),
     pending_signals: term(),
     registry: term(),
     reply_refs: term(),
     router: term(),
     skills: term(),
     status: term()
   }}
  | {:error, :queue_overflow}

Enqueues a signal into the state's pending signals queue.

Validates that the queue size is within the configured maximum before adding. Emits a queue_overflow event if the queue is full.

Parameters

  • state - Current server state
  • signal - Signal to enqueue

Returns

  • {:ok, new_state} - Signal was successfully enqueued
  • {:error, :queue_overflow} - Queue is at max capacity

Examples

iex> state = %Server.State{pending_signals: :queue.new(), max_queue_size: 2}
iex> Server.State.enqueue(state, %Signal{type: "test"})
{:ok, %Server.State{pending_signals: updated_queue}}

iex> state = %Server.State{pending_signals: full_queue, max_queue_size: 1}
iex> Server.State.enqueue(state, %Signal{type: "test"})
{:error, :queue_overflow}

enqueue_front(state, signal)

@spec enqueue_front(
  %Jido.Agent.Server.State{
    agent: term(),
    child_supervisor: term(),
    current_signal: term(),
    current_signal_type: term(),
    dispatch: term(),
    journal: term(),
    log_level: term(),
    max_queue_size: term(),
    mode: term(),
    opts: term(),
    orchestrator_pid: term(),
    parent_pid: term(),
    pending_signals: term(),
    registry: term(),
    reply_refs: term(),
    router: term(),
    skills: term(),
    status: term()
  },
  Jido.Signal.t()
) ::
  {:ok,
   %Jido.Agent.Server.State{
     agent: term(),
     child_supervisor: term(),
     current_signal: term(),
     current_signal_type: term(),
     dispatch: term(),
     journal: term(),
     log_level: term(),
     max_queue_size: term(),
     mode: term(),
     opts: term(),
     orchestrator_pid: term(),
     parent_pid: term(),
     pending_signals: term(),
     registry: term(),
     reply_refs: term(),
     router: term(),
     skills: term(),
     status: term()
   }}
  | {:error, :queue_overflow}

Enqueues a signal at the front of the state's pending signals queue.

Validates that the queue size is within the configured maximum before adding. Emits a queue_overflow event if the queue is full.

Parameters

  • state - Current server state
  • signal - Signal to enqueue at front

Returns

  • {:ok, new_state} - Signal was successfully enqueued at front
  • {:error, :queue_overflow} - Queue is at max capacity

Examples

iex> state = %Server.State{pending_signals: :queue.new(), max_queue_size: 2}
iex> Server.State.enqueue_front(state, %Signal{type: "test"})
{:ok, %Server.State{pending_signals: updated_queue}}

iex> state = %Server.State{pending_signals: full_queue, max_queue_size: 1}
iex> Server.State.enqueue_front(state, %Signal{type: "test"})
{:error, :queue_overflow}

error(_, _ \\ [])

(macro)

get_reply_ref(state, signal_id)

@spec get_reply_ref(
  %Jido.Agent.Server.State{
    agent: term(),
    child_supervisor: term(),
    current_signal: term(),
    current_signal_type: term(),
    dispatch: term(),
    journal: term(),
    log_level: term(),
    max_queue_size: term(),
    mode: term(),
    opts: term(),
    orchestrator_pid: term(),
    parent_pid: term(),
    pending_signals: term(),
    registry: term(),
    reply_refs: term(),
    router: term(),
    skills: term(),
    status: term()
  },
  String.t()
) :: GenServer.from() | nil

Retrieves a stored reply reference for a signal.

Parameters

  • state - Current server state
  • signal_id - ID of the signal to get ref for

Returns

The stored GenServer.from() tuple or nil if not found

remove_reply_ref(state, signal_id)

@spec remove_reply_ref(
  %Jido.Agent.Server.State{
    agent: term(),
    child_supervisor: term(),
    current_signal: term(),
    current_signal_type: term(),
    dispatch: term(),
    journal: term(),
    log_level: term(),
    max_queue_size: term(),
    mode: term(),
    opts: term(),
    orchestrator_pid: term(),
    parent_pid: term(),
    pending_signals: term(),
    registry: term(),
    reply_refs: term(),
    router: term(),
    skills: term(),
    status: term()
  },
  String.t()
) :: %Jido.Agent.Server.State{
  agent: term(),
  child_supervisor: term(),
  current_signal: term(),
  current_signal_type: term(),
  dispatch: term(),
  journal: term(),
  log_level: term(),
  max_queue_size: term(),
  mode: term(),
  opts: term(),
  orchestrator_pid: term(),
  parent_pid: term(),
  pending_signals: term(),
  registry: term(),
  reply_refs: term(),
  router: term(),
  skills: term(),
  status: term()
}

Removes a stored reply reference for a signal.

Parameters

  • state - Current server state
  • signal_id - ID of the signal to remove ref for

Returns

Updated state with reply ref removed

store_reply_ref(state, signal_id, from)

@spec store_reply_ref(
  %Jido.Agent.Server.State{
    agent: term(),
    child_supervisor: term(),
    current_signal: term(),
    current_signal_type: term(),
    dispatch: term(),
    journal: term(),
    log_level: term(),
    max_queue_size: term(),
    mode: term(),
    opts: term(),
    orchestrator_pid: term(),
    parent_pid: term(),
    pending_signals: term(),
    registry: term(),
    reply_refs: term(),
    router: term(),
    skills: term(),
    status: term()
  },
  String.t(),
  GenServer.from()
) :: %Jido.Agent.Server.State{
  agent: term(),
  child_supervisor: term(),
  current_signal: term(),
  current_signal_type: term(),
  dispatch: term(),
  journal: term(),
  log_level: term(),
  max_queue_size: term(),
  mode: term(),
  opts: term(),
  orchestrator_pid: term(),
  parent_pid: term(),
  pending_signals: term(),
  registry: term(),
  reply_refs: term(),
  router: term(),
  skills: term(),
  status: term()
}

Stores a reply reference for a signal.

Parameters

  • state - Current server state
  • signal_id - ID of the signal to store ref for
  • from - GenServer.from() tuple to store

Returns

Updated state with stored reply ref

transition(state, current)

@spec transition(
  %Jido.Agent.Server.State{
    agent: term(),
    child_supervisor: term(),
    current_signal: term(),
    current_signal_type: term(),
    dispatch: term(),
    journal: term(),
    log_level: term(),
    max_queue_size: term(),
    mode: term(),
    opts: term(),
    orchestrator_pid: term(),
    parent_pid: term(),
    pending_signals: term(),
    registry: term(),
    reply_refs: term(),
    router: term(),
    skills: term(),
    status: status()
  },
  status()
) ::
  {:ok,
   %Jido.Agent.Server.State{
     agent: term(),
     child_supervisor: term(),
     current_signal: term(),
     current_signal_type: term(),
     dispatch: term(),
     journal: term(),
     log_level: term(),
     max_queue_size: term(),
     mode: term(),
     opts: term(),
     orchestrator_pid: term(),
     parent_pid: term(),
     pending_signals: term(),
     registry: term(),
     reply_refs: term(),
     router: term(),
     skills: term(),
     status: term()
   }}
  | {:error, {:invalid_transition, status(), status()}}

Attempts to transition the worker to a new state.

This function enforces the state machine rules defined in @transitions. It logs state transitions for debugging and monitoring purposes.

Parameters

  • state - Current Server.State struct
  • desired - Desired target state

Returns

  • {:ok, new_state} - Transition was successful
  • {:error, {:invalid_transition, current, desired}} - Invalid state transition

Examples

iex> state = %Server.State{status: :idle}
iex> Server.State.transition(state, :running)
{:ok, %Server.State{status: :running}}

iex> state = %Server.State{status: :idle}
iex> Server.State.transition(state, :paused)
{:error, {:invalid_transition, :idle, :paused}}