AgentSessionManager.SessionManager (AgentSessionManager v0.8.0)

Copy Markdown View Source

Orchestrates session lifecycle, run execution, and event handling.

The SessionManager is the central coordinator for managing AI agent sessions. It handles:

  • Session creation, activation, and completion
  • Run creation and execution via provider adapters
  • Event emission and persistence
  • Capability requirement enforcement

Architecture

SessionManager sits between the application and the ports/adapters layer:

Application
    |
SessionManager  <-- Orchestration layer
    |
+---+---+
|       |

Store Adapter <-- Ports (interfaces)

|       |

Impl Impl <-- Adapters (implementations)

Usage

# Start a session
{:ok, store} = InMemorySessionStore.start_link()
{:ok, adapter} = AnthropicAdapter.start_link(api_key: "...")

{:ok, session} = SessionManager.start_session(store, adapter, %{
  agent_id: "my-agent",
  context: %{system_prompt: "You are helpful"}
})

# Activate and run
{:ok, _} = SessionManager.activate_session(store, session.id)
{:ok, run} = SessionManager.start_run(store, adapter, session.id, %{prompt: "Hello"})
{:ok, result} = SessionManager.execute_run(store, adapter, run.id)

# Complete session
{:ok, _} = SessionManager.complete_session(store, session.id)

Event Flow

The SessionManager emits normalized events through the session store:

  1. Session lifecycle: :session_created, :session_started, :session_completed, etc.
  2. Run lifecycle: :run_started, :run_completed, :run_failed, etc.
  3. Provider events: Adapter events are normalized and stored
  4. Policy/approval events: :policy_violation, :tool_approval_requested, etc.

Summary

Functions

Activates a pending session.

Cancels a run for approval and emits a :tool_approval_requested event.

Cancels an in-progress run.

Completes a session successfully.

Executes a run via the provider adapter.

Marks a session as failed.

Retrieves a session by ID.

Gets all events for a session.

Gets all runs for a session.

Runs a one-shot session: creates a session, activates it, starts a run, executes it, and completes (or fails) the session — all in a single call.

Creates a new session with pending status.

Streams session events by polling cursor-based reads from the store.

Types

adapter()

run_once_store()

@type run_once_store() :: store()

store()

Functions

activate_session(store, session_id)

@spec activate_session(store(), String.t()) ::
  {:ok, AgentSessionManager.Core.Session.t()}
  | {:error, AgentSessionManager.Core.Error.t()}

Activates a pending session.

Transitions the session from :pending to :active status.

Returns

  • {:ok, Session.t()} - The activated session
  • {:error, Error.t()} - If session not found or update fails

cancel_for_approval(store, adapter, run_id, approval_data \\ %{})

@spec cancel_for_approval(store(), adapter(), String.t(), map()) ::
  {:ok, String.t()} | {:error, AgentSessionManager.Core.Error.t()}

Cancels a run for approval and emits a :tool_approval_requested event.

This is a convenience function that combines cancellation with approval event emission in a single call. The external orchestrator can call this instead of manually cancelling and emitting events.

Parameters

  • store - The session store
  • adapter - The provider adapter
  • run_id - The run to cancel
  • approval_data - Map with tool details: :tool_name, :tool_call_id, :tool_input, :policy_name, :violation_kind

Returns

  • {:ok, run_id} - Run was cancelled and approval event emitted
  • {:error, Error.t()} - If cancellation fails

cancel_run(store, adapter, run_id)

@spec cancel_run(store(), adapter(), String.t()) ::
  {:ok, String.t()} | {:error, AgentSessionManager.Core.Error.t()}

Cancels an in-progress run.

Returns

  • {:ok, run_id} - Run was cancelled
  • {:error, Error.t()} - Cancellation failed

Cancellation is best-effort. If the adapter process has already exited (:noproc/shutdown race), we still persist the run as :cancelled to honour caller intent and avoid leaving the run in a non-terminal state.

complete_session(store, session_id)

@spec complete_session(store(), String.t()) ::
  {:ok, AgentSessionManager.Core.Session.t()}
  | {:error, AgentSessionManager.Core.Error.t()}

Completes a session successfully.

Transitions the session to :completed status.

Returns

  • {:ok, Session.t()} - The completed session
  • {:error, Error.t()} - If session not found or update fails

execute_run(store, adapter, run_id, opts \\ [])

@spec execute_run(store(), adapter(), String.t(), keyword()) ::
  {:ok, map()} | {:error, AgentSessionManager.Core.Error.t()}

Executes a run via the provider adapter.

This function:

  1. Updates the run status to :running
  2. Calls the adapter's execute/4 function
  3. Handles events emitted by the adapter
  4. Updates the run with results or error

Options

  • :event_callback - A function (event_data -> any()) that receives each adapter event in real time, in addition to the internal persistence callback.
  • :continuation - When true, reconstruct and inject a transcript into session.context[:transcript] before adapter execution.
  • :continuation_opts - Options forwarded to TranscriptBuilder.from_store/3 (for example: :limit, :after, :since, :max_messages).
  • :adapter_opts - Additional adapter-specific options passed through to ProviderAdapter.execute/4.
  • :policy - Policy definition (%AgentSessionManager.Policy.Policy{} or attributes) for runtime budget/tool enforcement (single policy shorthand).
  • :policies - A list of policies to stack-merge into one effective policy. When both :policy and :policies are given, :policies takes precedence.
  • :workspace - Workspace snapshot/diff options: enabled, path, strategy, capture_patch, max_patch_bytes, and rollback_on_failure (git backend only in MVP).

Returns

  • {:ok, result} - Execution completed successfully
  • {:error, Error.t()} - Execution failed

fail_session(store, session_id, error)

Marks a session as failed.

Transitions the session to :failed status and records the error.

Parameters

  • store - The session store
  • session_id - The session ID
  • error - The error that caused the failure

Returns

  • {:ok, Session.t()} - The failed session
  • {:error, Error.t()} - If session not found or update fails

get_session(store, session_id)

@spec get_session(store(), String.t()) ::
  {:ok, AgentSessionManager.Core.Session.t()}
  | {:error, AgentSessionManager.Core.Error.t()}

Retrieves a session by ID.

Returns

  • {:ok, Session.t()} - The session
  • {:error, Error.t()} - If not found

get_session_events(store, session_id, opts \\ [])

@spec get_session_events(store(), String.t(), keyword()) ::
  {:ok, [AgentSessionManager.Core.Event.t()]}

Gets all events for a session.

Options

  • :run_id - Filter by run ID
  • :type - Filter by event type
  • :since - Events after this timestamp
  • :after - Events with sequence number greater than this cursor
  • :before - Events with sequence number less than this cursor
  • :limit - Maximum number of events to return

get_session_runs(store, session_id)

@spec get_session_runs(store(), String.t()) ::
  {:ok, [AgentSessionManager.Core.Run.t()]}

Gets all runs for a session.

run_once(store, adapter, input, opts \\ [])

@spec run_once(run_once_store(), adapter(), map(), keyword()) ::
  {:ok, map()} | {:error, AgentSessionManager.Core.Error.t()}

Runs a one-shot session: creates a session, activates it, starts a run, executes it, and completes (or fails) the session — all in a single call.

This is a convenience function that collapses the full session lifecycle into one function call, ideal for simple request/response workflows.

Parameters

  • store - The session store instance
  • adapter - The provider adapter instance
  • input - Input data for the run (e.g. %{messages: [...]})
  • opts - Options:
    • :agent_id - Agent identifier (defaults to provider name)
    • :metadata - Session metadata map
    • :context - Session context (system prompts, etc.)
    • :tags - Session tags
  • :event_callback - (event_data -> any()) for real-time events
  • :continuation - Enable transcript reconstruction and continuity replay
  • :continuation_opts - Transcript builder options
  • :adapter_opts - Adapter-specific passthrough options
  • :policy - Policy definition for runtime budget/tool enforcement
  • :workspace - Workspace snapshot/diff options
  • :required_capabilities - Required capability types
  • :optional_capabilities - Optional capability types

Returns

  • {:ok, result} - A map with :output, :token_usage, :events, :session_id, and :run_id
  • {:error, Error.t()} - If any step fails

Examples

{:ok, result} = SessionManager.run_once(store, adapter, %{
  messages: [%{role: "user", content: "Hello!"}]
}, event_callback: fn e -> IO.inspect(e.type) end)

IO.puts(result.output.content)

start_run(store, adapter, session_id, input, opts \\ [])

@spec start_run(store(), adapter(), String.t(), map(), keyword()) ::
  {:ok, AgentSessionManager.Core.Run.t()}
  | {:error, AgentSessionManager.Core.Error.t()}

Creates a new run for a session.

The run is created with :pending status. Use execute_run/4 to execute it.

Parameters

  • store - The session store
  • adapter - The provider adapter
  • session_id - The parent session ID
  • input - Input data for the run
  • opts - Optional settings:
    • :required_capabilities - List of capability types that must be present
    • :optional_capabilities - List of capability types that are nice to have

Returns

  • {:ok, Run.t()} - The created run
  • {:error, Error.t()} - If session not found or capability check fails

start_session(store, adapter, attrs)

@spec start_session(store(), adapter(), map()) ::
  {:ok, AgentSessionManager.Core.Session.t()}
  | {:error, AgentSessionManager.Core.Error.t()}

Creates a new session with pending status.

Parameters

  • store - The session store instance
  • adapter - The provider adapter instance
  • attrs - Session attributes:
    • :agent_id (required) - The agent identifier
    • :metadata - Optional metadata map
    • :context - Optional context map (system prompts, etc.)
    • :tags - Optional list of tags

Returns

  • {:ok, Session.t()} - The created session
  • {:error, Error.t()} - If validation fails

Examples

{:ok, session} = SessionManager.start_session(store, adapter, %{
  agent_id: "my-agent",
  metadata: %{user_id: "user-123"}
})

stream_session_events(store, session_id, opts \\ [])

@spec stream_session_events(store(), String.t(), keyword()) :: Enumerable.t()

Streams session events by polling cursor-based reads from the store.

The returned stream is open-ended and will keep polling for new events.

Options

  • :after - Starting cursor (default: 0)
  • :limit - Page size per poll (default: 100)
  • :poll_interval_ms - Poll interval when no new events (default: 250). Ignored when :wait_timeout_ms is set.
  • :wait_timeout_ms - When set to a positive integer, the stream uses store-backed long-poll semantics instead of sleep-based polling. The store's get_events/3 is called with this option so stores that support it can block until matching events arrive or the timeout elapses, avoiding busy polling.
  • :run_id - Optional run filter
  • :type - Optional event type filter