AgentSessionManager.Persistence.EventPipeline
(AgentSessionManager v0.8.0)
Validates, enriches, and persists events from provider adapters.
Sits between the adapter event callback and the SessionStore, ensuring all persisted events have consistent structure, provider identity, and validated data shapes.
Processing Steps
- Build: Normalize event type and construct an `Event` struct
- Enrich: Set `provider` and `correlation_id` from pipeline context
- Validate: Structural validation (strict) and shape validation (warnings)
- Persist: Append event with atomic sequence assignment
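The four steps above can be sketched as a small `with` chain. This is only an illustrative stand-in, not the library's implementation: the module `PipelineSketch`, the plain-map event, the list-backed store, and the specific validation rules are all assumptions made for the example. A real `SessionStore` would assign sequence numbers atomically, which a plain list does not model.

```elixir
defmodule PipelineSketch do
  # Hypothetical stand-in for the real pipeline; every name here is illustrative.

  # Build: normalize the event type and construct a plain map in place of the Event struct.
  def build(raw) do
    type = raw |> Map.get(:type, "") |> to_string() |> String.downcase()
    %{type: type, data: Map.get(raw, :data, %{})}
  end

  # Enrich: copy provider and correlation_id from the pipeline context.
  def enrich(event, context) do
    event
    |> Map.put(:provider, context[:provider])
    |> Map.put(:correlation_id, context[:correlation_id])
  end

  # Validate: a structural problem is an error; a shape problem becomes a warning.
  def validate(%{type: ""}), do: {:error, :missing_type}

  def validate(event) do
    warnings = if is_map(event.data), do: [], else: [:data_not_a_map]
    {:ok, Map.put(event, :warnings, warnings)}
  end

  # Persist: append to a list and assign the next sequence number.
  # (Assumed in-memory store; not atomic across processes.)
  def persist(store, event) do
    event = Map.put(event, :sequence_number, length(store) + 1)
    {:ok, event, store ++ [event]}
  end

  # Run the steps in order; the first {:error, _} short-circuits the chain.
  def process(store, raw, context) do
    with {:ok, event} <- raw |> build() |> enrich(context) |> validate(),
         {:ok, persisted, store} <- persist(store, event) do
      {:ok, persisted, store}
    end
  end
end
```

Because strict validation runs before the persist step, an event that fails structurally never reaches the store in this sketch.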
Usage
context = %{
  session_id: "ses_123",
  run_id: "run_456",
  provider: "claude"
}

{:ok, event} = EventPipeline.process(store, raw_event_data, context)
Functions
@spec process(AgentSessionManager.Ports.SessionStore.store(), map(), context()) :: {:ok, AgentSessionManager.Core.Event.t()} | {:error, AgentSessionManager.Core.Error.t()}
Process a raw event from a provider adapter.
Normalizes the event type, builds an Event struct with enriched fields, validates the data shape, and persists with atomic sequence assignment.
Returns the persisted event with sequence_number populated.
@spec process_batch(AgentSessionManager.Ports.SessionStore.store(), [map()], context()) :: {:ok, [AgentSessionManager.Core.Event.t()]} | {:error, AgentSessionManager.Core.Error.t()}
Process a batch of events.
All events are validated before any are persisted. If any event fails structural validation, the entire batch is rejected. Shape warnings are attached but do not cause rejection.
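The all-or-nothing behaviour described above can be sketched with `Enum.reduce_while/3`. This is a minimal sketch under stated assumptions, not the library's code: the module `BatchSketch`, the rule that an event needs a non-empty `:type`, and the `:warnings` key are all hypothetical.

```elixir
defmodule BatchSketch do
  # Validate every event before anything is persisted; the first structural
  # failure halts the reduction and rejects the whole batch.
  def validate_all(events) do
    result =
      Enum.reduce_while(events, {:ok, []}, fn event, {:ok, acc} ->
        case validate(event) do
          {:ok, valid} -> {:cont, {:ok, [valid | acc]}}
          {:error, _} = error -> {:halt, error}
        end
      end)

    case result do
      # Events were accumulated in reverse, so restore the input order.
      {:ok, validated} -> {:ok, Enum.reverse(validated)}
      error -> error
    end
  end

  # Hypothetical structural check: an event needs a non-empty binary :type.
  defp validate(%{type: type} = event) when is_binary(type) and type != "" do
    # Shape problems are recorded as warnings rather than errors.
    warnings = if is_map(Map.get(event, :data)), do: [], else: [:data_not_a_map]
    {:ok, Map.put(event, :warnings, warnings)}
  end

  defp validate(event), do: {:error, {:invalid_event, event}}
end
```

A caller would persist only after `validate_all/1` returns `{:ok, events}`, so a single structurally invalid event leaves the store untouched.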