AgentSessionManager.Persistence.EventPipeline (AgentSessionManager v0.8.0)

Copy Markdown View Source

Validates, enriches, and persists events from provider adapters.

Sits between the adapter event callback and the SessionStore, ensuring all persisted events have consistent structure, provider identity, and validated data shapes.

Processing Steps

  1. Build — Normalize event type and construct an Event struct
  2. Enrich — Set provider and correlation_id from pipeline context
  3. Validate — Structural validation (strict) and shape validation (warnings)
  4. Persist — Append event with atomic sequence assignment

Usage

context = %{
  session_id: "ses_123",
  run_id: "run_456",
  provider: "claude"
}

{:ok, event} = EventPipeline.process(store, raw_event_data, context)

Summary

Functions

Process a raw event from a provider adapter.

Process a batch of events.

Types

context()

@type context() :: %{
  :session_id => String.t(),
  :run_id => String.t(),
  :provider => String.t(),
  optional(:correlation_id) => String.t()
}

Functions

process(store, raw_event_data, context)

Process a raw event from a provider adapter.

Normalizes the event type, builds an Event struct with enriched fields, validates the data shape, and persists with atomic sequence assignment.

Returns the persisted event with sequence_number populated.

process_batch(store, raw_events, context)

Process a batch of events.

All events are validated first. If any fail structural validation, the entire batch is rejected. Shape warnings are attached but do not reject.