AgentSessionManager.Concurrency.ControlOperations (AgentSessionManager v0.8.0)


Manages control operations (interrupt, cancel, pause, resume) for runs.

This module provides a centralized way to manage control operations across different adapters. It handles:

  • Interrupt: Immediately stop a running operation
  • Cancel: Permanently cancel an operation (terminal state)
  • Pause: Temporarily pause an operation (requires capability)
  • Resume: Resume a paused operation (requires capability)

Idempotency

All control operations are idempotent:

  • Interrupting an already interrupted run succeeds
  • Cancelling an already cancelled run succeeds
  • Pausing an already paused run succeeds
  • Resuming an already running run succeeds

Capability Checking

Pause and resume require the adapter to support the corresponding capability ("pause" or "resume"). If the adapter lacks the required capability, the call returns {:error, %Error{code: :capability_not_supported}}.

Terminal States

Once a run enters a terminal state (:cancelled, :completed, :failed, or :timeout), it cannot be resumed or have further operations performed on it. Attempting to resume such a run returns {:error, %Error{code: :invalid_operation}}.

Usage

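alias AgentSessionManager.Concurrency.ControlOperations

# `adapter` is a provider adapter and `run_id` the ID of an existing run
{:ok, ops} = ControlOperations.start_link(adapter: adapter)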

# Interrupt a running operation
{:ok, run_id} = ControlOperations.interrupt(ops, run_id)

# Cancel an operation (terminal)
{:ok, run_id} = ControlOperations.cancel(ops, run_id)

# Pause (if supported)
{:ok, run_id} = ControlOperations.pause(ops, run_id)

# Resume (if supported)
{:ok, run_id} = ControlOperations.resume(ops, run_id)

Summary

Functions

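cancel(server, run_id)

Cancels an operation.

cancel_all(server, run_ids)

Cancels multiple runs at once.

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

get_operation_history(server, run_id)

Gets the full operation history for a run.

get_operation_status(server, run_id)

Gets the operation status for a run.

interrupt(server, run_id)

Interrupts a running operation.

interrupt_session(server, session_id)

Interrupts all runs for a given session.

pause(server, run_id)

Pauses a running operation.

register_run(server, session_id, run_id)

Registers a run as belonging to a session.

resume(server, run_id)

Resumes a paused operation.

start_link(opts \\ [])

Starts the control operations manager.

terminal_state?(state)

Checks if a state is a terminal state.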

Types

operation()

@type operation() :: :interrupt | :cancel | :pause | :resume

operation_record()

@type operation_record() :: %{
  operation: operation(),
  timestamp: DateTime.t(),
  result: :ok | {:error, AgentSessionManager.Core.Error.t()}
}

operation_status()

@type operation_status() :: %{
  last_operation: operation() | nil,
  state: atom(),
  history: [operation_record()]
}
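
To make the shape of these types concrete, the following sketch builds an operation_status() map by hand and picks out the most recent failed record. The module name StatusSketch, the sample timestamps, and the :running state are illustrative assumptions, and the :sample_error atom stands in for an AgentSessionManager.Core.Error struct. Nothing here is real library output.

```elixir
defmodule StatusSketch do
  # Returns the most recent failed operation_record() in a status map,
  # or nil when no recorded operation failed.
  def last_failure(%{history: history}) do
    history
    |> Enum.filter(&match?({:error, _}, &1.result))
    |> Enum.max_by(& &1.timestamp, DateTime, fn -> nil end)
  end
end

# Hand-built sample shaped like operation_status(); values are hypothetical.
status = %{
  last_operation: :resume,
  state: :running,
  history: [
    %{operation: :pause, timestamp: ~U[2024-01-01 10:00:00Z], result: {:error, :sample_error}},
    %{operation: :pause, timestamp: ~U[2024-01-01 10:00:05Z], result: :ok},
    %{operation: :resume, timestamp: ~U[2024-01-01 10:01:00Z], result: :ok}
  ]
}

failure = StatusSketch.last_failure(status)
```

The helper sorts by timestamp rather than relying on list position, since the documentation does not state how history is ordered.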

Functions

cancel(server, run_id)

@spec cancel(GenServer.server(), String.t()) ::
  {:ok, String.t()} | {:error, AgentSessionManager.Core.Error.t()}

Cancels an operation.

This operation is idempotent: cancelling an already cancelled run succeeds. Cancel is also terminal: once cancelled, the run cannot be resumed.

Returns

  • {:ok, run_id} - Cancel succeeded
  • {:error, Error.t()} - Cancel failed

cancel_all(server, run_ids)

@spec cancel_all(GenServer.server(), [String.t()]) :: %{
  required(String.t()) => :ok | {:error, AgentSessionManager.Core.Error.t()}
}

Cancels multiple runs at once.

Returns a map of run_id => :ok | {:error, Error.t()}.
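
Because the result is a per-run map rather than a single tagged tuple, callers usually need to separate successes from failures. The sketch below shows one way to do that on a hand-built map of the documented shape. BulkResultSketch, the run IDs, and the :sample_error atom (standing in for an Error struct) are hypothetical.

```elixir
defmodule BulkResultSketch do
  # Splits a %{run_id => :ok | {:error, reason}} map into a sorted list of
  # run IDs that succeeded and a map of run_id => reason for the failures.
  def partition(results) do
    {ok, failed} = Enum.split_with(results, fn {_run_id, result} -> result == :ok end)

    succeeded = ok |> Enum.map(fn {run_id, :ok} -> run_id end) |> Enum.sort()
    errors = Map.new(failed, fn {run_id, {:error, reason}} -> {run_id, reason} end)

    {succeeded, errors}
  end
end

# Sample shaped like the return value of cancel_all/2; not real output.
results = %{"run-1" => :ok, "run-2" => {:error, :sample_error}, "run-3" => :ok}
{succeeded, errors} = BulkResultSketch.partition(results)
```

The same helper applies unchanged to the map returned by interrupt_session/2, which has the same documented shape.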

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

get_operation_history(server, run_id)

@spec get_operation_history(GenServer.server(), String.t()) :: [operation_record()]

Gets the full operation history for a run.

get_operation_status(server, run_id)

@spec get_operation_status(GenServer.server(), String.t()) :: operation_status()

Gets the operation status for a run.

interrupt(server, run_id)

@spec interrupt(GenServer.server(), String.t()) ::
  {:ok, String.t()} | {:error, AgentSessionManager.Core.Error.t()}

Interrupts a running operation.

This is an idempotent operation - interrupting an already interrupted run will succeed.

Returns

  • {:ok, run_id} - Interrupt succeeded
  • {:error, Error.t()} - Interrupt failed

interrupt_session(server, session_id)

@spec interrupt_session(GenServer.server(), String.t()) :: %{
  required(String.t()) => :ok | {:error, AgentSessionManager.Core.Error.t()}
}

Interrupts all runs for a given session.

Returns a map of run_id => :ok | {:error, Error.t()}.

pause(server, run_id)

@spec pause(GenServer.server(), String.t()) ::
  {:ok, String.t()} | {:error, AgentSessionManager.Core.Error.t()}

Pauses a running operation.

This is an idempotent operation - pausing an already paused run will succeed. Requires the adapter to have the "pause" capability.

Returns

  • {:ok, run_id} - Pause succeeded
  • {:error, %Error{code: :capability_not_supported}} - Adapter doesn't support pause
  • {:error, Error.t()} - Pause failed

register_run(server, session_id, run_id)

@spec register_run(GenServer.server(), String.t(), String.t()) :: :ok

Registers a run as belonging to a session.

This is used for session-level operations like interrupt_session.
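
A minimal sketch of how registration and a session-level interrupt fit together, using only the functions documented on this page. The wrapper module SessionStopSketch is hypothetical, and it assumes `ops` is a running ControlOperations server. In a real system the registration would normally happen when each run starts and the interrupt much later; the two steps are compressed here for illustration.

```elixir
defmodule SessionStopSketch do
  alias AgentSessionManager.Concurrency.ControlOperations

  # Registers every run under the session, then interrupts them in one call.
  # Returns the documented map of run_id => :ok | {:error, Error.t()}.
  def register_and_interrupt(ops, session_id, run_ids) do
    Enum.each(run_ids, fn run_id ->
      :ok = ControlOperations.register_run(ops, session_id, run_id)
    end)

    ControlOperations.interrupt_session(ops, session_id)
  end
end
```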

resume(server, run_id)

@spec resume(GenServer.server(), String.t()) ::
  {:ok, String.t()} | {:error, AgentSessionManager.Core.Error.t()}

Resumes a paused operation.

This operation is idempotent: resuming a run that is already running succeeds. It requires the adapter to have the "resume" capability, and it fails for a run in a terminal state.

Returns

  • {:ok, run_id} - Resume succeeded
  • {:error, %Error{code: :capability_not_supported}} - Adapter doesn't support resume
  • {:error, %Error{code: :invalid_operation}} - Run is in a terminal state
  • {:error, Error.t()} - Resume failed
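
The return shapes above lend themselves to pattern matching. The following sketch maps each documented case to a coarse follow-up action. The module ResumeResultSketch and the action atoms are hypothetical policy choices, not part of the library. Matching on %{code: ...} also matches any struct that has a :code field, so the sketch runs without the Error module loaded.

```elixir
defmodule ResumeResultSketch do
  # Maps the documented return shapes of resume/2 to a next step.
  # Clause order matters: the specific error codes come before the catch-all.
  def next_step({:ok, _run_id}), do: :continue
  def next_step({:error, %{code: :capability_not_supported}}), do: :restart_from_scratch
  def next_step({:error, %{code: :invalid_operation}}), do: :start_new_run
  def next_step({:error, _other}), do: :report_failure
end

# Plain maps stand in for Error structs in this sample.
step = ResumeResultSketch.next_step({:error, %{code: :invalid_operation}})
```

With the library available, a call such as ControlOperations.resume(ops, run_id) could be piped straight into ResumeResultSketch.next_step/1.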

start_link(opts \\ [])

@spec start_link(keyword()) :: GenServer.on_start()

Starts the control operations manager.

Options

  • :adapter - The provider adapter to use for operations (required)
  • :name - Optional GenServer name
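
Since the module exposes child_spec/1, it can presumably be placed in a supervision tree with the usual {module, opts} tuple. The sketch below builds such a child list. It assumes child_spec/1 forwards its argument to start_link/1, as the default generated by `use GenServer` does. SupervisionSketch and the registered name MyApp.ControlOps are hypothetical.

```elixir
defmodule SupervisionSketch do
  alias AgentSessionManager.Concurrency.ControlOperations

  # Builds a child list suitable for Supervisor.start_link/2.
  # `adapter` is whatever provider adapter the application uses.
  def children(adapter) do
    [
      {ControlOperations, adapter: adapter, name: MyApp.ControlOps}
    ]
  end
end
```

Passing the list to Supervisor.start_link(children, strategy: :one_for_one) would start the manager under supervision. Because every function on this page takes a GenServer.server(), the registered name can then be used in place of a pid, for example ControlOperations.cancel(MyApp.ControlOps, run_id).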

terminal_state?(state)

@spec terminal_state?(atom()) :: boolean()

Checks if a state is a terminal state.

The terminal states are :cancelled, :completed, :failed, and :timeout.
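
As an illustration of how this check might guard a resume attempt, the sketch below uses a local stand-in that mirrors the four documented terminal states, so it runs without the library. TerminalStateSketch is hypothetical and is not the library's implementation. The :paused and :running atoms are assumed names for non-terminal states.

```elixir
defmodule TerminalStateSketch do
  # Stand-in for terminal_state?/1, mirroring the four documented states.
  @terminal_states [:cancelled, :completed, :failed, :timeout]

  def terminal?(state) when is_atom(state), do: state in @terminal_states

  # True when a status map shaped like operation_status() may still be resumed.
  def resumable?(%{state: state}), do: not terminal?(state)
end

paused_ok = TerminalStateSketch.resumable?(%{state: :paused})
cancelled_ok = TerminalStateSketch.resumable?(%{state: :cancelled})
```

In application code the stand-in would be replaced by ControlOperations.terminal_state?/1 applied to the state field returned by get_operation_status/2.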