Event normalization pipeline that transforms provider-specific events into NormalizedEvent structs.
This module provides adapter helpers to map provider-specific events into the canonical NormalizedEvent format, ensuring consistent handling across different AI providers.
Responsibilities
- Transform raw provider events into NormalizedEvent structs
- Assign sequence numbers for ordering
- Map provider-specific event types to canonical types
- Provide sorting and filtering utilities
Event Type Mapping
The normalizer maps common provider event patterns to canonical types:
- User messages -> :message_sent
- Assistant messages -> :message_received
- Streaming chunks -> :message_streamed
- Tool invocations -> :tool_call_started/:tool_call_completed/:tool_call_failed
- Run lifecycle -> :run_started/:run_completed/:run_failed
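The mapping above can be pictured as a set of pattern-matching clauses. The following is only a minimal sketch: the module name TypeMappingSketch and the "role", "kind", and "status" keys are hypothetical provider fields chosen for illustration, not the shape any real provider or this library uses.

```elixir
defmodule TypeMappingSketch do
  # Hypothetical sketch, not the library's implementation.
  # "role", "kind", and "status" are assumed provider fields.

  # More specific clauses come first: a streaming chunk is also an
  # assistant message, so it must be matched before the general case.
  def canonical_type(%{"role" => "user"}), do: :message_sent
  def canonical_type(%{"role" => "assistant", "kind" => "chunk"}), do: :message_streamed
  def canonical_type(%{"role" => "assistant"}), do: :message_received

  def canonical_type(%{"kind" => "tool", "status" => "started"}), do: :tool_call_started
  def canonical_type(%{"kind" => "tool", "status" => "completed"}), do: :tool_call_completed
  def canonical_type(%{"kind" => "tool", "status" => "failed"}), do: :tool_call_failed

  def canonical_type(%{"kind" => "run", "status" => "started"}), do: :run_started
  def canonical_type(%{"kind" => "run", "status" => "completed"}), do: :run_completed
  def canonical_type(%{"kind" => "run", "status" => "failed"}), do: :run_failed

  # Anything unrecognized falls back to :error_occurred, mirroring the
  # documented fallback for unknown types.
  def canonical_type(_other), do: :error_occurred
end
```

Map patterns match on a subset of keys, so extra provider fields (content, timestamps, and so on) do not prevent a clause from matching.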
Usage
# Normalize a single event
{:ok, normalized} = EventNormalizer.normalize(raw_event, context)
# Normalize a batch with automatic sequencing
{:ok, events} = EventNormalizer.normalize_batch(raw_events, context)
# Sort events deterministically
sorted = EventNormalizer.sort_events(events)
Summary
Functions
- filter_by_run/2: Filters events by run_id.
- filter_by_session/2: Filters events by session_id.
- filter_by_type/2: Filters events by type or list of types.
- normalize/2: Normalizes a raw event map into a NormalizedEvent.
- normalize_batch/2: Normalizes a batch of raw events with automatic sequence numbering.
- resolve_type/1: Resolves a raw event type (string or atom) to a canonical event atom type. Canonical event names are the values in Event.event_types/0.
- sort_events/1: Sorts events in deterministic order.
Functions
@spec filter_by_run([AgentSessionManager.Core.NormalizedEvent.t()], String.t()) :: [ AgentSessionManager.Core.NormalizedEvent.t() ]
Filters events by run_id.
@spec filter_by_session([AgentSessionManager.Core.NormalizedEvent.t()], String.t()) :: [ AgentSessionManager.Core.NormalizedEvent.t() ]
Filters events by session_id.
@spec filter_by_type( [AgentSessionManager.Core.NormalizedEvent.t()], atom() | [atom()] ) :: [ AgentSessionManager.Core.NormalizedEvent.t() ]
Filters events by type or list of types.
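Because the spec accepts either a single atom or a list of atoms, one plausible way to handle both is to wrap the argument in a list before filtering. This is a sketch under that assumption; FilterSketch is a hypothetical module, and plain maps stand in for NormalizedEvent structs.

```elixir
defmodule FilterSketch do
  # Hypothetical sketch: plain maps with a :type key stand in for
  # NormalizedEvent structs.
  def filter_by_type(events, type_or_types) do
    # List.wrap/1 leaves a list unchanged and turns a single atom into
    # a one-element list, so both call styles share one code path.
    types = List.wrap(type_or_types)
    Enum.filter(events, fn event -> event.type in types end)
  end
end

events = [
  %{id: "e1", type: :run_started},
  %{id: "e2", type: :message_sent},
  %{id: "e3", type: :run_completed}
]

# Single type
FilterSketch.filter_by_type(events, :message_sent)

# Several types at once; input order is preserved
FilterSketch.filter_by_type(events, [:run_started, :run_completed])
```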
@spec normalize(map(), map()) :: {:ok, AgentSessionManager.Core.NormalizedEvent.t()} | {:error, AgentSessionManager.Core.Error.t()}
Normalizes a raw event map into a NormalizedEvent.
Parameters
- raw_event - The raw event data from a provider
- context - Context map with required :session_id, :run_id, and optional :provider
Examples
iex> EventNormalizer.normalize(%{"type" => "message_received"}, %{session_id: "s1", run_id: "r1"})
{:ok, %NormalizedEvent{type: :message_received, ...}}
@spec normalize_batch([map()], map()) :: {:ok, [AgentSessionManager.Core.NormalizedEvent.t()]} | {:error, %{errors: list(), successful: list()}}
Normalizes a batch of raw events with automatic sequence numbering.
Events are processed in input order and assigned consecutive sequence numbers starting from :sequence_offset (default 0).
Returns {:ok, events} if all succeed, or {:error, %{errors: [...], successful: [...]}} if any fail.
Options in context
- :sequence_offset - Starting sequence number (default: 0)
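The sequencing and error-collection behavior described above might look roughly like the sketch below. BatchSketch and its private normalize_one/2 are hypothetical stand-ins, and the sketch assumes that a failed event still consumes its sequence number; the source does not say whether the real implementation does.

```elixir
defmodule BatchSketch do
  # Hypothetical sketch, not the library's implementation.
  def normalize_batch(raw_events, context) do
    offset = Map.get(context, :sequence_offset, 0)

    {ok, errors} =
      raw_events
      # Pair each raw event with its sequence number: offset, offset + 1, ...
      |> Enum.with_index(offset)
      |> Enum.reduce({[], []}, fn {raw, seq}, {ok, errors} ->
        case normalize_one(raw, seq) do
          {:ok, event} -> {[event | ok], errors}
          {:error, reason} -> {ok, [reason | errors]}
        end
      end)

    # Both accumulators were built by prepending, so reverse them to
    # restore input order.
    case errors do
      [] -> {:ok, Enum.reverse(ok)}
      _ -> {:error, %{errors: Enum.reverse(errors), successful: Enum.reverse(ok)}}
    end
  end

  # Stand-in for the real per-event normalizer: accepts any map with a
  # string "type" and rejects everything else.
  defp normalize_one(%{"type" => type} = raw, seq) when is_binary(type) do
    {:ok, %{type: type, sequence_number: seq, data: raw}}
  end

  defp normalize_one(raw, _seq), do: {:error, {:missing_type, raw}}
end
```

Passing %{sequence_offset: 5} in the context would number a two-event batch 5 and 6, which is useful when appending to a run that already has events.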
Resolves a raw event type (string or atom) to a canonical event atom type.
Canonical event names are the values in Event.event_types/0.
Returns :error_occurred for unrecognized or unsupported types.
Examples
iex> EventNormalizer.resolve_type("run_started")
:run_started
iex> EventNormalizer.resolve_type(:run_started)
:run_started
iex> EventNormalizer.resolve_type("unknown_type")
:error_occurred
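One way to get the behavior shown in these examples is to compare input against a fixed list of known atoms. The sketch below assumes a small illustrative subset of canonical types; the real list comes from Event.event_types/0, and ResolveSketch is a hypothetical module.

```elixir
defmodule ResolveSketch do
  # Assumption: an illustrative subset only. The real canonical list is
  # whatever Event.event_types/0 returns.
  @canonical [:run_started, :run_completed, :run_failed, :message_sent, :message_received]

  def resolve_type(type) when is_atom(type) do
    if type in @canonical, do: type, else: :error_occurred
  end

  def resolve_type(type) when is_binary(type) do
    # Compare against the known names instead of calling
    # String.to_atom/1, so untrusted input never creates new atoms.
    Enum.find(@canonical, :error_occurred, fn known ->
      Atom.to_string(known) == type
    end)
  end

  # Any other term (integers, maps, ...) is treated as unrecognized.
  def resolve_type(_other), do: :error_occurred
end
```

Avoiding dynamic atom creation matters here because atoms are not garbage collected, and provider payloads are external input.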
@spec sort_events([AgentSessionManager.Core.NormalizedEvent.t()]) :: [ AgentSessionManager.Core.NormalizedEvent.t() ]
Sorts events in deterministic order.
Sorting priority:
1. sequence_number (ascending, nil values last)
2. timestamp (ascending)
3. id (lexicographic, as tiebreaker)
This ensures stable, reproducible ordering regardless of input order.
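The three-level priority can be expressed as a single composite sort key. The sketch below is an illustration, not the library's code: SortSketch is hypothetical, plain maps stand in for NormalizedEvent structs, and timestamp is assumed to be a non-nil DateTime.

```elixir
defmodule SortSketch do
  # Hypothetical sketch, not the library's implementation.
  def sort_events(events) do
    Enum.sort_by(events, &sort_key/1)
  end

  defp sort_key(event) do
    {
      seq_key(event.sequence_number),
      # Convert to an integer: DateTime structs must not be compared
      # with the default term ordering.
      DateTime.to_unix(event.timestamp, :microsecond),
      event.id
    }
  end

  # The leading 0/1 flag pushes nil sequence numbers after all real ones.
  defp seq_key(nil), do: {1, 0}
  defp seq_key(n), do: {0, n}
end
```

Because the key includes id as a final tiebreaker, two different input orderings of the same events produce the same output as long as ids are unique.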