Events and Streaming

Copy Markdown View Source

Events are the foundation of observability in AgentSessionManager. Every significant action -- session creation, message streaming, tool calls, errors -- produces an immutable event that is persisted and can be consumed in real time.

Event Types

Events are grouped into categories:

Session Lifecycle

TypeWhen
:session_createdA new session was created
:session_startedSession transitioned to active
:session_pausedSession was paused
:session_resumedSession resumed from pause
:session_completedSession completed successfully
:session_failedSession failed with error
:session_cancelledSession was cancelled

Run Lifecycle

TypeWhen
:run_startedA run began execution
:run_completedA run completed successfully
:run_failedA run failed with error
:run_cancelledA run was cancelled
:run_timeoutA run timed out

Messages

TypeWhen
:message_sentA message was sent to the agent
:message_receivedA complete message was received from the agent
:message_streamedA streaming chunk was received

Tool Calls

TypeWhen
:tool_call_startedA tool call began
:tool_call_completedA tool call completed
:tool_call_failedA tool call failed

Errors and Usage

TypeWhen
:error_occurredAn error happened during execution
:error_recoveredRecovery from an error
:token_usage_updatedToken counts were updated
:turn_completedA conversation turn completed

Workspace

TypeWhen
:workspace_snapshot_takenA workspace snapshot was captured
:workspace_diff_computedA workspace diff summary was computed

Error Payloads

Error events (:error_occurred, :run_failed, :session_failed) keep error_message for backward compatibility and may include a structured provider_error payload:

%{
  provider: :codex | :amp | :claude | :gemini | :unknown,
  kind: atom(),
  message: String.t(),
  exit_code: integer() | nil,
  stderr: String.t() | nil,
  truncated?: boolean() | nil
}

Notes:

  • provider_error.message is a short provider summary.
  • provider_error.stderr carries diagnostic text when available.
  • provider_error.stderr is truncated before emission/persistence using :error_text_max_bytes and :error_text_max_lines.
  • Provider-specific extra fields are carried in event.data.details.

Creating Events

alias AgentSessionManager.Core.Event

{:ok, event} = Event.new(%{
  type: :message_received,
  session_id: session.id,
  run_id: run.id,
  data: %{content: "Hello!", role: "assistant"},
  metadata: %{model: "claude-haiku-4-5-20251001"}
})

event.id         # => "evt_a1b2c3..."  (auto-generated)
event.timestamp  # => ~U[2025-01-27 12:00:00Z]

Events are immutable -- once created, they cannot be modified.

Persisting Events

Events are stored via the SessionStore port using append-only semantics:

alias AgentSessionManager.Ports.SessionStore

{:ok, stored_event} = SessionStore.append_event_with_sequence(store, event)
stored_event.sequence_number
# => 1

Appending is idempotent: storing the same event ID twice doesn't create a duplicate. Sequence assignment is also idempotent: duplicate event IDs return the originally stored sequence.

Querying Events

# All events for a session
{:ok, events} = SessionStore.get_events(store, session.id)

# Filter by run
{:ok, events} = SessionStore.get_events(store, session.id, run_id: run.id)

# Filter by type
{:ok, events} = SessionStore.get_events(store, session.id, type: :message_received)

# Filter by time
{:ok, events} = SessionStore.get_events(store, session.id, since: one_hour_ago)

# Cursor filters
{:ok, events} = SessionStore.get_events(store, session.id, after: 100, limit: 50)
{:ok, events} = SessionStore.get_events(store, session.id, before: 200)

# Combine filters
{:ok, events} = SessionStore.get_events(store, session.id,
  after: 100,
  before: 200,
  run_id: run.id,
  type: :tool_call_completed,
  limit: 10
)

Events are always returned in append order (oldest first).

get_latest_sequence/2 returns the latest cursor for a session:

{:ok, cursor} = SessionStore.get_latest_sequence(store, session.id)

Event Normalization

When working with raw provider events (e.g., from webhook payloads), the EventNormalizer transforms them into the canonical NormalizedEvent format:

alias AgentSessionManager.Core.EventNormalizer

raw_event = %{
  "type" => "assistant_message",
  "content" => "Hello!",
  "model" => "claude-haiku-4-5-20251001"
}

context = %{
  session_id: session.id,
  run_id: run.id,
  provider: :anthropic
}

{:ok, normalized} = EventNormalizer.normalize(raw_event, context)
normalized.type  # => :message_received  (mapped from "assistant_message")

The normalizer handles type mapping, sequence numbering, and metadata extraction. It maps common provider patterns to canonical types:

  • "assistant_message", "ai_message", "assistant" -> :message_received
  • "delta", "content_block_delta", "stream" -> :message_streamed
  • "tool_use", "tool_call", "function_call" -> :tool_call_started
  • "error", "exception" -> :error_occurred

SessionManager Ingest Normalization

SessionManager.execute_run/4 applies event-type normalization on adapter callback events before persistence. This means alias event types such as "run_start", "run_end", and "delta" are stored as canonical Event.type atoms (:run_started, :run_completed, :message_streamed).

Batch Normalization

{:ok, events} = EventNormalizer.normalize_batch(raw_events, context)

Events in a batch get sequential sequence_number values automatically.

Sorting and Filtering

sorted = EventNormalizer.sort_events(events)           # by sequence, then timestamp, then id
filtered = EventNormalizer.filter_by_type(events, :message_streamed)
filtered = EventNormalizer.filter_by_run(events, run.id)

EventStream: Cursor-Based Consumption

The EventStream module provides a cursor-based mechanism for consuming events incrementally -- useful for building streaming UIs or processing events in order.

alias AgentSessionManager.Core.EventStream

# Create a stream for a specific run
{:ok, stream} = EventStream.new(%{session_id: session.id, run_id: run.id})

# Push events as they arrive
{:ok, stream} = EventStream.push(stream, event1)
{:ok, stream} = EventStream.push(stream, event2)
{:ok, stream} = EventStream.push_batch(stream, [event3, event4])

# Peek at events without advancing the cursor
events = EventStream.peek(stream, 3)

# Take events and advance the cursor
{:ok, events, stream} = EventStream.take(stream, 2)

# Check counts
EventStream.count(stream)      # total events in buffer
EventStream.remaining(stream)  # unread events from cursor

# Get all events (sorted by sequence number)
all = EventStream.get_events(stream)
all = EventStream.get_events(stream, from_cursor: true, type: :message_streamed)

Buffer Management

EventStream has a configurable buffer size (default: 1000). When the buffer is full, older events are evicted to make room:

{:ok, stream} = EventStream.new(%{
  session_id: session.id,
  run_id: run.id,
  buffer_size: 500
})

Context Validation

Events pushed to a stream must match its session_id and run_id. Mismatched events are rejected:

{:error, %Error{code: :session_mismatch}} =
  EventStream.push(stream, event_from_different_session)

Closing a Stream

{:ok, stream} = EventStream.close(stream)
EventStream.closed?(stream)  # => true

# Closed streams reject new events
{:error, _} = EventStream.push(stream, event)

Real-Time Event Handling

When executing runs through an adapter, you provide an event callback to handle events as they arrive:

callback = fn event ->
  case event.type do
    :message_streamed ->
      # Stream content to the user in real time
      IO.write(event.data.delta)

    :tool_call_started ->
      Logger.info("Tool called: #{event.data.tool_name}")

    :token_usage_updated ->
      Logger.debug("Tokens: #{event.data.input_tokens} in, #{event.data.output_tokens} out")

    :run_completed ->
      IO.puts("\n[Complete]")

    :error_occurred ->
      Logger.error("Error: #{event.data.error_message}")

      if is_map(event.data[:provider_error]) do
        Logger.error("Provider stderr: #{event.data.provider_error.stderr || "n/a"}")
      end

    _ ->
      :ok
  end
end

{:ok, result} = adapter_module.execute(adapter, run, session, event_callback: callback)

The adapter normalizes provider-specific events before invoking your callback, so you always work with the canonical event types regardless of the provider.

Durable Follow/Poll Streaming

For resumable polling from persisted events, use SessionManager.stream_session_events/3:

alias AgentSessionManager.SessionManager

# Polling mode (default) — sleeps between empty reads
stream =
  SessionManager.stream_session_events(store, session.id,
    after: 0,
    limit: 100,
    poll_interval_ms: 250
  )

# Consume incrementally
Enum.take(stream, 10)

This stream is cursor-backed by SessionStore.get_events/3 and works across reconnects.

Long-Poll Mode (wait_timeout_ms)

To avoid busy polling, pass wait_timeout_ms. When the store supports it (e.g. InMemorySessionStore), the store blocks the read until matching events arrive or the timeout elapses:

stream =
  SessionManager.stream_session_events(store, session.id,
    after: cursor,
    limit: 100,
    wait_timeout_ms: 5_000
  )

Enum.take(stream, 10)

This is ideal for real-time streaming UIs (SSE, WebSocket, LiveView) where you need low latency without wasting CPU cycles on empty polls. Stores that do not support wait_timeout_ms ignore it and fall back to immediate return.

Adapter Event Metadata

When SessionManager persists adapter events, it preserves:

  • Adapter timestamps: If the adapter emits a DateTime timestamp, it is stored in Event.timestamp (instead of the default DateTime.utc_now()).
  • Adapter metadata: If the adapter emits a metadata map, it is merged into Event.metadata.
  • Provider identity: Event.metadata[:provider] is always set to the adapter's provider name (e.g. "claude", "codex", "amp").
{:ok, events} = SessionStore.get_events(store, session.id, run_id: run.id)
event = Enum.find(events, &(&1.type == :run_started))

event.metadata[:provider]       # => "claude"
event.metadata[:provider_event_id]  # => adapter-specific ID (if provided)
event.timestamp                 # => adapter-provided timestamp