AgentSessionManager

Unified Elixir infrastructure for orchestrating AI agent sessions across Claude, Codex, and Amp — with lifecycle management, event streaming, persistence, and provider-agnostic observability.

Why AgentSessionManager?

When you call an AI provider API directly, you get back a response. When you build a product on top of AI agents, you need much more: session state, event history, cost tracking, cancellation, persistence, and the ability to swap providers without rewriting your application.

AgentSessionManager is the infrastructure layer that gives you all of this through a clean ports-and-adapters architecture.

| Concern | Without ASM | With ASM |
|---|---|---|
| Session state | Hand-roll per provider | Unified lifecycle with state machines |
| Event formats | Parse each provider separately | 20+ normalized event types |
| Persistence | Build your own | Pluggable: InMemory, Ecto (Postgres/SQLite), Ash, S3 |
| Cost visibility | None | Model-aware cost tracking per run |
| Provider lock-in | Rewrite when switching | Swap adapters, keep application code |
| Observability | Ad-hoc logging | Telemetry, rendering pipeline, audit trail |

See Architecture for the full design.

Quick Start

Add the dependency and at least one provider SDK:

def deps do
  [
    {:agent_session_manager, "~> 0.8.0"},
    {:claude_agent_sdk, "~> 0.14.0"}  # or {:codex_sdk, "~> 0.10.0"} / {:amp_sdk, "~> 0.4.0"}
  ]
end

Run a one-shot session:

alias AgentSessionManager.SessionManager
alias AgentSessionManager.Adapters.{ClaudeAdapter, InMemorySessionStore}

{:ok, store} = InMemorySessionStore.start_link()
{:ok, adapter} = ClaudeAdapter.start_link()

{:ok, result} =
  SessionManager.run_once(store, adapter,
    %{messages: [%{role: "user", content: "What is the BEAM?"}]},
    event_callback: fn
      %{type: :message_streamed, data: %{delta: text}} -> IO.write(text)
      _ -> :ok
    end
  )

IO.inspect(result.token_usage)

This creates a session, executes one run, streams events, and cleans up. For provider-specific setup, see Provider Adapters.

Choose Your Abstraction Level

Start with the simplest API that meets your needs:

| API | Use when you need | Complexity |
|---|---|---|
| SessionManager.run_once/4 | Single request/response, scripts, testing | Lowest |
| StreamSession.start/1 | Lazy event stream from a one-shot call | Low |
| SessionManager full lifecycle | Multi-run sessions, explicit state control | Medium |
| SessionServer | Per-session queuing, concurrent runs, subscriptions | Highest |

StreamSession

Collapses boilerplate into a single call that returns a lazy stream:

{:ok, stream, close, _meta} =
  StreamSession.start(
    adapter: {ClaudeAdapter, []},
    input: %{messages: [%{role: "user", content: "Hello!"}]}
  )

stream |> Stream.each(&IO.inspect/1) |> Stream.run()
close.()

See StreamSession.

Full Lifecycle

Explicit control over session creation, activation, runs, and completion:

messages = [%{role: "user", content: "What is the BEAM?"}]

{:ok, session} = SessionManager.start_session(store, adapter, %{agent_id: "my-agent"})
{:ok, session} = SessionManager.activate_session(store, session.id)
{:ok, run} = SessionManager.start_run(store, adapter, session.id, %{messages: messages})
{:ok, result} = SessionManager.execute_run(store, adapter, run.id)
{:ok, _} = SessionManager.complete_session(store, session.id)

See Sessions and Runs.

SessionServer

Per-session GenServer with FIFO queuing and multi-slot concurrency:

{:ok, server} =
  SessionServer.start_link(
    store: store,
    adapter: adapter,
    session_opts: %{agent_id: "runtime-agent"},
    max_concurrent_runs: 2
  )

messages = [%{role: "user", content: "What is the BEAM?"}]
{:ok, run_id} = SessionServer.submit_run(server, %{messages: messages})
{:ok, result} = SessionServer.await_run(server, run_id, 120_000)
:ok = SessionServer.drain(server, 30_000)

See Session Server Runtime.

Core Concepts

Your Application
       |
  SessionManager         -- orchestrates lifecycle, events, capability checks
       |
  +----+----+
  |         |
Store    Adapter          -- ports (interfaces / behaviours)
  |         |
ETS/DB   Claude/Codex/Amp -- adapters (implementations)

Sessions are containers for a series of agent interactions. State machine: pending -> active -> completed / failed / cancelled.

Runs represent one execution within a session -- sending input and receiving output. State machine: pending -> running -> completed / failed / cancelled / timeout. Runs track token usage and output.
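The following minimal sketch makes the two state machines concrete. It encodes only the transitions listed above, as data. `LifecycleSketch` and `transition/3` are hypothetical names used for illustration and are not part of AgentSessionManager's API. The library's own state handling may differ.

```elixir
defmodule LifecycleSketch do
  # Hypothetical transition tables: only the arrows stated in the text above.
  # Terminal states have no entry, so any move out of them is rejected.
  @session_transitions %{
    pending: [:active],
    active: [:completed, :failed, :cancelled]
  }

  @run_transitions %{
    pending: [:running],
    running: [:completed, :failed, :cancelled, :timeout]
  }

  def transition(kind, from, to) do
    table = if kind == :session, do: @session_transitions, else: @run_transitions
    allowed = Map.get(table, from, [])

    if to in allowed, do: {:ok, to}, else: {:error, {:invalid_transition, from, to}}
  end
end
```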

Events are immutable records emitted during execution. They provide an audit trail, power streaming, and enable cursor-based replay.

Ports

The core depends only on behaviours. Swap implementations without touching application code.

| Port | Purpose |
|---|---|
| ProviderAdapter | AI provider integration (execute, cancel, capabilities) |
| SessionStore | Session, run, and event persistence |
| ArtifactStore | Binary artifact storage (patches, files) |
| QueryAPI | Cross-session queries and analytics |
| Maintenance | Retention, cleanup, health checks |
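Plain Elixir behaviours are enough to illustrate the ports-and-adapters idea. This sketch does not reproduce the real SessionStore callbacks. Instead it uses a deliberately tiny key-value port (`PortSketch.KeyValue`, hypothetical). The core calls only the behaviour's callbacks, so the Agent-backed module here could be swapped for, say, a database-backed one without changing the caller.

```elixir
defmodule PortSketch.KeyValue do
  # Hypothetical port: the only contract the core depends on.
  @callback put(store :: pid(), key :: term(), value :: term()) :: :ok
  @callback fetch(store :: pid(), key :: term()) :: {:ok, term()} | {:error, :not_found}
end

defmodule PortSketch.AgentAdapter do
  # One hypothetical implementation of the port, backed by an Agent.
  @behaviour PortSketch.KeyValue

  def start_link, do: Agent.start_link(fn -> %{} end)

  @impl true
  def put(store, key, value), do: Agent.update(store, &Map.put(&1, key, value))

  @impl true
  def fetch(store, key) do
    case Agent.get(store, &Map.fetch(&1, key)) do
      {:ok, value} -> {:ok, value}
      :error -> {:error, :not_found}
    end
  end
end

defmodule PortSketch.Core do
  # The core receives the adapter module as a value and calls only port callbacks.
  def remember(adapter, store, key, value) do
    :ok = adapter.put(store, key, value)
    adapter.fetch(store, key)
  end
end
```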

Providers

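| Adapter | Provider | Streaming | Tool Use | Cancel |
|---|---|---|---|---|
| ClaudeAdapter | Anthropic Claude | Yes | Yes | Yes |
| CodexAdapter | Codex CLI | Yes | Yes | Yes |
| AmpAdapter | Amp (Sourcegraph) | Yes | Yes | Yes |
| ShellAdapter | Shell commands | Yes | No | Yes |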

All adapters accept :permission_mode, :max_turns, and :system_prompt, plus :sdk_opts for passing provider-specific options through. For multi-provider setups, ProviderRouter acts as a meta-adapter that provides capability-based selection, failover, and a circuit breaker. See Provider Routing.

Normalized Events

Each provider emits events in its own format. Adapters normalize them into a canonical set:

| Event | Description |
|---|---|
| run_started | Execution began |
| message_streamed | Streaming content chunk |
| message_received | Complete message ready |
| tool_call_started | Tool invocation begins |
| tool_call_completed | Tool finished |
| token_usage_updated | Usage stats updated |
| run_completed | Execution finished successfully |
| run_failed | Execution failed |
| run_cancelled | Execution cancelled |
| policy_violation | Policy limit exceeded |
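Every adapter emits the same event types, so consumer code only needs to be written once. This sketch folds a list of events into a summary. It assumes events are maps with `:type` and `:data` keys, as in the Quick Start callback. Only the `:delta` payload of `:message_streamed` comes from that example. No other payload shapes are assumed, and `EventSummarySketch` is a hypothetical name.

```elixir
defmodule EventSummarySketch do
  # Accumulates streamed text, counts tool calls, and records the terminal status.
  def summarize(events) do
    Enum.reduce(events, %{text: "", tool_calls: 0, status: :running}, fn
      %{type: :message_streamed, data: %{delta: delta}}, acc -> %{acc | text: acc.text <> delta}
      %{type: :tool_call_started}, acc -> %{acc | tool_calls: acc.tool_calls + 1}
      %{type: :run_completed}, acc -> %{acc | status: :completed}
      %{type: :run_failed}, acc -> %{acc | status: :failed}
      %{type: :run_cancelled}, acc -> %{acc | status: :cancelled}
      # Any other event type leaves the summary unchanged.
      _other, acc -> acc
    end)
  end
end
```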

Failure events keep the backward-compatible error_message field and may also include a provider_error map with the following shape:

%{
  provider: :codex | :amp | :claude | :gemini | :unknown,
  kind: atom(),
  message: String.t(),
  exit_code: integer() | nil,
  stderr: String.t() | nil,
  truncated?: boolean() | nil
}

provider_error.stderr is truncated before emission/persistence using AgentSessionManager.Config keys :error_text_max_bytes and :error_text_max_lines. When persistence redaction is enabled, nested provider_error fields are also scanned.
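The exact truncation algorithm is not described here. The following sketch only shows how byte and line budgets, mirroring :error_text_max_bytes and :error_text_max_lines, could be applied to stderr. `StderrTruncationSketch.truncate/3` is hypothetical. It cuts to the line limit first, then to the byte limit on grapheme boundaries, so multi-byte characters are never split. It returns the text together with a flag indicating whether anything was removed. A nil stderr passes through with a nil flag, matching the `| nil` in the shape above.

```elixir
defmodule StderrTruncationSketch do
  def truncate(nil, _max_bytes, _max_lines), do: {nil, nil}

  def truncate(text, max_bytes, max_lines) do
    lines = String.split(text, "\n")

    # First enforce the line budget.
    {kept_lines, line_cut?} =
      if length(lines) > max_lines, do: {Enum.take(lines, max_lines), true}, else: {lines, false}

    joined = Enum.join(kept_lines, "\n")

    # Then enforce the byte budget.
    {result, byte_cut?} =
      if byte_size(joined) > max_bytes, do: {take_bytes(joined, max_bytes), true}, else: {joined, false}

    {result, line_cut? or byte_cut?}
  end

  # Keep whole graphemes while staying within the byte budget.
  defp take_bytes(text, max_bytes) do
    text
    |> String.graphemes()
    |> Enum.reduce_while({"", 0}, fn g, {acc, size} ->
      new_size = size + byte_size(g)
      if new_size > max_bytes, do: {:halt, {acc, size}}, else: {:cont, {acc <> g, new_size}}
    end)
    |> elem(0)
  end
end
```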

See Events and Streaming for the full event type reference.

Features

Persistence and Storage

Pluggable storage backends for sessions, runs, events, and artifacts. InMemorySessionStore for development and testing; EctoSessionStore for PostgreSQL or SQLite in production; AshSessionStore as an Ash Framework alternative; S3ArtifactStore for large binary artifacts; CompositeSessionStore to unify session and artifact backends.

# Production setup with Ecto + SQLite
{:ok, store} = EctoSessionStore.start_link(repo: MyApp.Repo)

Guides: Persistence Overview | Ecto | SQLite | Ash | S3 | Custom

Event Streaming and Observability

Cursor-backed event streaming with monotonic per-session sequence numbers, durable pagination (after/before/limit), and optional long-poll support. The rendering pipeline separates formatting (renderers) from output (sinks) -- compose StudioRenderer, CompactRenderer, or VerboseRenderer with TTYSink, FileSink, JSONLSink, PubSubSink, or CallbackSink.

Rendering.stream(event_stream,
  renderer: {StudioRenderer, verbosity: :summary},
  sinks: [{TTYSink, []}, {JSONLSink, path: "events.jsonl"}]
)
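Cursor pagination depends on the per-session sequence numbers being monotonic. A client remembers the last sequence number it saw and asks for the events that come after it. This sketch shows the `after`/`limit` half of that contract over an in-memory list. `CursorSketch.page/3` and the `:seq` field name are assumptions, not the library's API.

```elixir
defmodule CursorSketch do
  # Returns up to `limit` events strictly after `after_seq`, plus the cursor
  # to pass on the next call (unchanged when there is nothing new).
  def page(events, after_seq, limit) do
    batch =
      events
      |> Enum.filter(&(&1.seq > after_seq))
      |> Enum.sort_by(& &1.seq)
      |> Enum.take(limit)

    next_cursor = if batch == [], do: after_seq, else: List.last(batch).seq
    {batch, next_cursor}
  end
end
```

Because the cursor is a sequence number rather than an offset, events appended between calls do not shift or duplicate already-read pages.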

Guides: Events and Streaming | Cursor Streaming | Rendering | Telemetry

Session Continuity and Workspace

Continuity: Provider-agnostic transcript reconstruction from persisted events. Continuation modes (:auto, :replay, :native) with token-aware truncation keep conversations within budget across runs.
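One way to picture token-aware truncation is as keeping the newest messages that fit within a budget. This sketch is a hypothetical illustration (`TranscriptBudgetSketch`). It assumes roughly four characters per token, although real counts depend on the model's tokenizer. It walks backwards from the most recent message.

```elixir
defmodule TranscriptBudgetSketch do
  # Rough estimate: about 4 characters per token, rounded up.
  def estimate_tokens(%{content: content}), do: div(String.length(content) + 3, 4)

  # Keep the newest messages whose combined estimate fits in max_tokens,
  # returned in chronological order.
  def fit(messages, max_tokens) do
    messages
    |> Enum.reverse()
    |> Enum.reduce_while({[], 0}, fn msg, {kept, used} ->
      cost = estimate_tokens(msg)
      if used + cost > max_tokens, do: {:halt, {kept, used}}, else: {:cont, {[msg | kept], used + cost}}
    end)
    |> elem(0)
  end
end
```

Stopping at the first message that does not fit, instead of skipping it, keeps the retained transcript contiguous.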

Workspace: Optional pre/post snapshots with git or hash backends. Computes diffs, captures patches as artifacts, and supports rollback on failure (git backend only).
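For a hash backend, a snapshot can be treated as a map from file path to content hash. A diff is then the comparison of two such maps. The following is a hypothetical sketch (`SnapshotDiffSketch`), not the library's implementation. It uses SHA-256 from Erlang's :crypto module and takes file contents directly instead of reading the file system.

```elixir
defmodule SnapshotDiffSketch do
  # A snapshot maps each relative path to the SHA-256 of its contents.
  def snapshot(files) do
    Map.new(files, fn {path, content} -> {path, :crypto.hash(:sha256, content)} end)
  end

  # Compare pre/post snapshots by path set and by hash.
  def diff(pre, post) do
    added = Map.keys(post) -- Map.keys(pre)
    removed = Map.keys(pre) -- Map.keys(post)

    modified =
      for {path, hash} <- post, Map.has_key?(pre, path), Map.fetch!(pre, path) != hash, do: path

    %{added: Enum.sort(added), removed: Enum.sort(removed), modified: Enum.sort(modified)}
  end
end
```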

Guides: Session Continuity | Workspace Snapshots

Routing, Policy, and Cost

Routing: ProviderRouter selects providers by capability matching, with health tracking, failover, weighted scoring, session stickiness, and a circuit breaker.
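At its core, capability matching with a circuit breaker means filtering the candidates and then ordering the survivors for failover. In this hypothetical sketch (`RouterSketch`), each candidate is a map with a name, a capability set, and a recent failure count. A candidate whose failure count has reached the threshold is treated as having an open circuit. Weighted scoring and session stickiness are omitted.

```elixir
defmodule RouterSketch do
  # Hypothetical threshold: this many recent failures opens the circuit.
  @failure_threshold 3

  def select(candidates, required) do
    candidates
    |> Enum.filter(&MapSet.subset?(MapSet.new(required), &1.capabilities))
    # Skip providers whose circuit is open.
    |> Enum.reject(&(&1.failures >= @failure_threshold))
    |> case do
      [] -> {:error, :no_provider_available}
      # First healthy match is primary; the rest form the failover order.
      [first | rest] -> {:ok, first.name, Enum.map(rest, & &1.name)}
    end
  end
end
```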

Policy: Real-time budget and tool governance. Define token, duration, and cost limits alongside tool allow/deny rules. A violation triggers either a :cancel or a :warn action. Multiple policies can be stacked and are merged deterministically.

Cost: Model-aware cost calculation using configurable pricing tables. Integrates with policy enforcement for budget limits.
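Model-aware cost calculation comes down to a pricing lookup followed by some arithmetic. The pricing table in this sketch is hypothetical. The model names and per-million-token rates are placeholders, not real provider prices, and `CostSketch` is not a library module.

```elixir
defmodule CostSketch do
  # Hypothetical pricing table: USD per million tokens (placeholder values).
  @pricing %{
    "example-large" => %{input: 3.0, output: 15.0},
    "example-small" => %{input: 0.25, output: 1.25}
  }

  def cost_usd(model, %{input_tokens: input, output_tokens: output}) do
    case Map.fetch(@pricing, model) do
      {:ok, %{input: in_rate, output: out_rate}} ->
        {:ok, (input * in_rate + output * out_rate) / 1_000_000}

      :error ->
        {:error, {:unknown_model, model}}
    end
  end
end
```

For example, 1,000 input tokens and 1,200 output tokens on "example-large" come to (1,000 × 3.0 + 1,200 × 15.0) / 1,000,000 = 0.021 USD.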

{:ok, policy} = Policy.new(
  name: "production",
  limits: [{:max_total_tokens, 8_000}, {:max_cost_usd, 0.50}],
  tool_rules: [{:deny, ["bash"]}],
  on_violation: :cancel
)
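This sketch shows what enforcement means at the data level. It checks observed usage and a requested tool against limit and rule tuples shaped like those in the Policy.new example. It is a hypothetical `PolicySketch` module, not the library's enforcer. It covers only the two limit keys used above, and it returns the configured on_violation action together with the violations it found.

```elixir
defmodule PolicySketch do
  # Maps each limit key to the usage field it constrains (hypothetical field names).
  @usage_keys %{max_total_tokens: :total_tokens, max_cost_usd: :cost_usd}

  def check(limits, tool_rules, usage, tool, on_violation) do
    limit_hits =
      Enum.flat_map(limits, fn {limit, max} ->
        observed = Map.fetch!(usage, Map.fetch!(@usage_keys, limit))
        if observed > max, do: [{limit, observed, max}], else: []
      end)

    # Only {:deny, tools} rules are considered; other rule shapes are skipped.
    denied = for {:deny, tools} <- tool_rules, tool in tools, do: {:tool_denied, tool}

    case limit_hits ++ denied do
      [] -> :ok
      hits -> {on_violation, hits}
    end
  end
end
```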

Guides: Provider Routing | Policy Enforcement | Cost Tracking

Concurrency, PubSub, and Integration

Concurrency: ConcurrencyLimiter enforces max parallel sessions/runs. SessionServer provides per-session FIFO queuing with multi-slot execution and durable subscriptions.
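FIFO queuing with multiple execution slots can be modeled as pure data. A run starts while a slot is free and otherwise waits in a queue. Each completion frees a slot and promotes the oldest waiting run. `SlotQueueSketch` is a hypothetical illustration of that behavior, not SessionServer's internals.

```elixir
defmodule SlotQueueSketch do
  def new(max_slots), do: %{max: max_slots, running: MapSet.new(), queue: :queue.new()}

  # Start immediately if a slot is free; otherwise append to the FIFO queue.
  def submit(state, run_id) do
    if MapSet.size(state.running) < state.max do
      {:started, %{state | running: MapSet.put(state.running, run_id)}}
    else
      {:queued, %{state | queue: :queue.in(run_id, state.queue)}}
    end
  end

  # Free the finished run's slot and promote the oldest queued run, if any.
  def complete(state, run_id) do
    state = %{state | running: MapSet.delete(state.running, run_id)}

    case :queue.out(state.queue) do
      {{:value, next}, rest} ->
        {next, %{state | running: MapSet.put(state.running, next), queue: rest}}

      {:empty, _} ->
        {nil, state}
    end
  end
end
```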

PubSub: Phoenix.PubSub integration for real-time event broadcasting to LiveView, WebSocket, or other subscribers.

Workflow Bridge: Thin integration layer for DAG/workflow engines with step execution, error classification (retry/failover/abort), and multi-run session lifecycle helpers.
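For a workflow engine, error classification means mapping each error to one of three actions. Apart from :validation_error, the error code atoms in this sketch are hypothetical placeholders rather than the library's taxonomy. `StepClassifierSketch` is not a library module.

```elixir
defmodule StepClassifierSketch do
  # Hypothetical code groups; anything unrecognized aborts the step.
  @retry [:timeout, :rate_limited]
  @failover [:provider_unavailable, :circuit_open]

  def classify(%{code: code}) when code in @retry, do: :retry
  def classify(%{code: code}) when code in @failover, do: :failover
  def classify(_error), do: :abort
end
```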

Secrets Redaction: EventRedactor strips sensitive data from events before persistence.
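This sketch illustrates redaction before persistence. It walks a nested event map and replaces any text that matches a small set of patterns. The patterns and the `RedactSketch` module are hypothetical. For EventRedactor's actual behavior, see the Secrets Redaction guide.

```elixir
defmodule RedactSketch do
  # Hypothetical patterns; kept in a function rather than a module attribute
  # so compiled regexes are not stored at compile time.
  defp patterns do
    [
      ~r/sk-[A-Za-z0-9]{20,}/,
      ~r/(?i)password=\S+/
    ]
  end

  def redact(value) when is_binary(value) do
    Enum.reduce(patterns(), value, fn re, acc -> Regex.replace(re, acc, "[REDACTED]") end)
  end

  # Recurse into plain maps (not structs) and lists so nested fields are scanned too.
  def redact(%{} = map) when not is_struct(map), do: Map.new(map, fn {k, v} -> {k, redact(v)} end)
  def redact(list) when is_list(list), do: Enum.map(list, &redact/1)
  def redact(other), do: other
end
```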

Guides: Concurrency | PubSub | Workflow Bridge | Secrets Redaction | Shell Runner

Guides

Introduction

Core Concepts

Persistence

Integration

Reference

  • Error Handling -- Error taxonomy, retryable errors
  • Testing -- Mock adapters, in-memory store, test patterns

Production Checklist

A SessionStore is always required, but InMemorySessionStore works fine if you don't need data to survive restarts. Durable storage (Ecto, Ash, S3) is opt-in.

Required:

If you need durable storage:

Recommended:

Optional:

Error Handling

All operations return tagged tuples. Errors use a structured taxonomy with machine-readable codes:

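require Logger

case SessionManager.start_session(store, adapter, attrs) do
  {:ok, session} ->
    session

  {:error, %Error{code: :validation_error, message: msg}} ->
    Logger.error("Validation failed: #{msg}")

  # Catch-all so other error codes do not raise a CaseClauseError.
  {:error, %Error{code: code, message: msg}} ->
    Logger.error("start_session failed (#{code}): #{msg}")
end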

Error codes are grouped into categories: validation, resource, provider, storage, runtime, concurrency, and tool errors. Some errors are marked retryable via Error.retryable?/1. See Error Handling.
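Retryable errors pair naturally with a small retry loop. The helper below is a hypothetical sketch that uses exponential backoff. It takes the retry predicate as an argument, so with this library you might pass `&Error.retryable?/1`, assuming the error reason is the %Error{} struct shown above.

```elixir
defmodule RetrySketch do
  # Calls `fun` and retries while `retryable?` says the error is transient,
  # doubling the delay after each failed attempt.
  def with_retry(fun, retryable?, attempts \\ 3, base_ms \\ 100) do
    case fun.() do
      {:ok, _} = ok ->
        ok

      {:error, reason} = err ->
        if attempts > 1 and retryable?.(reason) do
          Process.sleep(base_ms)
          with_retry(fun, retryable?, attempts - 1, base_ms * 2)
        else
          err
        end
    end
  end
end
```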

Examples

The examples/ directory contains 40+ runnable scripts. Run them all:

bash examples/run_all.sh
# Or for a single provider:
bash examples/run_all.sh --provider claude

Getting started: oneshot.exs, live_session.exs, stream_session.exs, common_surface.exs

Provider-specific: claude_direct.exs, codex_direct.exs, amp_direct.exs

Features: session_continuity.exs, workspace_snapshot.exs, provider_routing.exs, policy_enforcement.exs, cost_tracking.exs, rendering_studio.exs, approval_gates.exs, secrets_redaction.exs

Persistence: persistence_live.exs, sqlite_session_store_live.exs, ecto_session_store_live.exs, ash_session_store.exs, composite_store_live.exs

Advanced: session_concurrency.exs, interactive_interrupt.exs, workflow_bridge.exs, shell_exec.exs, pubsub_sink.exs

See examples/README.md for full documentation.

Documentation

Full API documentation is available at HexDocs.

License

AgentSessionManager is released under the MIT License.