Events are the foundation of observability in AgentSessionManager. Every significant action -- session creation, message streaming, tool calls, errors -- produces an immutable event that is persisted and can be consumed in real time.
## Event Types

Events are grouped into categories:

### Session Lifecycle

| Type | When |
|---|---|
| `:session_created` | A new session was created |
| `:session_started` | Session transitioned to active |
| `:session_paused` | Session was paused |
| `:session_resumed` | Session resumed from pause |
| `:session_completed` | Session completed successfully |
| `:session_failed` | Session failed with an error |
| `:session_cancelled` | Session was cancelled |
### Run Lifecycle

| Type | When |
|---|---|
| `:run_started` | A run began execution |
| `:run_completed` | A run completed successfully |
| `:run_failed` | A run failed with an error |
| `:run_cancelled` | A run was cancelled |
| `:run_timeout` | A run timed out |
### Messages

| Type | When |
|---|---|
| `:message_sent` | A message was sent to the agent |
| `:message_received` | A complete message was received from the agent |
| `:message_streamed` | A streaming chunk was received |
### Tool Calls

| Type | When |
|---|---|
| `:tool_call_started` | A tool call began |
| `:tool_call_completed` | A tool call completed |
| `:tool_call_failed` | A tool call failed |
### Errors and Usage

| Type | When |
|---|---|
| `:error_occurred` | An error occurred during execution |
| `:error_recovered` | The system recovered from an error |
| `:token_usage_updated` | Token counts were updated |
| `:turn_completed` | A conversation turn completed |
### Workspace

| Type | When |
|---|---|
| `:workspace_snapshot_taken` | A workspace snapshot was captured |
| `:workspace_diff_computed` | A workspace diff summary was computed |
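The categories above can be mirrored in code when routing events. The `EventCategory` module below is a hypothetical helper for illustration, not part of the library:

```elixir
# Hypothetical helper mapping canonical event types to the categories above.
defmodule EventCategory do
  @session ~w(session_created session_started session_paused session_resumed
              session_completed session_failed session_cancelled)a
  @run ~w(run_started run_completed run_failed run_cancelled run_timeout)a
  @message ~w(message_sent message_received message_streamed)a
  @tool ~w(tool_call_started tool_call_completed tool_call_failed)a
  @workspace ~w(workspace_snapshot_taken workspace_diff_computed)a

  def category(type) when type in @session, do: :session_lifecycle
  def category(type) when type in @run, do: :run_lifecycle
  def category(type) when type in @message, do: :message
  def category(type) when type in @tool, do: :tool_call
  def category(type) when type in @workspace, do: :workspace
  def category(_type), do: :other
end

EventCategory.category(:run_timeout)      # => :run_lifecycle
EventCategory.category(:message_streamed) # => :message
```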
## Error Payloads

Error events (`:error_occurred`, `:run_failed`, `:session_failed`) keep `error_message` for backward compatibility and may include a structured `provider_error` payload:

```elixir
%{
  provider: :codex | :amp | :claude | :gemini | :unknown,
  kind: atom(),
  message: String.t(),
  exit_code: integer() | nil,
  stderr: String.t() | nil,
  truncated?: boolean() | nil
}
```

Notes:

- `provider_error.message` is a short provider summary.
- `provider_error.stderr` carries diagnostic text when available.
- `provider_error.stderr` is truncated before emission/persistence using `:error_text_max_bytes` and `:error_text_max_lines`.
- Provider-specific extra fields are carried in `event.data.details`.
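A consumer might prefer the structured payload when present and fall back to `error_message` otherwise. The `ErrorReporting` module below is a sketch operating on plain event-data maps, not a library API:

```elixir
# Sketch: prefer the structured provider_error payload, falling back to
# the backward-compatible error_message field.
defmodule ErrorReporting do
  def describe(%{provider_error: %{provider: provider, message: message} = pe}) do
    base = "[#{provider}] #{message}"

    case pe[:stderr] do
      nil -> base
      stderr -> base <> " (stderr: #{stderr})"
    end
  end

  def describe(%{error_message: message}), do: message
end

ErrorReporting.describe(%{
  error_message: "codex exited",
  provider_error: %{provider: :codex, kind: :nonzero_exit, message: "exit 1", stderr: "boom"}
})
# => "[codex] exit 1 (stderr: boom)"
```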
## Creating Events

```elixir
alias AgentSessionManager.Core.Event

{:ok, event} = Event.new(%{
  type: :message_received,
  session_id: session.id,
  run_id: run.id,
  data: %{content: "Hello!", role: "assistant"},
  metadata: %{model: "claude-haiku-4-5-20251001"}
})

event.id         # => "evt_a1b2c3..." (auto-generated)
event.timestamp  # => ~U[2025-01-27 12:00:00Z]
```

Events are immutable -- once created, they cannot be modified.
## Persisting Events

Events are stored via the SessionStore port using append-only semantics:

```elixir
alias AgentSessionManager.Ports.SessionStore

{:ok, stored_event} = SessionStore.append_event_with_sequence(store, event)
stored_event.sequence_number
# => 1
```

Appending is idempotent: storing the same event ID twice doesn't create a duplicate, and the duplicate returns the originally assigned sequence number.
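The idempotency guarantee can be illustrated as follows. This is a sketch reusing `store` and `event` from the examples above; the exact setup depends on your store implementation:

```elixir
# Appending the same event twice: the second call returns the same
# sequence number and does not create a duplicate row.
{:ok, e1} = SessionStore.append_event_with_sequence(store, event)
{:ok, e2} = SessionStore.append_event_with_sequence(store, event)

e1.sequence_number == e2.sequence_number
# => true

{:ok, events} = SessionStore.get_events(store, session.id)
length(events)
# => 1
```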
## Querying Events

```elixir
# All events for a session
{:ok, events} = SessionStore.get_events(store, session.id)

# Filter by run
{:ok, events} = SessionStore.get_events(store, session.id, run_id: run.id)

# Filter by type
{:ok, events} = SessionStore.get_events(store, session.id, type: :message_received)

# Filter by time
{:ok, events} = SessionStore.get_events(store, session.id, since: one_hour_ago)

# Cursor filters
{:ok, events} = SessionStore.get_events(store, session.id, after: 100, limit: 50)
{:ok, events} = SessionStore.get_events(store, session.id, before: 200)

# Combine filters
{:ok, events} = SessionStore.get_events(store, session.id,
  after: 100,
  before: 200,
  run_id: run.id,
  type: :tool_call_completed,
  limit: 10
)
```

Events are always returned in append order (oldest first).
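The `after`/`limit` cursor filters support paging through long histories. A minimal sketch, assuming `store` and `session` from the earlier examples; `EventPager` is a hypothetical helper, not part of the library:

```elixir
# Hypothetical pager: lazily walk all events for a session, one page at a
# time, using sequence_number as the cursor.
defmodule EventPager do
  def stream_all(store, session_id, page_size \\ 100) do
    Stream.unfold(0, fn cursor ->
      case SessionStore.get_events(store, session_id, after: cursor, limit: page_size) do
        {:ok, []} -> nil
        {:ok, events} -> {events, List.last(events).sequence_number}
      end
    end)
    |> Stream.flat_map(& &1)
  end
end

EventPager.stream_all(store, session.id) |> Enum.count()
```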
`get_latest_sequence/2` returns the latest cursor for a session:

```elixir
{:ok, cursor} = SessionStore.get_latest_sequence(store, session.id)
```

## Event Normalization
When working with raw provider events (e.g., from webhook payloads), the EventNormalizer transforms them into the canonical NormalizedEvent format:

```elixir
alias AgentSessionManager.Core.EventNormalizer

raw_event = %{
  "type" => "assistant_message",
  "content" => "Hello!",
  "model" => "claude-haiku-4-5-20251001"
}

context = %{
  session_id: session.id,
  run_id: run.id,
  provider: :anthropic
}

{:ok, normalized} = EventNormalizer.normalize(raw_event, context)
normalized.type  # => :message_received (mapped from "assistant_message")
```

The normalizer handles type mapping, sequence numbering, and metadata extraction. It maps common provider patterns to canonical types:

- `"assistant_message"`, `"ai_message"`, `"assistant"` -> `:message_received`
- `"delta"`, `"content_block_delta"`, `"stream"` -> `:message_streamed`
- `"tool_use"`, `"tool_call"`, `"function_call"` -> `:tool_call_started`
- `"error"`, `"exception"` -> `:error_occurred`
## SessionManager Ingest Normalization

`SessionManager.execute_run/4` applies event-type normalization to adapter callback events before persistence. This means alias event types such as `"run_start"`, `"run_end"`, and `"delta"` are stored as canonical `Event.type` atoms (`:run_started`, `:run_completed`, `:message_streamed`).
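Conceptually, the ingest step applies a small alias table before persistence. The module below is a simplified sketch of that idea, not the library's actual (or exhaustive) mapping:

```elixir
# Simplified sketch of ingest-time alias normalization: alias strings map
# to canonical atoms, anything else passes through unchanged.
defmodule IngestTypeAliases do
  @aliases %{
    "run_start" => :run_started,
    "run_end" => :run_completed,
    "delta" => :message_streamed
  }

  def normalize_type(type), do: Map.get(@aliases, type, type)
end

IngestTypeAliases.normalize_type("run_start")  # => :run_started
IngestTypeAliases.normalize_type(:run_started) # => :run_started
```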
## Batch Normalization

```elixir
{:ok, events} = EventNormalizer.normalize_batch(raw_events, context)
```

Events in a batch get sequential `sequence_number` values automatically.
## Sorting and Filtering

```elixir
sorted = EventNormalizer.sort_events(events)  # by sequence, then timestamp, then id

filtered = EventNormalizer.filter_by_type(events, :message_streamed)
filtered = EventNormalizer.filter_by_run(events, run.id)
```

## EventStream: Cursor-Based Consumption
The EventStream module provides a cursor-based mechanism for consuming events incrementally -- useful for building streaming UIs or processing events in order.
```elixir
alias AgentSessionManager.Core.EventStream

# Create a stream for a specific run
{:ok, stream} = EventStream.new(%{session_id: session.id, run_id: run.id})

# Push events as they arrive
{:ok, stream} = EventStream.push(stream, event1)
{:ok, stream} = EventStream.push(stream, event2)
{:ok, stream} = EventStream.push_batch(stream, [event3, event4])

# Peek at events without advancing the cursor
events = EventStream.peek(stream, 3)

# Take events and advance the cursor
{:ok, events, stream} = EventStream.take(stream, 2)

# Check counts
EventStream.count(stream)      # total events in buffer
EventStream.remaining(stream)  # unread events from cursor

# Get all events (sorted by sequence number)
all = EventStream.get_events(stream)
all = EventStream.get_events(stream, from_cursor: true, type: :message_streamed)
```

## Buffer Management
EventStream has a configurable buffer size (default: 1000). When the buffer is full, older events are evicted to make room:

```elixir
{:ok, stream} = EventStream.new(%{
  session_id: session.id,
  run_id: run.id,
  buffer_size: 500
})
```

## Context Validation
Events pushed to a stream must match its `session_id` and `run_id`. Mismatched events are rejected:

```elixir
{:error, %Error{code: :session_mismatch}} =
  EventStream.push(stream, event_from_different_session)
```

## Closing a Stream
```elixir
{:ok, stream} = EventStream.close(stream)
EventStream.closed?(stream)  # => true

# Closed streams reject new events
{:error, _} = EventStream.push(stream, event)
```

## Real-Time Event Handling
When executing runs through an adapter, you provide an event callback to handle events as they arrive:

```elixir
callback = fn event ->
  case event.type do
    :message_streamed ->
      # Stream content to the user in real time
      IO.write(event.data.delta)

    :tool_call_started ->
      Logger.info("Tool called: #{event.data.tool_name}")

    :token_usage_updated ->
      Logger.debug("Tokens: #{event.data.input_tokens} in, #{event.data.output_tokens} out")

    :run_completed ->
      IO.puts("\n[Complete]")

    :error_occurred ->
      Logger.error("Error: #{event.data.error_message}")

      if is_map(event.data[:provider_error]) do
        Logger.error("Provider stderr: #{event.data.provider_error.stderr || "n/a"}")
      end

    _ ->
      :ok
  end
end

{:ok, result} = adapter_module.execute(adapter, run, session, event_callback: callback)
```

The adapter normalizes provider-specific events before invoking your callback, so you always work with the canonical event types regardless of the provider.
## Durable Follow/Poll Streaming

For resumable polling from persisted events, use `SessionManager.stream_session_events/3`:

```elixir
alias AgentSessionManager.SessionManager

# Polling mode (default) -- sleeps between empty reads
stream =
  SessionManager.stream_session_events(store, session.id,
    after: 0,
    limit: 100,
    poll_interval_ms: 250
  )

# Consume incrementally
Enum.take(stream, 10)
```

This stream is cursor-backed by `SessionStore.get_events/3` and works across reconnects.
## Long-Poll Mode (`wait_timeout_ms`)

To avoid busy polling, pass `wait_timeout_ms`. When the store supports it (e.g. InMemorySessionStore), the store blocks the read until matching events arrive or the timeout elapses:

```elixir
stream =
  SessionManager.stream_session_events(store, session.id,
    after: cursor,
    limit: 100,
    wait_timeout_ms: 5_000
  )

Enum.take(stream, 10)
```

This is ideal for real-time streaming UIs (SSE, WebSocket, LiveView) where you need low latency without wasting CPU cycles on empty polls. Stores that do not support `wait_timeout_ms` ignore it and fall back to an immediate return.
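A long-poll consumer that survives reconnects only needs to persist its cursor between iterations. The loop below is a sketch; `deliver/1` stands in for a hypothetical push to your transport:

```elixir
# Sketch of a reconnect-safe long-poll loop. `deliver/1` is a placeholder
# for pushing events to your transport (SSE, WebSocket, LiveView).
defmodule LongPollConsumer do
  def loop(store, session_id, cursor) do
    events =
      SessionManager.stream_session_events(store, session_id,
        after: cursor,
        limit: 100,
        wait_timeout_ms: 5_000
      )
      |> Enum.take(100)

    Enum.each(events, &deliver/1)

    # Advance the cursor; on crash or reconnect, resume from the last
    # persisted value instead of re-reading from zero.
    next_cursor =
      case List.last(events) do
        nil -> cursor
        last -> last.sequence_number
      end

    loop(store, session_id, next_cursor)
  end

  defp deliver(event), do: IO.inspect(event.type)
end
```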
## Adapter Event Metadata

When SessionManager persists adapter events, it preserves:

- Adapter timestamps: If the adapter emits a `DateTime` timestamp, it is stored in `Event.timestamp` (instead of the default `DateTime.utc_now()`).
- Adapter metadata: If the adapter emits a `metadata` map, it is merged into `Event.metadata`.
- Provider identity: `Event.metadata[:provider]` is always set to the adapter's provider name (e.g. `"claude"`, `"codex"`, `"amp"`).

```elixir
{:ok, events} = SessionStore.get_events(store, session.id, run_id: run.id)
event = Enum.find(events, &(&1.type == :run_started))

event.metadata[:provider]           # => "claude"
event.metadata[:provider_event_id]  # => adapter-specific ID (if provided)
event.timestamp                     # => adapter-provided timestamp
```