AgentSessionManager.Runtime.SessionServer (AgentSessionManager v0.8.0)

Copy Markdown View Source

Per-session runtime server (GenServer) providing:

  • FIFO run queueing with configurable concurrency slots
  • submit/await/cancel run semantics
  • event subscriptions backed by the durable event store
  • optional ConcurrencyLimiter integration
  • optional ControlOperations integration
  • operational APIs: status, drain

Multi-Slot Concurrency (Phase 2)

The server supports max_concurrent_runs greater than 1, allowing multiple runs to execute in parallel within a single session. Bounded queue behaviour is preserved and runs never exceed the configured slot count.

Durable Subscriptions

Subscribers receive {:session_event, session_id, event} messages. On subscribe, the server backfills events from the store starting at :from_sequence, then delivers live events as they are appended.

The server delegates run lifecycle work to SessionManager APIs.

Summary

Types

submit_opts()

@type submit_opts() :: keyword()

subscribe_opts()

@type subscribe_opts() :: [
  from_sequence: non_neg_integer(),
  run_id: String.t(),
  type: AgentSessionManager.Core.Event.event_type()
]

Functions

await_run(server, run_id, timeout \\ Config.get(:await_run_timeout_ms))

@spec await_run(GenServer.server(), String.t(), timeout()) ::
  {:ok, map()} | {:error, AgentSessionManager.Core.Error.t()}

cancel_run(server, run_id)

@spec cancel_run(GenServer.server(), String.t()) ::
  :ok | {:error, AgentSessionManager.Core.Error.t()}

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

drain(server, timeout \\ Config.get(:await_run_timeout_ms))

@spec drain(GenServer.server(), timeout()) :: :ok | {:error, :timeout}

Waits for the queue and all in-flight runs to complete.

Returns :ok when drained, or {:error, :timeout} if the timeout elapses before all work finishes.

execute_run(server, input, opts \\ [])

@spec execute_run(GenServer.server(), map(), keyword()) ::
  {:ok, map()} | {:error, AgentSessionManager.Core.Error.t()}

interrupt_run(server, run_id)

@spec interrupt_run(GenServer.server(), String.t()) ::
  {:ok, String.t()} | :ok | {:error, AgentSessionManager.Core.Error.t()}

Interrupts an in-flight run via ControlOperations (if configured) or falls back to cancel via SessionManager.

start_link(opts \\ [])

@spec start_link(keyword()) :: GenServer.on_start()

status(server)

@spec status(GenServer.server()) :: map()

submit_run(server, input, opts \\ [])

@spec submit_run(GenServer.server(), map(), submit_opts()) ::
  {:ok, String.t()} | {:error, AgentSessionManager.Core.Error.t()}

subscribe(server, opts \\ [])

@spec subscribe(GenServer.server(), subscribe_opts()) ::
  {:ok, reference()} | {:error, AgentSessionManager.Core.Error.t()}

unsubscribe(server, ref)

@spec unsubscribe(GenServer.server(), reference()) :: :ok