Orchestrates session lifecycle, run execution, and event handling.
The SessionManager is the central coordinator for managing AI agent sessions. It handles:
- Session creation, activation, and completion
- Run creation and execution via provider adapters
- Event emission and persistence
- Capability requirement enforcement
Architecture
SessionManager sits between the application and the ports/adapters layer:
      Application
           |
     SessionManager   <-- Orchestration layer
           |
       +---+---+
       |       |
     Store  Adapter   <-- Ports (interfaces)
       |       |
     Impl    Impl     <-- Adapters (implementations)
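The port/adapter split in the diagram can be illustrated with an Elixir behaviour. This is a minimal sketch under stated assumptions: `StorePortSketch`, `InMemoryStoreSketch`, `OrchestratorSketch`, and their callbacks are hypothetical names invented for illustration and do not reflect the actual `SessionStore` port.

```elixir
defmodule StorePortSketch do
  # Port: the interface the orchestration layer depends on (hypothetical).
  @callback put_session(store :: pid(), session :: map()) :: :ok
  @callback get_session(store :: pid(), id :: String.t()) ::
              {:ok, map()} | {:error, :not_found}
end

defmodule InMemoryStoreSketch do
  # Adapter: one concrete implementation of the port, backed by an Agent.
  @behaviour StorePortSketch

  def start_link, do: Agent.start_link(fn -> %{} end)

  @impl true
  def put_session(store, session) do
    Agent.update(store, &Map.put(&1, session.id, session))
  end

  @impl true
  def get_session(store, id) do
    case Agent.get(store, &Map.fetch(&1, id)) do
      {:ok, session} -> {:ok, session}
      :error -> {:error, :not_found}
    end
  end
end

defmodule OrchestratorSketch do
  # Orchestration layer: reaches the store only through the port's functions.
  def create_session(store_mod, store, id) do
    :ok = store_mod.put_session(store, %{id: id, status: :pending})
    store_mod.get_session(store, id)
  end
end

{:ok, store} = InMemoryStoreSketch.start_link()
IO.inspect(OrchestratorSketch.create_session(InMemoryStoreSketch, store, "s1"))
```

Because the orchestrator only calls functions declared by the port, a different store implementation could be swapped in without changing it.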
Usage
# Start a session
{:ok, store} = InMemorySessionStore.start_link()
{:ok, adapter} = AnthropicAdapter.start_link(api_key: "...")
{:ok, session} = SessionManager.start_session(store, adapter, %{
  agent_id: "my-agent",
  context: %{system_prompt: "You are helpful"}
})
# Activate and run
{:ok, _} = SessionManager.activate_session(store, session.id)
{:ok, run} = SessionManager.start_run(store, adapter, session.id, %{prompt: "Hello"})
{:ok, result} = SessionManager.execute_run(store, adapter, run.id)
# Complete session
{:ok, _} = SessionManager.complete_session(store, session.id)
Event Flow
The SessionManager emits normalized events through the session store:
- Session lifecycle: :session_created, :session_started, :session_completed, etc.
- Run lifecycle: :run_started, :run_completed, :run_failed, etc.
- Provider events: Adapter events are normalized and stored
- Policy/approval events: :policy_violation, :tool_approval_requested, etc.
Summary
Functions
activate_session/2 - Activates a pending session.
cancel_for_approval/4 - Cancels a run for approval and emits a :tool_approval_requested event.
cancel_run/3 - Cancels an in-progress run.
complete_session/2 - Completes a session successfully.
execute_run/4 - Executes a run via the provider adapter.
fail_session/3 - Marks a session as failed.
get_session/2 - Retrieves a session by ID.
get_session_events/3 - Gets all events for a session.
get_session_runs/2 - Gets all runs for a session.
run_once/4 - Runs a one-shot session: creates a session, activates it, starts a run, executes it, and completes (or fails) the session, all in a single call.
start_run/5 - Creates a new run for a session.
start_session/3 - Creates a new session with pending status.
stream_session_events/3 - Streams session events by polling cursor-based reads from the store.
Types
@type adapter() :: AgentSessionManager.Ports.ProviderAdapter.adapter()
@type run_once_store() :: store()
@type store() :: AgentSessionManager.Ports.SessionStore.store()
Functions
@spec activate_session(store(), String.t()) :: {:ok, AgentSessionManager.Core.Session.t()} | {:error, AgentSessionManager.Core.Error.t()}
Activates a pending session.
Transitions the session from :pending to :active status.
Returns
- {:ok, Session.t()} - The activated session
- {:error, Error.t()} - If session not found or update fails
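The status change performed on activation can be pictured as a small state machine. The following is a minimal sketch, not the library's implementation: `SessionStatusSketch` and its transition table are hypothetical. Only the :pending to :active move is stated on this page; the source states for :completed and :failed are assumptions.

```elixir
defmodule SessionStatusSketch do
  # Hypothetical transition table, not taken from the library source.
  # Only :pending -> :active is documented explicitly; the rest is assumed.
  @transitions %{
    pending: [:active, :failed],
    active: [:completed, :failed],
    completed: [],
    failed: []
  }

  # Returns {:ok, new_status} when the move is allowed, else an error tuple.
  def transition(from, to) do
    if to in Map.get(@transitions, from, []) do
      {:ok, to}
    else
      {:error, {:invalid_transition, from, to}}
    end
  end
end

IO.inspect(SessionStatusSketch.transition(:pending, :active))
IO.inspect(SessionStatusSketch.transition(:completed, :active))
```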
@spec cancel_for_approval(store(), adapter(), String.t(), map()) :: {:ok, String.t()} | {:error, AgentSessionManager.Core.Error.t()}
Cancels a run for approval and emits a :tool_approval_requested event.
This is a convenience function that combines cancellation with approval event emission in a single call. The external orchestrator can call this instead of manually cancelling and emitting events.
Parameters
- store - The session store
- adapter - The provider adapter
- run_id - The run to cancel
- approval_data - Map with tool details: :tool_name, :tool_call_id, :tool_input, :policy_name, :violation_kind
Returns
- {:ok, run_id} - Run was cancelled and approval event emitted
- {:error, Error.t()} - If cancellation fails
@spec cancel_run(store(), adapter(), String.t()) :: {:ok, String.t()} | {:error, AgentSessionManager.Core.Error.t()}
Cancels an in-progress run.
Returns
- {:ok, run_id} - Run was cancelled
- {:error, Error.t()} - Cancellation failed
Cancellation is best-effort. If the adapter process has already exited
(:noproc/shutdown race), we still persist the run as :cancelled to
honour caller intent and avoid leaving the run in a non-terminal state.
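The best-effort behaviour described above can be sketched by catching the exit that `GenServer.call/2` raises when the target process is gone. This is an illustration under assumptions: `BestEffortCancelSketch` and the `{:cancel, run_id}` message are hypothetical, and persistence is represented only by the returned map.

```elixir
defmodule BestEffortCancelSketch do
  # Hypothetical helper; `adapter` is any GenServer pid and `{:cancel, run_id}`
  # is an assumed message shape, not the library's actual protocol.
  def cancel(adapter, run_id) do
    try do
      GenServer.call(adapter, {:cancel, run_id})
    catch
      # The adapter process has already exited: carry on and mark the run
      # as cancelled anyway, so it does not stay in a non-terminal state.
      :exit, {:noproc, _} -> :ok
      :exit, {:shutdown, _} -> :ok
    end

    # Persisting the run is represented here by returning its terminal status.
    {:ok, %{run_id: run_id, status: :cancelled}}
  end
end

# Simulate the race: the adapter process is dead before cancel is called.
pid = spawn(fn -> :ok end)
ref = Process.monitor(pid)

receive do
  {:DOWN, ^ref, :process, ^pid, _reason} -> :ok
end

IO.inspect(BestEffortCancelSketch.cancel(pid, "run-1"))
```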
@spec complete_session(store(), String.t()) :: {:ok, AgentSessionManager.Core.Session.t()} | {:error, AgentSessionManager.Core.Error.t()}
Completes a session successfully.
Transitions the session to :completed status.
Returns
- {:ok, Session.t()} - The completed session
- {:error, Error.t()} - If session not found or update fails
@spec execute_run(store(), adapter(), String.t(), keyword()) :: {:ok, map()} | {:error, AgentSessionManager.Core.Error.t()}
Executes a run via the provider adapter.
This function:
- Updates the run status to :running
- Calls the adapter's execute/4 function
- Handles events emitted by the adapter
- Updates the run with results or error
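The four steps listed above can be sketched roughly as follows. This is a simplified model, not the library's code: `ExecuteRunSketch`, the `adapter_fun` stand-in for the adapter's execute function, and the :message_received event type are all hypothetical.

```elixir
defmodule ExecuteRunSketch do
  # Hypothetical model of the four steps. `adapter_fun` stands in for the
  # adapter's execute function and is not the real ProviderAdapter API.
  def execute(run, adapter_fun, user_callback \\ fn _event -> :ok end) do
    {:ok, log} = Agent.start_link(fn -> [] end)

    # Step 1: mark the run as running.
    run = Map.put(run, :status, :running)

    # Step 3 (wired up before the call): record each event, then notify the caller.
    on_event = fn event ->
      Agent.update(log, &[event | &1])
      user_callback.(event)
    end

    # Step 2: call the adapter, which reports events through `on_event`.
    outcome = adapter_fun.(run, on_event)
    events = log |> Agent.get(& &1) |> Enum.reverse()
    Agent.stop(log)

    # Step 4: record the result or the error on the run.
    case outcome do
      {:ok, result} ->
        {:ok, Map.merge(run, %{status: :completed, result: result}), events}

      {:error, reason} ->
        {:error, Map.merge(run, %{status: :failed, error: reason}), events}
    end
  end
end

# A fake adapter that emits two events and then succeeds.
adapter = fn _run, emit ->
  emit.(%{type: :run_started})
  emit.(%{type: :message_received})
  {:ok, %{output: "hi"}}
end

{:ok, run, events} = ExecuteRunSketch.execute(%{id: "run-1", status: :pending}, adapter)
IO.inspect({run.status, Enum.map(events, & &1.type)})
```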
Options
- :event_callback - A function (event_data -> any()) that receives each adapter event in real time, in addition to the internal persistence callback.
- :continuation - When true, reconstruct and inject a transcript into session.context[:transcript] before adapter execution.
- :continuation_opts - Options forwarded to TranscriptBuilder.from_store/3 (for example: :limit, :after, :since, :max_messages).
- :adapter_opts - Additional adapter-specific options passed through to ProviderAdapter.execute/4.
- :policy - Policy definition (%AgentSessionManager.Policy.Policy{} or attributes) for runtime budget/tool enforcement (single policy shorthand).
- :policies - A list of policies to stack-merge into one effective policy. When both :policy and :policies are given, :policies takes precedence.
- :workspace - Workspace snapshot/diff options: enabled, path, strategy, capture_patch, max_patch_bytes, and rollback_on_failure (git backend only in MVP).
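The precedence rule between :policy and :policies can be sketched as a small option-resolution helper. This is an assumption-laden illustration: `PolicyOptsSketch.select/1` is hypothetical, policies are plain maps here, and the stack-merge step itself is left out because its semantics are not described on this page.

```elixir
defmodule PolicyOptsSketch do
  # Hypothetical helper: decides which policies feed the effective policy.
  # If :policies is present it wins; otherwise :policy is wrapped in a list.
  def select(opts) do
    case Keyword.fetch(opts, :policies) do
      {:ok, policies} when is_list(policies) -> policies
      _ -> opts |> Keyword.get(:policy) |> List.wrap()
    end
  end
end

IO.inspect(PolicyOptsSketch.select(policy: %{name: "a"}, policies: [%{name: "b"}]))
IO.inspect(PolicyOptsSketch.select(policy: %{name: "a"}))
```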
Returns
- {:ok, result} - Execution completed successfully
- {:error, Error.t()} - Execution failed
@spec fail_session(store(), String.t(), AgentSessionManager.Core.Error.t()) :: {:ok, AgentSessionManager.Core.Session.t()} | {:error, AgentSessionManager.Core.Error.t()}
Marks a session as failed.
Transitions the session to :failed status and records the error.
Parameters
- store - The session store
- session_id - The session ID
- error - The error that caused the failure
Returns
- {:ok, Session.t()} - The failed session
- {:error, Error.t()} - If session not found or update fails
@spec get_session(store(), String.t()) :: {:ok, AgentSessionManager.Core.Session.t()} | {:error, AgentSessionManager.Core.Error.t()}
Retrieves a session by ID.
Returns
- {:ok, Session.t()} - The session
- {:error, Error.t()} - If not found
@spec get_session_events(store(), String.t(), keyword()) :: {:ok, [AgentSessionManager.Core.Event.t()]}
Gets all events for a session.
Options
- :run_id - Filter by run ID
- :type - Filter by event type
- :since - Events after this timestamp
- :after - Events with sequence number greater than this cursor
- :before - Events with sequence number less than this cursor
- :limit - Maximum number of events to return
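How the cursor and filter options combine can be shown with an in-memory filter. This is a sketch only: `EventFilterSketch` is hypothetical, events are plain maps, and the field names :sequence_number, :run_id, and :type are assumptions about the event shape. The :since option is omitted for brevity.

```elixir
defmodule EventFilterSketch do
  # Hypothetical in-memory filter over plain maps; field names are assumed.
  def filter(events, opts \\ []) do
    events
    |> Enum.filter(fn e ->
      matches?(e.run_id, opts[:run_id]) and
        matches?(e.type, opts[:type]) and
        e.sequence_number > Keyword.get(opts, :after, 0) and
        before?(e.sequence_number, opts[:before])
    end)
    |> Enum.sort_by(& &1.sequence_number)
    |> take(opts[:limit])
  end

  # A nil option means "no filter on this field".
  defp matches?(_value, nil), do: true
  defp matches?(value, wanted), do: value == wanted

  defp before?(_seq, nil), do: true
  defp before?(seq, cursor), do: seq < cursor

  defp take(events, nil), do: events
  defp take(events, limit), do: Enum.take(events, limit)
end

events = for n <- 1..5, do: %{sequence_number: n, run_id: "run-1", type: :run_started}

events
|> EventFilterSketch.filter(after: 2, limit: 2)
|> Enum.map(& &1.sequence_number)
|> IO.inspect()
```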
@spec get_session_runs(store(), String.t()) :: {:ok, [AgentSessionManager.Core.Run.t()]}
Gets all runs for a session.
@spec run_once(run_once_store(), adapter(), map(), keyword()) :: {:ok, map()} | {:error, AgentSessionManager.Core.Error.t()}
Runs a one-shot session: creates a session, activates it, starts a run, executes it, and completes (or fails) the session — all in a single call.
This is a convenience function that collapses the full session lifecycle into one function call, ideal for simple request/response workflows.
Parameters
- store - The session store instance
- adapter - The provider adapter instance
- input - Input data for the run (e.g. %{messages: [...]})
- opts - Options:
  - :agent_id - Agent identifier (defaults to provider name)
  - :metadata - Session metadata map
  - :context - Session context (system prompts, etc.)
  - :tags - Session tags
  - :event_callback - (event_data -> any()) for real-time events
  - :continuation - Enable transcript reconstruction and continuity replay
  - :continuation_opts - Transcript builder options
  - :adapter_opts - Adapter-specific passthrough options
  - :policy - Policy definition for runtime budget/tool enforcement
  - :workspace - Workspace snapshot/diff options
  - :required_capabilities - Required capability types
  - :optional_capabilities - Optional capability types
Returns
- {:ok, result} - A map with :output, :token_usage, :events, :session_id, and :run_id
- {:error, Error.t()} - If any step fails
Examples
{:ok, result} = SessionManager.run_once(store, adapter, %{
  messages: [%{role: "user", content: "Hello!"}]
}, event_callback: fn e -> IO.inspect(e.type) end)
IO.puts(result.output.content)
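The way run_once collapses the lifecycle into one call, completing the session on success and failing it on error, can be sketched with stubbed steps. This is not the library's implementation: `RunOnceSketch` and the `api` map of functions are hypothetical stand-ins for start_session, activate_session, start_run, execute_run, complete_session, and fail_session.

```elixir
defmodule RunOnceSketch do
  # Hypothetical: `api` is a map of functions standing in for the
  # SessionManager lifecycle calls.
  def run_once(api, input) do
    with {:ok, session} <- api.start_session.(),
         {:ok, _active} <- api.activate_session.(session.id) do
      outcome =
        with {:ok, run} <- api.start_run.(session.id, input) do
          api.execute_run.(run.id)
        end

      finish(api, session.id, outcome)
    end
  end

  # Success path: complete the session and return the result.
  defp finish(api, session_id, {:ok, result}) do
    {:ok, _session} = api.complete_session.(session_id)
    {:ok, Map.put(result, :session_id, session_id)}
  end

  # Failure path: mark the session as failed and surface the error.
  defp finish(api, session_id, {:error, error}) do
    _ = api.fail_session.(session_id, error)
    {:error, error}
  end
end

# Stubbed steps so the sketch runs on its own.
api = %{
  start_session: fn -> {:ok, %{id: "s1"}} end,
  activate_session: fn id -> {:ok, %{id: id, status: :active}} end,
  start_run: fn _session_id, _input -> {:ok, %{id: "r1"}} end,
  execute_run: fn run_id -> {:ok, %{output: %{content: "Hi"}, run_id: run_id}} end,
  complete_session: fn id -> {:ok, %{id: id, status: :completed}} end,
  fail_session: fn id, _error -> {:ok, %{id: id, status: :failed}} end
}

IO.inspect(RunOnceSketch.run_once(api, %{messages: []}))
```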
@spec start_run(store(), adapter(), String.t(), map(), keyword()) :: {:ok, AgentSessionManager.Core.Run.t()} | {:error, AgentSessionManager.Core.Error.t()}
Creates a new run for a session.
The run is created with :pending status. Use execute_run/4 to execute it.
Parameters
- store - The session store
- adapter - The provider adapter
- session_id - The parent session ID
- input - Input data for the run
- opts - Optional settings:
  - :required_capabilities - List of capability types that must be present
  - :optional_capabilities - List of capability types that are nice to have
Returns
- {:ok, Run.t()} - The created run
- {:error, Error.t()} - If session not found or capability check fails
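The difference between required and optional capabilities can be sketched as a simple list comparison. This is an illustration under assumptions: `CapabilityCheckSketch` is hypothetical, capability types are represented as bare atoms, and the error shape is invented for the example.

```elixir
defmodule CapabilityCheckSketch do
  # Hypothetical check: `available` is the list of capability types the
  # adapter offers, represented here as atoms.
  def check(available, opts \\ []) do
    required = Keyword.get(opts, :required_capabilities, [])
    optional = Keyword.get(opts, :optional_capabilities, [])

    case required -- available do
      # Every required capability is present; missing optional ones are
      # reported but do not block the run.
      [] -> {:ok, %{missing_optional: optional -- available}}
      missing -> {:error, {:missing_required_capabilities, missing}}
    end
  end
end

IO.inspect(
  CapabilityCheckSketch.check([:tool, :streaming],
    required_capabilities: [:tool],
    optional_capabilities: [:vision]
  )
)

IO.inspect(CapabilityCheckSketch.check([:tool], required_capabilities: [:vision]))
```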
@spec start_session(store(), adapter(), map()) :: {:ok, AgentSessionManager.Core.Session.t()} | {:error, AgentSessionManager.Core.Error.t()}
Creates a new session with pending status.
Parameters
- store - The session store instance
- adapter - The provider adapter instance
- attrs - Session attributes:
  - :agent_id (required) - The agent identifier
  - :metadata - Optional metadata map
  - :context - Optional context map (system prompts, etc.)
  - :tags - Optional list of tags
Returns
- {:ok, Session.t()} - The created session
- {:error, Error.t()} - If validation fails
Examples
{:ok, session} = SessionManager.start_session(store, adapter, %{
agent_id: "my-agent",
metadata: %{user_id: "user-123"}
})
@spec stream_session_events(store(), String.t(), keyword()) :: Enumerable.t()
Streams session events by polling cursor-based reads from the store.
The returned stream is open-ended and will keep polling for new events.
Options
- :after - Starting cursor (default: 0)
- :limit - Page size per poll (default: 100)
- :poll_interval_ms - Poll interval when no new events (default: 250). Ignored when :wait_timeout_ms is set.
- :wait_timeout_ms - When set to a positive integer, the stream uses store-backed long-poll semantics instead of sleep-based polling. The store's get_events/3 is called with this option so stores that support it can block until matching events arrive or the timeout elapses, avoiding busy polling.
- :run_id - Optional run filter
- :type - Optional event type filter
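Sleep-based cursor polling of this kind can be sketched with `Stream.resource/3`. This is a minimal illustration, not the library's implementation: `PollingStreamSketch` is hypothetical, `fetch` is an assumed function of `(cursor, limit)` standing in for a store read, and events are assumed to carry a :sequence_number field.

```elixir
defmodule PollingStreamSketch do
  # Hypothetical: `fetch` is a function (cursor, limit) -> [event] standing
  # in for a cursor-based store read.
  def stream(fetch, opts \\ []) do
    limit = Keyword.get(opts, :limit, 100)
    interval = Keyword.get(opts, :poll_interval_ms, 250)

    Stream.resource(
      # The accumulator is the cursor of the last event seen.
      fn -> Keyword.get(opts, :after, 0) end,
      fn cursor ->
        case fetch.(cursor, limit) do
          [] ->
            # Nothing new: wait, then poll again from the same cursor.
            Process.sleep(interval)
            {[], cursor}

          events ->
            # Emit the page and advance the cursor past its last event.
            {events, List.last(events).sequence_number}
        end
      end,
      fn _cursor -> :ok end
    )
  end
end

events = for n <- 1..5, do: %{sequence_number: n, type: :run_started}

fetch = fn cursor, limit ->
  events |> Enum.filter(&(&1.sequence_number > cursor)) |> Enum.take(limit)
end

# The stream never ends on its own, so the consumer bounds it with Enum.take/2.
fetch
|> PollingStreamSketch.stream(after: 1, limit: 2)
|> Enum.take(4)
|> Enum.map(& &1.sequence_number)
|> IO.inspect()
```

Because the stream is open-ended, a consumer that needs to stop should bound it (for example with `Enum.take/2` or `Stream.take_while/2`) or run it in a process that can be shut down.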