AgentSessionManager
Unified Elixir infrastructure for orchestrating AI agent sessions across Claude, Codex, and Amp — with lifecycle management, event streaming, persistence, and provider-agnostic observability.
Why AgentSessionManager?
When you call an AI provider API directly, you get back a response. When you build a product on top of AI agents, you need much more: session state, event history, cost tracking, cancellation, persistence, and the ability to swap providers without rewriting your application.
AgentSessionManager is the infrastructure layer that gives you all of this through a clean ports-and-adapters architecture.
| Concern | Without ASM | With ASM |
|---|---|---|
| Session state | Hand-roll per provider | Unified lifecycle with state machines |
| Event formats | Parse each provider separately | 20+ normalized event types |
| Persistence | Build your own | Pluggable: InMemory, Ecto (Postgres/SQLite), Ash, S3 |
| Cost visibility | None | Model-aware cost tracking per run |
| Provider lock-in | Rewrite when switching | Swap adapters, keep application code |
| Observability | Ad-hoc logging | Telemetry, rendering pipeline, audit trail |
See Architecture for the full design.
Quick Start
Add the dependency and at least one provider SDK:
def deps do
  [
    {:agent_session_manager, "~> 0.8.0"},
    {:claude_agent_sdk, "~> 0.14.0"} # or {:codex_sdk, "~> 0.10.0"} / {:amp_sdk, "~> 0.4.0"}
  ]
end

Run a one-shot session:
alias AgentSessionManager.SessionManager
alias AgentSessionManager.Adapters.{ClaudeAdapter, InMemorySessionStore}

{:ok, store} = InMemorySessionStore.start_link()
{:ok, adapter} = ClaudeAdapter.start_link()

{:ok, result} =
  SessionManager.run_once(
    store,
    adapter,
    %{messages: [%{role: "user", content: "What is the BEAM?"}]},
    event_callback: fn
      %{type: :message_streamed, data: %{delta: text}} -> IO.write(text)
      _ -> :ok
    end
  )

IO.inspect(result.token_usage)

This creates a session, executes one run, streams events, and cleans up. For provider-specific setup, see Provider Adapters.
Choose Your Abstraction Level
Start with the simplest API that meets your needs:
| API | Use when you need | Complexity |
|---|---|---|
| SessionManager.run_once/4 | Single request/response, scripts, testing | Lowest |
| StreamSession.start/1 | Lazy event stream from a one-shot call | Low |
| SessionManager full lifecycle | Multi-run sessions, explicit state control | Medium |
| SessionServer | Per-session queuing, concurrent runs, subscriptions | Highest |
StreamSession
Collapses boilerplate into a single call that returns a lazy stream:
{:ok, stream, close, _meta} =
  StreamSession.start(
    adapter: {ClaudeAdapter, []},
    input: %{messages: [%{role: "user", content: "Hello!"}]}
  )

stream |> Stream.each(&IO.inspect/1) |> Stream.run()
close.()

See StreamSession.
Full Lifecycle
Explicit control over session creation, activation, runs, and completion:
{:ok, session} = SessionManager.start_session(store, adapter, %{agent_id: "my-agent"})
{:ok, session} = SessionManager.activate_session(store, session.id)
{:ok, run} = SessionManager.start_run(store, adapter, session.id, %{messages: messages})
{:ok, result} = SessionManager.execute_run(store, adapter, run.id)
{:ok, _} = SessionManager.complete_session(store, session.id)

See Sessions and Runs.
SessionServer
Per-session GenServer with FIFO queuing and multi-slot concurrency:
{:ok, server} =
  SessionServer.start_link(
    store: store,
    adapter: adapter,
    session_opts: %{agent_id: "runtime-agent"},
    max_concurrent_runs: 2
  )

{:ok, run_id} = SessionServer.submit_run(server, %{messages: messages})
{:ok, result} = SessionServer.await_run(server, run_id, 120_000)
:ok = SessionServer.drain(server, 30_000)

Core Concepts
Your Application
       |
SessionManager             -- orchestrates lifecycle, events, capability checks
       |
  +----+----+
  |         |
Store    Adapter           -- ports (interfaces / behaviours)
  |         |
ETS/DB   Claude/Codex/Amp  -- adapters (implementations)

Sessions are containers for a series of agent interactions. State machine: pending -> active -> completed / failed / cancelled.
Runs represent one execution within a session -- sending input and receiving output. State machine: pending -> running -> completed / failed / cancelled / timeout. Runs track token usage and output.
Events are immutable records emitted during execution. They provide an audit trail, power streaming, and enable cursor-based replay.
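The two state machines above can be sketched as a transition table. This is only an illustration, not the library's implementation: the module name LifecycleSketch and the function transition/3 are hypothetical, and the table encodes only the arrows listed in the text (whether a pending session or run can be cancelled directly is not stated here, so it is left out).

```elixir
defmodule LifecycleSketch do
  # Hypothetical transition tables built from the arrows in the text above.
  @session_transitions %{
    pending: [:active],
    active: [:completed, :failed, :cancelled]
  }

  @run_transitions %{
    pending: [:running],
    running: [:completed, :failed, :cancelled, :timeout]
  }

  # Returns {:ok, to} when the move is allowed, otherwise a tagged error.
  def transition(kind, from, to) when kind in [:session, :run] do
    allowed = kind |> table() |> Map.get(from, [])

    if to in allowed do
      {:ok, to}
    else
      {:error, {:invalid_transition, from, to}}
    end
  end

  defp table(:session), do: @session_transitions
  defp table(:run), do: @run_transitions
end
```

Terminal states have no entry in either table, so any transition out of completed, failed, cancelled, or timeout is rejected.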
Ports
The core depends only on behaviours. Swap implementations without touching application code.
| Port | Purpose |
|---|---|
| ProviderAdapter | AI provider integration (execute, cancel, capabilities) |
| SessionStore | Session, run, and event persistence |
| ArtifactStore | Binary artifact storage (patches, files) |
| QueryAPI | Cross-session queries and analytics |
| Maintenance | Retention, cleanup, health checks |
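To make the ports idea concrete, here is a generic sketch of an Elixir behaviour with two interchangeable implementations. Every name in it (StorePortSketch, MapStoreSketch, EtsStoreSketch, AppSketch) is hypothetical, and the callbacks are deliberately simpler than the real SessionStore port.

```elixir
defmodule StorePortSketch do
  # The port: application code depends only on these callbacks.
  @callback new() :: term()
  @callback put(state :: term(), key :: term(), value :: term()) :: term()
  @callback fetch(state :: term(), key :: term()) :: {:ok, term()} | :error
end

defmodule MapStoreSketch do
  # Adapter 1: a plain map.
  @behaviour StorePortSketch

  @impl true
  def new, do: %{}

  @impl true
  def put(state, key, value), do: Map.put(state, key, value)

  @impl true
  def fetch(state, key), do: Map.fetch(state, key)
end

defmodule EtsStoreSketch do
  # Adapter 2: an ETS table owned by the calling process.
  @behaviour StorePortSketch

  @impl true
  def new, do: :ets.new(:store_sketch, [:set, :public])

  @impl true
  def put(table, key, value) do
    true = :ets.insert(table, {key, value})
    table
  end

  @impl true
  def fetch(table, key) do
    case :ets.lookup(table, key) do
      [{^key, value}] -> {:ok, value}
      [] -> :error
    end
  end
end

defmodule AppSketch do
  # Application code receives the adapter module and never names a concrete store.
  def save_and_load(store_mod) do
    state = store_mod.put(store_mod.new(), :session_id, "s-1")
    store_mod.fetch(state, :session_id)
  end
end
```

AppSketch.save_and_load/1 behaves the same with either adapter module. That is the property the ports above are meant to give application code.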
Providers
| Adapter | Provider | Streaming | Tool Use | Cancel |
|---|---|---|---|---|
| ClaudeAdapter | Anthropic Claude | Yes | Yes | Yes |
| CodexAdapter | Codex CLI | Yes | Yes | Yes |
| AmpAdapter | Amp (Sourcegraph) | Yes | Yes | Yes |
| ShellAdapter | Shell commands | Yes | No | Yes |
All adapters accept :permission_mode, :max_turns, :system_prompt, and :sdk_opts for provider-specific passthrough. For multi-provider setups, ProviderRouter acts as a meta-adapter with capability-based selection, failover, and circuit breaker. See Provider Routing.
Normalized Events
Each provider emits events in its own format. Adapters normalize them into a canonical set:
| Event | Description |
|---|---|
| run_started | Execution began |
| message_streamed | Streaming content chunk |
| message_received | Complete message ready |
| tool_call_started | Tool invocation begins |
| tool_call_completed | Tool finished |
| token_usage_updated | Usage stats updated |
| run_completed | Execution finished successfully |
| run_failed | Execution failed |
| run_cancelled | Execution cancelled |
| policy_violation | Policy limit exceeded |
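Normalization can be pictured as a function from provider-shaped payloads to the canonical event shape. In the sketch below the raw payloads are invented for illustration and do not reflect any real provider's wire format. Only the output shape (%{type: ..., data: ...}) and the event names come from this README.

```elixir
defmodule NormalizeSketch do
  # Two made-up raw formats that carry the same information.
  def normalize(%{"kind" => "text_delta", "text" => text}),
    do: %{type: :message_streamed, data: %{delta: text}}

  def normalize(%{"event" => "chunk", "content" => text}),
    do: %{type: :message_streamed, data: %{delta: text}}

  # Made-up completion marker mapped to the canonical completion event.
  def normalize(%{"kind" => "done"}),
    do: %{type: :run_completed, data: %{}}

  # Anything unrecognised is skipped rather than guessed at.
  def normalize(_other), do: :skip
end
```

Downstream code then matches on :message_streamed once, regardless of which raw format produced it.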
Failure events keep backward-compatible error_message and may also include provider_error:

%{
  provider: :codex | :amp | :claude | :gemini | :unknown,
  kind: atom(),
  message: String.t(),
  exit_code: integer() | nil,
  stderr: String.t() | nil,
  truncated?: boolean() | nil
}

provider_error.stderr is truncated before emission/persistence using AgentSessionManager.Config keys :error_text_max_bytes and :error_text_max_lines. When persistence redaction is enabled, nested provider_error fields are also scanned.
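A rough sketch of what truncating by both a line limit and a byte limit can look like. TruncateSketch is a hypothetical module. The order shown here (lines first, then bytes) and the returned truncated flag are assumptions, not the library's documented behaviour.

```elixir
defmodule TruncateSketch do
  # Returns {text, truncated?}. Applies the line limit first, then the byte limit.
  def truncate(text, max_bytes, max_lines) do
    by_lines =
      text
      |> String.split("\n")
      |> Enum.take(max_lines)
      |> Enum.join("\n")

    result = take_bytes(by_lines, max_bytes)
    {result, result != text}
  end

  # Keeps whole graphemes so the output never ends in a partial UTF-8 sequence.
  defp take_bytes(text, max_bytes) do
    text
    |> String.graphemes()
    |> Enum.reduce_while("", fn grapheme, acc ->
      if byte_size(acc) + byte_size(grapheme) <= max_bytes do
        {:cont, acc <> grapheme}
      else
        {:halt, acc}
      end
    end)
  end
end
```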
See Events and Streaming for the full event type reference.
Features
Persistence and Storage
Pluggable storage backends for sessions, runs, events, and artifacts. InMemorySessionStore for development and testing; EctoSessionStore for PostgreSQL or SQLite in production; AshSessionStore as an Ash Framework alternative; S3ArtifactStore for large binary artifacts; CompositeSessionStore to unify session and artifact backends.
# Production setup with Ecto + SQLite
{:ok, store} = EctoSessionStore.start_link(repo: MyApp.Repo)

Guides: Persistence Overview | Ecto | SQLite | Ash | S3 | Custom
Event Streaming and Observability
Cursor-backed event streaming with monotonic per-session sequence numbers, durable pagination (after/before/limit), and optional long-poll support. The rendering pipeline separates formatting (renderers) from output (sinks) -- compose StudioRenderer, CompactRenderer, or VerboseRenderer with TTYSink, FileSink, JSONLSink, PubSubSink, or CallbackSink.
Rendering.stream(event_stream,
  renderer: {StudioRenderer, verbosity: :summary},
  sinks: [{TTYSink, []}, {JSONLSink, path: "events.jsonl"}]
)

Guides: Events and Streaming | Cursor Streaming | Rendering | Telemetry
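The cursor pagination mentioned above (monotonic sequence numbers with after and limit) can be illustrated over an in-memory list. CursorSketch and its page/3 function are hypothetical, and the :seq field name is an assumption. The real cursor APIs are covered in the Cursor Streaming guide.

```elixir
defmodule CursorSketch do
  # Returns {page, next_cursor}: events with seq greater than after_seq, oldest first.
  def page(events, after_seq, limit) do
    page =
      events
      |> Enum.filter(&(&1.seq > after_seq))
      |> Enum.sort_by(& &1.seq)
      |> Enum.take(limit)

    # With no new events the cursor stays where it was, so polling is safe to repeat.
    next_cursor =
      case List.last(page) do
        nil -> after_seq
        last -> last.seq
      end

    {page, next_cursor}
  end
end
```

Because sequence numbers only increase within a session, a consumer that stores next_cursor and passes it back as after_seq will not see the same event twice.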
Session Continuity and Workspace
Continuity: Provider-agnostic transcript reconstruction from persisted events. Continuation modes (:auto, :replay, :native) with token-aware truncation keep conversations within budget across runs.
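Token-aware truncation can be sketched as keeping the most recent messages that fit a budget. TranscriptBudgetSketch is hypothetical, and the estimate of about four characters per token is a crude assumption used only to keep the example self-contained. The library's actual truncation strategy may differ.

```elixir
defmodule TranscriptBudgetSketch do
  # Assumed heuristic: roughly 4 characters per token, rounded up.
  def estimate_tokens(%{content: content}), do: div(String.length(content) + 3, 4)

  # Walks the transcript newest-first and keeps messages until the budget is spent.
  # The result is returned in the original chronological order.
  def fit(messages, max_tokens) do
    {kept, _used} =
      messages
      |> Enum.reverse()
      |> Enum.reduce_while({[], 0}, fn message, {kept, used} ->
        cost = estimate_tokens(message)

        if used + cost <= max_tokens do
          {:cont, {[message | kept], used + cost}}
        else
          {:halt, {kept, used}}
        end
      end)

    kept
  end
end
```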
Workspace: Optional pre/post snapshots with git or hash backends. Computes diffs, captures patches as artifacts, and supports rollback on failure (git backend only).
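As an illustration of the hash-backend idea, the sketch below snapshots a map of path to content and diffs two snapshots. SnapshotSketch is hypothetical and works on in-memory data rather than a real directory. It shows content hashing in general, not the library's snapshot format.

```elixir
defmodule SnapshotSketch do
  # A snapshot maps each path to the SHA-256 digest of its content.
  def snapshot(files) when is_map(files) do
    Map.new(files, fn {path, content} -> {path, :crypto.hash(:sha256, content)} end)
  end

  # Compares a pre-run and post-run snapshot.
  def diff(pre, post) do
    pre_paths = MapSet.new(Map.keys(pre))
    post_paths = MapSet.new(Map.keys(post))

    changed =
      for path <- MapSet.intersection(pre_paths, post_paths),
          pre[path] != post[path],
          do: path

    %{
      added: post_paths |> MapSet.difference(pre_paths) |> Enum.sort(),
      removed: pre_paths |> MapSet.difference(post_paths) |> Enum.sort(),
      changed: Enum.sort(changed)
    }
  end
end
```

A hash snapshot like this can say which files changed but cannot restore them, which fits the note above that rollback is supported by the git backend only.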
Guides: Session Continuity | Workspace Snapshots
Routing, Policy, and Cost
Routing: ProviderRouter selects providers by capability matching, with health tracking, failover, weighted scoring, session stickiness, and circuit breaker.
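Capability-based selection with a health filter and weights can be sketched in a few lines. RouterSketch, the provider map fields (:name, :capabilities, :healthy?, :weight), and the capability atoms are all hypothetical. The real router also tracks stickiness and circuit-breaker state, which this sketch omits.

```elixir
defmodule RouterSketch do
  # Picks the highest-weight healthy provider that offers every required capability.
  def select(providers, required) do
    required = MapSet.new(required)

    providers
    |> Enum.filter(fn provider ->
      provider.healthy? and MapSet.subset?(required, MapSet.new(provider.capabilities))
    end)
    |> Enum.sort_by(& &1.weight, :desc)
    |> case do
      [best | _] -> {:ok, best.name}
      [] -> {:error, :no_capable_provider}
    end
  end
end
```

Failover falls out of the filter: once a provider is marked unhealthy, the next call selects the best remaining candidate.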
Policy: Real-time budget and tool governance. Define token, duration, and cost limits with tool allow/deny rules. Violations trigger :cancel or :warn actions. Policies stack with deterministic merge.
Cost: Model-aware cost calculation using configurable pricing tables. Integrates with policy enforcement for budget limits.
{:ok, policy} =
  Policy.new(
    name: "production",
    limits: [{:max_total_tokens, 8_000}, {:max_cost_usd, 0.50}],
    tool_rules: [{:deny, ["bash"]}],
    on_violation: :cancel
  )

Guides: Provider Routing | Policy Enforcement | Cost Tracking
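To show how a pricing table and policy limits can fit together, here is a self-contained sketch. BudgetSketch, the model names, the prices, and the usage field names (:input_tokens, :output_tokens, :total_tokens, :cost_usd) are all made up for illustration. Only the {:max_total_tokens, n} and {:max_cost_usd, x} limit shapes come from the Policy example above.

```elixir
defmodule BudgetSketch do
  # Hypothetical pricing table: USD per one million tokens. These are not real prices.
  @pricing %{
    "model-a" => %{input: 3.0, output: 15.0},
    "model-b" => %{input: 0.5, output: 1.5}
  }

  # Returns {:ok, cost_usd} or {:error, :unknown_model}.
  def cost_usd(model, %{input_tokens: input, output_tokens: output}) do
    case Map.fetch(@pricing, model) do
      {:ok, %{input: in_price, output: out_price}} ->
        {:ok, input / 1_000_000 * in_price + output / 1_000_000 * out_price}

      :error ->
        {:error, :unknown_model}
    end
  end

  # Returns the limits that the given usage exceeds; an empty list means within budget.
  def violations(limits, %{total_tokens: tokens, cost_usd: cost}) do
    Enum.filter(limits, fn
      {:max_total_tokens, max} -> tokens > max
      {:max_cost_usd, max} -> cost > max
      _other -> false
    end)
  end
end
```

A caller would compute the cost after each usage update and act on any non-empty violation list, for example by cancelling the run or logging a warning.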
Concurrency, PubSub, and Integration
Concurrency: ConcurrencyLimiter enforces max parallel sessions/runs. SessionServer provides per-session FIFO queuing with multi-slot execution and durable subscriptions.
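FIFO queuing with a fixed number of execution slots can be modelled as a small pure data structure. SlotQueueSketch is hypothetical and leaves out everything a real GenServer adds, such as monitoring, timeouts, and subscriptions.

```elixir
defmodule SlotQueueSketch do
  # max: number of slots; running: ids currently executing; waiting: FIFO backlog.
  defstruct max: 1, running: MapSet.new(), waiting: :queue.new()

  def new(max) when max >= 1, do: %__MODULE__{max: max}

  # Starts the run immediately if a slot is free, otherwise queues it.
  def submit(%__MODULE__{} = q, run_id) do
    if MapSet.size(q.running) < q.max do
      {:started, %{q | running: MapSet.put(q.running, run_id)}}
    else
      {:queued, %{q | waiting: :queue.in(run_id, q.waiting)}}
    end
  end

  # Frees the slot and promotes the oldest waiting run, if any.
  # Returns {promoted_ids, new_queue}.
  def finish(%__MODULE__{} = q, run_id) do
    q = %{q | running: MapSet.delete(q.running, run_id)}

    with true <- MapSet.size(q.running) < q.max,
         {{:value, next}, rest} <- :queue.out(q.waiting) do
      {[next], %{q | running: MapSet.put(q.running, next), waiting: rest}}
    else
      _ -> {[], q}
    end
  end
end
```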
PubSub: Phoenix.PubSub integration for real-time event broadcasting to LiveView, WebSocket, or other subscribers.
Workflow Bridge: Thin integration layer for DAG/workflow engines with step execution, error classification (retry/failover/abort), and multi-run session lifecycle helpers.
Secrets Redaction: EventRedactor strips sensitive data from events before persistence.
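The general shape of redaction before persistence is a recursive walk over event data. RedactSketch and its single pattern (strings that look like sk- followed by at least eight alphanumerics) are hypothetical. EventRedactor's actual patterns and options are described in the Secrets Redaction guide.

```elixir
defmodule RedactSketch do
  # Hypothetical secret pattern, for illustration only.
  defp pattern, do: ~r/sk-[A-Za-z0-9]{8,}/

  # Strings are scrubbed; maps and lists are walked; everything else passes through.
  def redact(value) when is_binary(value), do: Regex.replace(pattern(), value, "[REDACTED]")
  def redact(%_{} = struct), do: struct
  def redact(value) when is_map(value), do: Map.new(value, fn {k, v} -> {k, redact(v)} end)
  def redact(value) when is_list(value), do: Enum.map(value, &redact/1)
  def redact(value), do: value
end
```

Because the walk is recursive, nested maps such as a provider_error inside event data are scrubbed along with top-level fields.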
Guides: Concurrency | PubSub | Workflow Bridge | Secrets Redaction | Shell Runner
Guides
Introduction
- Getting Started -- Installation, first session, core workflow
- Live Examples -- Running examples and validating contract surfaces
- Architecture -- Ports & adapters design, module map, data flow
- Configuration -- Layered config system, process-local overrides
- Configuration Reference -- Complete config key reference
- Model Configuration -- Provider-specific model selection
Core Concepts
- Sessions and Runs -- Lifecycle state machines, metadata, serialization
- Session Server Runtime -- FIFO queuing, multi-slot concurrency, drain/status
- Session Server Subscriptions -- Durable subscriptions with backfill and filtering
- Session Continuity -- Transcript reconstruction, continuation modes
- Events and Streaming -- Event types, normalization, EventStream cursor
- Rendering -- Renderer x Sink pipeline for event output
- PubSub Integration -- Real-time event broadcasting
- Stream Session -- One-shot streaming with lazy consumption
- Cursor Streaming and Migration -- Sequence numbers, cursor APIs
- Workspace Snapshots -- Snapshots, diffs, rollback, artifact storage
- Provider Routing -- Capability matching, failover, circuit breaker
- Policy Enforcement -- Limits, tool rules, violation actions
- Cost Tracking -- Model-aware pricing, budget enforcement
- Advanced Patterns -- Cross-feature integration patterns
- Capabilities -- Capability negotiation, manifests, registry
Persistence
- Persistence Overview -- Ports, adapters, event persistence flow
- Migrating to v0.8 -- Breaking changes and new patterns
- Ecto SessionStore -- PostgreSQL and SQLite via Ecto
- Ash SessionStore -- Ash resources and AshPostgres
- SQLite with Ecto -- Zero-dependency file-backed persistence
- S3 ArtifactStore -- S3-compatible object storage
- Composite Store -- Unified session + artifact backend
- Event Schema Versioning -- Schema evolution strategy
- Secrets Redaction -- Redact sensitive data before persistence
- Custom Persistence -- Implementing your own adapters
Integration
- Provider Adapters -- Claude, Codex, Amp setup and custom adapters
- Workflow Bridge -- DAG/workflow engine integration
- Shell Runner -- ShellAdapter for command execution
- Concurrency -- Limiter, control operations
- Telemetry and Observability -- Telemetry events, metrics
Reference
- Error Handling -- Error taxonomy, retryable errors
- Testing -- Mock adapters, in-memory store, test patterns
Production Checklist
A SessionStore is always required, but InMemorySessionStore works fine if you don't need data to survive restarts. Durable storage (Ecto, Ash, S3) is opt-in.
Required:
- [ ] Configure provider API keys (see Provider Adapters)
If you need durable storage:
- [ ] Add EctoSessionStore or AshSessionStore
- [ ] Run schema migrations (EctoSessionStore.Migration.up/0)
Recommended:
- [ ] Enable cost tracking with pricing tables
- [ ] Enable secrets redaction before persistence
- [ ] Define policies for token and cost limits
- [ ] Wire up telemetry handlers for metrics
- [ ] Use SessionServer for per-session queuing in long-running applications
Optional:
- [ ] S3 artifact storage for workspace patches
- [ ] PubSub integration for real-time fanout to LiveView/WebSocket
- [ ] Provider routing for multi-provider setups
- [ ] Workspace snapshots for tracking file changes
- [ ] Ash Framework for resource-oriented persistence
Error Handling
All operations return tagged tuples. Errors use a structured taxonomy with machine-readable codes:
case SessionManager.start_session(store, adapter, attrs) do
  {:ok, session} ->
    session

  {:error, %Error{code: :validation_error, message: msg}} ->
    Logger.error("Validation failed: #{msg}")

  # Any other error code: return it unchanged so the caller can decide what to do
  {:error, %Error{} = error} ->
    {:error, error}
end

Error codes are grouped into categories: validation, resource, provider, storage, runtime, concurrency, and tool errors. Some errors are marked retryable via Error.retryable?/1. See Error Handling.
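One way to use a retryable flag is a bounded retry loop. The sketch below is self-contained and does not call the library: RetrySketch, its nested Error struct, and the two retryable codes are stand-ins invented for illustration. In real code the check would be the library's Error.retryable?/1.

```elixir
defmodule RetrySketch do
  defmodule Error do
    # Stand-in for the library's error struct.
    defstruct [:code, :message]

    # Assumed set of retryable codes, for illustration only.
    def retryable?(%__MODULE__{code: code}),
      do: code in [:provider_timeout, :provider_rate_limited]
  end

  # Calls fun up to `attempts` times, retrying only on retryable errors.
  def with_retries(fun, attempts) when is_function(fun, 0) and attempts >= 1 do
    case fun.() do
      {:ok, value} ->
        {:ok, value}

      {:error, %Error{} = error} ->
        if attempts > 1 and Error.retryable?(error) do
          with_retries(fun, attempts - 1)
        else
          {:error, error}
        end
    end
  end
end
```

The sketch retries immediately to stay short. A production loop would normally sleep with backoff between attempts.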
Examples
The examples/ directory contains 40+ runnable scripts. Run them all:
bash examples/run_all.sh
# Or for a single provider:
bash examples/run_all.sh --provider claude
Getting started: oneshot.exs, live_session.exs, stream_session.exs, common_surface.exs
Provider-specific: claude_direct.exs, codex_direct.exs, amp_direct.exs
Features: session_continuity.exs, workspace_snapshot.exs, provider_routing.exs, policy_enforcement.exs, cost_tracking.exs, rendering_studio.exs, approval_gates.exs, secrets_redaction.exs
Persistence: persistence_live.exs, sqlite_session_store_live.exs, ecto_session_store_live.exs, ash_session_store.exs, composite_store_live.exs
Advanced: session_concurrency.exs, interactive_interrupt.exs, workflow_bridge.exs, shell_exec.exs, pubsub_sink.exs
See examples/README.md for full documentation.
Documentation
Full API documentation is available at HexDocs.
License
AgentSessionManager is released under the MIT License.