AgentSessionManager.Core.EventNormalizer (AgentSessionManager v0.8.0)


Event normalization pipeline for transforming provider events into normalized events.

This module provides adapter helpers to map provider-specific events into the canonical NormalizedEvent format, ensuring consistent handling across different AI providers.

Responsibilities

  • Transform raw provider events into NormalizedEvent structs
  • Assign sequence numbers for ordering
  • Map provider-specific event types to canonical types
  • Provide sorting and filtering utilities

Event Type Mapping

The normalizer maps common provider event patterns to canonical types:

  • User messages -> :message_sent
  • Assistant messages -> :message_received
  • Streaming chunks -> :message_streamed
  • Tool invocations -> :tool_call_started/:tool_call_completed/:tool_call_failed
  • Run lifecycle -> :run_started/:run_completed/:run_failed
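As an illustration of the mapping above, the following minimal sketch pattern-matches on raw payloads. The payload keys ("role", "delta", "tool", "status"), the module name MappingSketch, and the function canonical_type/1 are all hypothetical and are not part of AgentSessionManager; real providers use their own shapes. The fallback mirrors the :error_occurred value documented for resolve_type/1.

```elixir
defmodule MappingSketch do
  # Hypothetical payload shapes, used only to illustrate the mapping table.
  # Clause order matters: the streaming clause must precede the plain
  # assistant clause, because both match on "role" => "assistant".
  def canonical_type(%{"role" => "user"}), do: :message_sent
  def canonical_type(%{"role" => "assistant", "delta" => _}), do: :message_streamed
  def canonical_type(%{"role" => "assistant"}), do: :message_received
  def canonical_type(%{"tool" => _, "status" => "started"}), do: :tool_call_started
  def canonical_type(%{"tool" => _, "status" => "completed"}), do: :tool_call_completed
  def canonical_type(%{"tool" => _, "status" => "failed"}), do: :tool_call_failed

  # Anything unrecognized falls back to :error_occurred.
  def canonical_type(_other), do: :error_occurred
end
```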

Usage

# Normalize a single event
{:ok, normalized} = EventNormalizer.normalize(raw_event, context)

# Normalize a batch with automatic sequencing
{:ok, events} = EventNormalizer.normalize_batch(raw_events, context)

# Sort events deterministically
sorted = EventNormalizer.sort_events(events)

Summary

Functions

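  • filter_by_run(events, run_id) - Filters events by run_id.
  • filter_by_session(events, session_id) - Filters events by session_id.
  • filter_by_type(events, type) - Filters events by type or list of types.
  • normalize(raw_event, context) - Normalizes a raw event map into a NormalizedEvent.
  • normalize_batch(raw_events, context) - Normalizes a batch of raw events with automatic sequence numbering.
  • resolve_type(type) - Resolves a raw event type (string or atom) to a canonical event type atom. Canonical event names are the values in Event.event_types/0.
  • sort_events(events) - Sorts events in deterministic order.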

Functions

filter_by_run(events, run_id)

Filters events by run_id.

filter_by_session(events, session_id)

Filters events by session_id.

filter_by_type(events, type)

Filters events by type or list of types.
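The three filters can be approximated as below. This is a minimal sketch, not the library's implementation: FilterSketch is a hypothetical module, and events are assumed to be maps or structs exposing :type, :run_id, and :session_id fields.

```elixir
defmodule FilterSketch do
  # Accepts a single type atom or a list of type atoms.
  # List.wrap/1 turns a lone atom into a one-element list.
  def filter_by_type(events, type_or_types) do
    types = List.wrap(type_or_types)
    Enum.filter(events, fn event -> event.type in types end)
  end

  # Keeps only the events belonging to the given run.
  def filter_by_run(events, run_id) do
    Enum.filter(events, fn event -> event.run_id == run_id end)
  end

  # Keeps only the events belonging to the given session.
  def filter_by_session(events, session_id) do
    Enum.filter(events, fn event -> event.session_id == session_id end)
  end
end
```

Because Enum.filter/2 preserves input order, filtering a sorted list yields a sorted list.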

normalize(raw_event, context)

Normalizes a raw event map into a NormalizedEvent.

Parameters

  • raw_event - The raw event data from a provider
  • context - Context map. :session_id and :run_id are required; :provider is optional

Examples

iex> EventNormalizer.normalize(%{"type" => "message_received"}, %{session_id: "s1", run_id: "r1"})
{:ok, %NormalizedEvent{type: :message_received, ...}}

normalize_batch(raw_events, context)

@spec normalize_batch([map()], map()) ::
  {:ok, [AgentSessionManager.Core.NormalizedEvent.t()]}
  | {:error, %{errors: list(), successful: list()}}

Normalizes a batch of raw events with automatic sequence numbering.

Events are processed in order and assigned sequential sequence numbers starting from the offset (default 0).

Returns {:ok, events} if all succeed, or {:error, %{errors: [...], successful: [...]}} if any fail.

Options in context

  • :sequence_offset - Starting sequence number (default: 0)
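The sequencing and error-collection behaviour described above can be sketched as follows. BatchSketch and its normalize_fun argument are hypothetical stand-ins for the module and its per-event normalizer. The sketch also assumes that a failed event still consumes its sequence number, which the documentation does not specify.

```elixir
defmodule BatchSketch do
  # normalize_fun is a hypothetical stand-in for the per-event normalizer;
  # it must return {:ok, event_map} or {:error, reason}.
  def normalize_batch(raw_events, context, normalize_fun) do
    offset = Map.get(context, :sequence_offset, 0)

    {successful, errors} =
      raw_events
      # Pair each raw event with its sequence number, counting from offset.
      |> Enum.with_index(offset)
      |> Enum.reduce({[], []}, fn {raw, seq}, {oks, errs} ->
        case normalize_fun.(raw, context) do
          {:ok, event} -> {[Map.put(event, :sequence_number, seq) | oks], errs}
          {:error, reason} -> {oks, [reason | errs]}
        end
      end)

    # Accumulators were built by prepending, so reverse to restore input order.
    case errors do
      [] -> {:ok, Enum.reverse(successful)}
      _ -> {:error, %{errors: Enum.reverse(errors), successful: Enum.reverse(successful)}}
    end
  end
end
```

Returning the successful events alongside the errors lets a caller decide whether to persist the partial result or retry the whole batch.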

resolve_type(type)

@spec resolve_type(atom() | String.t() | term()) :: atom()

Resolves a raw event type (string or atom) to a canonical event type atom. Canonical event names are the values in Event.event_types/0.

Returns :error_occurred for unrecognized or unsupported types.

Examples

iex> EventNormalizer.resolve_type("run_started")
:run_started

iex> EventNormalizer.resolve_type(:run_started)
:run_started

iex> EventNormalizer.resolve_type("unknown_type")
:error_occurred
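One way to get this behaviour is sketched below. ResolveSketch is a hypothetical module, and its @canonical list is an illustrative subset built from the types named on this page; the authoritative list is whatever Event.event_types/0 returns. Treating :error_occurred as a canonical type is also an assumption.

```elixir
defmodule ResolveSketch do
  # Illustrative subset only; the authoritative list is Event.event_types/0.
  @canonical [
    :message_sent, :message_received, :message_streamed,
    :tool_call_started, :tool_call_completed, :tool_call_failed,
    :run_started, :run_completed, :run_failed, :error_occurred
  ]

  # Atoms pass through only if they are already canonical.
  def resolve_type(type) when is_atom(type) do
    if type in @canonical, do: type, else: :error_occurred
  end

  # Strings are compared against the canonical names rather than converted
  # with String.to_atom/1, so untrusted input never creates new atoms.
  def resolve_type(type) when is_binary(type) do
    Enum.find(@canonical, :error_occurred, fn canonical ->
      Atom.to_string(canonical) == type
    end)
  end

  # Any other term (numbers, maps, ...) is unsupported.
  def resolve_type(_other), do: :error_occurred
end
```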

sort_events(events)

Sorts events in deterministic order.

Sorting priority:

  1. sequence_number (ascending, nil values last)
  2. timestamp (ascending)
  3. id (lexicographic, as tiebreaker)

This ensures stable, reproducible ordering regardless of input order.
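The three-level ordering can be expressed as a composite sort key, as in this minimal sketch. SortSketch is a hypothetical module. Events are assumed to be maps or structs with :sequence_number, :timestamp, and :id fields, and timestamps are assumed to be non-nil DateTime values.

```elixir
defmodule SortSketch do
  def sort_events(events) do
    Enum.sort_by(events, &sort_key/1)
  end

  # Builds a tuple that Erlang term ordering compares element by element.
  defp sort_key(%{sequence_number: seq, timestamp: ts, id: id}) do
    # {0, n} sorts before {1, 0}, so events without a sequence number go last.
    seq_key = if is_nil(seq), do: {1, 0}, else: {0, seq}

    # DateTime structs do not compare chronologically with </>, so the
    # timestamp is converted to an integer before it enters the key.
    {seq_key, DateTime.to_unix(ts, :microsecond), id}
  end
end
```

Since the key includes the id as a final tiebreaker, any two events with distinct ids have distinct keys, which makes the result independent of input order.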