AgentSessionManager.Concurrency.ConcurrencyLimiter (AgentSessionManager v0.8.0)


Enforces concurrency limits for sessions and runs.

This module provides centralized tracking and enforcement of concurrency limits across the session manager. It ensures that:

  • The maximum number of parallel sessions is not exceeded
  • The maximum number of parallel runs (across all sessions) is not exceeded
  • Resources are properly released when sessions/runs complete

Configuration

  • :max_parallel_sessions - Maximum concurrent sessions; a positive integer or :infinity (default: 100)
  • :max_parallel_runs - Maximum concurrent runs globally; a positive integer or :infinity (default: 50)

Design

The limiter uses a GenServer with ETS tables for efficient concurrent reads. All write operations (acquire/release) go through the GenServer to ensure consistency. Read operations (status checks) can be done directly from ETS.
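The write-through-GenServer, read-from-ETS pattern described above can be sketched as follows. This is a minimal, self-contained illustration, not the library's implementation: the module name SketchLimiter, its :max option, the single-table layout, and the {:error, :limit_exceeded} return value are all assumptions made for the example.

```elixir
defmodule SketchLimiter do
  # Hypothetical stand-in used only to illustrate the pattern.
  use GenServer

  ## Client API

  def start_link(opts \\ []), do: GenServer.start_link(__MODULE__, opts)

  # Writes are serialized through the GenServer process.
  def acquire(server, id), do: GenServer.call(server, {:acquire, id})
  def release(server, id), do: GenServer.call(server, {:release, id})

  # The table reference is fetched once; later reads bypass the process.
  def table(server), do: GenServer.call(server, :table)
  def active_count(table), do: :ets.info(table, :size)

  ## Server callbacks

  @impl true
  def init(opts) do
    max = Keyword.get(opts, :max, :infinity)
    # :protected lets any process read while only the owner writes.
    table = :ets.new(:sketch_slots, [:set, :protected, read_concurrency: true])
    {:ok, %{table: table, max: max}}
  end

  @impl true
  def handle_call({:acquire, id}, _from, %{table: table, max: max} = state) do
    cond do
      # Already held: idempotent, counts once.
      :ets.member(table, id) ->
        {:reply, :ok, state}

      max != :infinity and :ets.info(table, :size) >= max ->
        {:reply, {:error, :limit_exceeded}, state}

      true ->
        :ets.insert(table, {id})
        {:reply, :ok, state}
    end
  end

  def handle_call({:release, id}, _from, %{table: table} = state) do
    # Deleting a missing key is a no-op in ETS.
    :ets.delete(table, id)
    {:reply, :ok, state}
  end

  def handle_call(:table, _from, state), do: {:reply, state.table, state}
end
```

Because only the owning process mutates the table, the limit check and the insert cannot interleave with another acquire, while status reads do not queue behind writers.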

Idempotency

All operations are idempotent:

  • Acquiring the same session/run multiple times only counts once
  • Releasing a non-existent session/run is a no-op
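A short sketch of what these guarantees imply in practice, assuming the behavior described above and using get_status/1 to observe the count. The identifiers "session-1" and "never-acquired" are example values.

```elixir
alias AgentSessionManager.Concurrency.ConcurrencyLimiter

{:ok, limiter} = ConcurrencyLimiter.start_link(max_parallel_sessions: 1)

# Acquiring the same session twice should consume a single slot.
:ok = ConcurrencyLimiter.acquire_session_slot(limiter, "session-1")
:ok = ConcurrencyLimiter.acquire_session_slot(limiter, "session-1")
%{active_sessions: 1} = ConcurrencyLimiter.get_status(limiter)

# Releasing a session that was never acquired should change nothing.
:ok = ConcurrencyLimiter.release_session_slot(limiter, "never-acquired")
%{active_sessions: 1} = ConcurrencyLimiter.get_status(limiter)
```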

Usage

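alias AgentSessionManager.Concurrency.ConcurrencyLimiter

# Example identifiers; use your own session and run IDs
session_id = "session-1"
run_id = "run-1"

{:ok, limiter} = ConcurrencyLimiter.start_link(
  max_parallel_sessions: 10,
  max_parallel_runs: 20
)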

# Acquire slots before starting sessions/runs
:ok = ConcurrencyLimiter.acquire_session_slot(limiter, session_id)
:ok = ConcurrencyLimiter.acquire_run_slot(limiter, session_id, run_id)

# Release when done
:ok = ConcurrencyLimiter.release_run_slot(limiter, run_id)
:ok = ConcurrencyLimiter.release_session_slot(limiter, session_id)

Summary

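Functions

acquire_run_slot(limiter, session_id, run_id)

Attempts to acquire a run slot.

acquire_session_slot(limiter, session_id)

Attempts to acquire a session slot.

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

get_limits(limiter)

Gets the configured limits.

get_status(limiter)

Gets the current status including active counts and available capacity.

release_run_slot(limiter, run_id)

Releases a run slot.

release_session_slot(limiter, session_id)

Releases a session slot.

start_link(opts \\ [])

Starts the concurrency limiter.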

Types

limits()

@type limits() :: %{
  max_parallel_sessions: pos_integer() | :infinity,
  max_parallel_runs: pos_integer() | :infinity
}

status()

@type status() :: %{
  active_sessions: non_neg_integer(),
  active_runs: non_neg_integer(),
  max_parallel_sessions: pos_integer() | :infinity,
  max_parallel_runs: pos_integer() | :infinity,
  available_session_slots: non_neg_integer() | :infinity,
  available_run_slots: non_neg_integer() | :infinity
}

Functions

acquire_run_slot(limiter, session_id, run_id)

@spec acquire_run_slot(GenServer.server(), String.t(), String.t()) ::
  :ok | {:error, AgentSessionManager.Core.Error.t()}

Attempts to acquire a run slot.

Returns :ok if successful, or an error if the limit would be exceeded.

This operation is idempotent - acquiring the same run_id multiple times only counts as one slot.

Parameters

  • limiter - The limiter server
  • session_id - The parent session identifier
  • run_id - The run identifier

Returns

  • :ok - Run slot acquired
  • {:error, %Error{code: :max_runs_exceeded}} - Limit exceeded
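One way to handle both return values is to wrap the work in a helper that releases the slot even if the work raises. The sketch below is illustrative: MyApp.RunGuard and with_run_slot/4 are hypothetical names, not part of AgentSessionManager.

```elixir
defmodule MyApp.RunGuard do
  # Hypothetical helper, not provided by the library.
  alias AgentSessionManager.Concurrency.ConcurrencyLimiter
  alias AgentSessionManager.Core.Error

  def with_run_slot(limiter, session_id, run_id, fun) when is_function(fun, 0) do
    case ConcurrencyLimiter.acquire_run_slot(limiter, session_id, run_id) do
      :ok ->
        try do
          {:ok, fun.()}
        after
          # Runs on success and on exceptions, so the slot is never leaked.
          ConcurrencyLimiter.release_run_slot(limiter, run_id)
        end

      {:error, %Error{code: :max_runs_exceeded}} = error ->
        # At capacity: let the caller decide whether to retry or reject.
        error

      {:error, _other} = error ->
        error
    end
  end
end
```

A caller would then write `MyApp.RunGuard.with_run_slot(limiter, session_id, run_id, fn -> do_work() end)`, where `do_work/0` stands for the run itself.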

acquire_session_slot(limiter, session_id)

@spec acquire_session_slot(GenServer.server(), String.t()) ::
  :ok | {:error, AgentSessionManager.Core.Error.t()}

Attempts to acquire a session slot.

Returns :ok if successful, or an error if the limit would be exceeded.

This operation is idempotent - acquiring the same session_id multiple times only counts as one slot.

Parameters

  • limiter - The limiter server
  • session_id - The session identifier

Returns

  • :ok - Session slot acquired
  • {:error, %Error{code: :max_sessions_exceeded}} - Limit exceeded

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.
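For example, the limiter could be started under a supervisor as shown below. This sketch assumes child_spec/1 forwards its argument to start_link/1 (the default generated by `use GenServer`); MyApp.Limiter is a hypothetical registered name.

```elixir
alias AgentSessionManager.Concurrency.ConcurrencyLimiter

children = [
  # The keyword list is assumed to be passed through to start_link/1.
  {ConcurrencyLimiter,
   name: MyApp.Limiter, max_parallel_sessions: 10, max_parallel_runs: 20}
]

{:ok, _supervisor} = Supervisor.start_link(children, strategy: :one_for_one)

# A registered name can be used wherever a GenServer.server() is expected.
:ok = ConcurrencyLimiter.acquire_session_slot(MyApp.Limiter, "session-1")
```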

get_limits(limiter)

@spec get_limits(GenServer.server()) :: limits()

Gets the configured limits.

get_status(limiter)

@spec get_status(GenServer.server()) :: status()

Gets the current status including active counts and available capacity.
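Because the available-slot fields may be either an integer or :infinity, callers need to handle both shapes. A minimal sketch, where MyApp.Capacity is a hypothetical helper that takes a status() map:

```elixir
defmodule MyApp.Capacity do
  # Hypothetical helper operating on the status() map shown above.

  # Returns true when at least one run slot appears to be free.
  def run_capacity?(%{available_run_slots: :infinity}), do: true
  def run_capacity?(%{available_run_slots: n}) when is_integer(n), do: n > 0
end
```

It would be called as `MyApp.Capacity.run_capacity?(ConcurrencyLimiter.get_status(limiter))`. A status check is only a snapshot: another process may take the last slot before the caller acquires it, so the result of acquire_run_slot/3 remains the authoritative answer.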

release_run_slot(limiter, run_id)

@spec release_run_slot(GenServer.server(), String.t()) :: :ok

Releases a run slot.

This operation is idempotent - releasing a non-existent run is a no-op.

Parameters

  • limiter - The limiter server
  • run_id - The run identifier

Returns

  • :ok - Always succeeds

release_session_slot(limiter, session_id)

@spec release_session_slot(GenServer.server(), String.t()) :: :ok

Releases a session slot.

This operation is idempotent - releasing a non-existent session is a no-op. When a session is released, all of its associated runs are also released.

Parameters

  • limiter - The limiter server
  • session_id - The session identifier

Returns

  • :ok - Always succeeds
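The cascading release can be illustrated as follows, assuming the behavior described above. The session and run identifiers are example values.

```elixir
alias AgentSessionManager.Concurrency.ConcurrencyLimiter

{:ok, limiter} = ConcurrencyLimiter.start_link([])

:ok = ConcurrencyLimiter.acquire_session_slot(limiter, "session-1")
:ok = ConcurrencyLimiter.acquire_run_slot(limiter, "session-1", "run-1")
:ok = ConcurrencyLimiter.acquire_run_slot(limiter, "session-1", "run-2")

# Releasing the session should also free both of its runs.
:ok = ConcurrencyLimiter.release_session_slot(limiter, "session-1")
%{active_sessions: 0, active_runs: 0} = ConcurrencyLimiter.get_status(limiter)
```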

start_link(opts \\ [])

@spec start_link(keyword()) :: GenServer.on_start()

Starts the concurrency limiter.

Options

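  • :max_parallel_sessions - Maximum concurrent sessions; a positive integer or :infinity (default: 100)
  • :max_parallel_runs - Maximum concurrent runs across all sessions; a positive integer or :infinity (default: 50)
  • :name - Optional name under which to register the GenServer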