AgentSessionManager.Core.EventStream (AgentSessionManager v0.8.0)

Manages incremental consumption of normalized event streams.

EventStream provides a cursor-based mechanism for consuming events incrementally, supporting both batch and streaming consumption patterns.

Features

  • Cursor-based navigation for resumable consumption
  • Buffer management with configurable size limits
  • Context validation (session_id, run_id matching)
  • Enumerable support for functional operations

Usage

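# Alias the module so the calls below can use the short name
alias AgentSessionManager.Core.EventStream

# Create a new stream
{:ok, stream} = EventStream.new(%{session_id: "ses_123", run_id: "run_456"})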

# Push events (NormalizedEvent structs whose session_id and run_id match the stream)
{:ok, stream} = EventStream.push(stream, event)
{:ok, stream} = EventStream.push_batch(stream, events)

# Consume events
events = EventStream.peek(stream, 5)      # Non-consuming read
{:ok, events, stream} = EventStream.take(stream, 5)  # Consuming read

# Get all events (sorted)
all_events = EventStream.get_events(stream)

# Close when done
{:ok, stream} = EventStream.close(stream)

Summary

Functions

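  • advance_cursor(stream, position) - Advances the cursor to the specified position.
  • close(stream) - Closes the stream, preventing further pushes.
  • closed?(event_stream) - Returns whether the stream is closed.
  • count(event_stream) - Returns the total number of events in the stream.
  • get_events(stream, opts \\ []) - Returns all events in the stream, sorted by sequence number.
  • new(context) - Creates a new event stream for the given context.
  • peek(stream, count) - Returns the next count events without advancing the cursor.
  • push(stream, event) - Pushes a single event to the stream.
  • push_batch(stream, events) - Pushes a batch of events to the stream atomically.
  • remaining(stream) - Returns the number of unread events (from cursor to end).
  • take(stream, count) - Returns the next count events and advances the cursor.
  • to_enumerable(stream) - Returns an enumerable view of the stream's remaining events.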

Types

status()

@type status() :: :open | :closed

t()

@type t() :: %AgentSessionManager.Core.EventStream{
  buffer_size: pos_integer(),
  cursor: non_neg_integer(),
  events: [AgentSessionManager.Core.NormalizedEvent.t()],
  next_sequence: non_neg_integer(),
  run_id: String.t(),
  session_id: String.t(),
  status: status()
}

Functions

advance_cursor(stream, position)

@spec advance_cursor(t(), non_neg_integer()) ::
  {:ok, t()} | {:error, AgentSessionManager.Core.Error.t()}

Advances the cursor to the specified position.

The cursor can only move forward, never backward.
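
The two return shapes in the spec above can be handled with a single case expression. This is a minimal sketch, assuming the package is available as a dependency. The CursorExample module and skip_to/2 are hypothetical names, and the idea that a backward move is reported through the {:error, error} branch is an assumption that the text implies but does not state.

```elixir
defmodule CursorExample do
  alias AgentSessionManager.Core.EventStream

  # Hypothetical helper: move the cursor to `position`, falling back to the
  # unchanged input stream if the library rejects the move.
  def skip_to(stream, position) do
    case EventStream.advance_cursor(stream, position) do
      {:ok, advanced} ->
        advanced

      {:error, error} ->
        # Assumption: a backward move (position < stream.cursor) lands here.
        IO.inspect(error, label: "advance_cursor rejected")
        stream
    end
  end
end
```

Because the stream is an immutable struct, the caller's original value is untouched either way; only the returned stream carries the new cursor.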

close(stream)

@spec close(t()) :: {:ok, t()}

Closes the stream, preventing further pushes.

The cursor is advanced to the end position.
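
A short sketch of the close lifecycle, assuming the package is available as a dependency and that a freshly created stream starts in the :open status (the type lists :open and :closed, but the default is an assumption here).

```elixir
alias AgentSessionManager.Core.EventStream

{:ok, stream} = EventStream.new(%{session_id: "ses_123", run_id: "run_456"})

# Assumption: new streams start open.
false = EventStream.closed?(stream)

# close/1 only has an {:ok, stream} return shape in its spec.
{:ok, closed} = EventStream.close(stream)
true = EventStream.closed?(closed)
```

Since closing moves the cursor to the end, remaining/1 should report 0 on the closed stream, so it is probably safest to read any unread events before calling close/1.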

closed?(event_stream)

@spec closed?(t()) :: boolean()

Returns whether the stream is closed.

count(event_stream)

@spec count(t()) :: non_neg_integer()

Returns the total number of events in the stream.

get_events(stream, opts \\ [])

@spec get_events(
  t(),
  keyword()
) :: [AgentSessionManager.Core.NormalizedEvent.t()]

Returns all events in the stream, sorted by sequence number.

Options

  • :from_cursor - If true, only return events from the cursor position onward
  • :limit - Maximum number of events to return
  • :type - Filter by event type (atom or list of atoms)
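
The options above might be combined as in the following sketch. It assumes the package is available as a dependency and that the three options can be passed together, which the source does not state explicitly. GetEventsExample and unread_of_type/3 are hypothetical names.

```elixir
defmodule GetEventsExample do
  alias AgentSessionManager.Core.EventStream

  # Hypothetical helper: at most `limit` unread events whose type is one of
  # `types` (an atom or a list of atoms, as the :type option accepts).
  def unread_of_type(stream, types, limit) do
    EventStream.get_events(stream, from_cursor: true, type: types, limit: limit)
  end
end
```

A call could look like GetEventsExample.unread_of_type(stream, [:some_type], 10), where :some_type is a placeholder for whichever event type atoms your events actually carry.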

new(context)

@spec new(map()) :: {:ok, t()} | {:error, AgentSessionManager.Core.Error.t()}

Creates a new event stream for the given context.

Required

  • :session_id - The session ID for this stream
  • :run_id - The run ID for this stream

Optional

  • :buffer_size - Maximum number of events to keep in memory (default: 1000)
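
As an illustration of the keys above, the sketch below passes an explicit :buffer_size and matches on both return shapes from the spec. It assumes the package is available as a dependency; the value 200 is arbitrary.

```elixir
alias AgentSessionManager.Core.EventStream

# Required keys plus the optional buffer size.
context = %{session_id: "ses_123", run_id: "run_456", buffer_size: 200}

case EventStream.new(context) do
  {:ok, stream} ->
    # buffer_size is a field of the stream struct (see t() above).
    IO.puts("stream opened with buffer_size=#{stream.buffer_size}")

  {:error, error} ->
    # The error is an AgentSessionManager.Core.Error struct per the spec.
    IO.inspect(error, label: "could not create stream")
end
```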

peek(stream, count)

Returns the next count events without advancing the cursor.

push(stream, event)

Pushes a single event to the stream.

The event must have matching session_id and run_id. If the event doesn't have a sequence_number, one will be assigned automatically.

push_batch(stream, events)

@spec push_batch(t(), [AgentSessionManager.Core.NormalizedEvent.t()]) ::
  {:ok, t()} | {:error, AgentSessionManager.Core.Error.t()}

Pushes a batch of events to the stream atomically.

All events must have matching session_id and run_id. If any event fails validation, the entire batch is rejected.
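
The all-or-nothing behaviour can be handled as in this sketch, which assumes the package is available as a dependency. BatchExample, push_all_or_nothing/2 and the :rejected tag are hypothetical names chosen for illustration.

```elixir
defmodule BatchExample do
  alias AgentSessionManager.Core.EventStream

  # Hypothetical helper: returns the updated stream on success, or the
  # untouched input stream together with the error when the batch is rejected.
  def push_all_or_nothing(stream, events) do
    case EventStream.push_batch(stream, events) do
      {:ok, updated} -> {:ok, updated}
      {:error, error} -> {:rejected, stream, error}
    end
  end
end
```

Returning the input stream in the rejected case makes the atomicity explicit to the caller: no event from a failed batch is present in it, so the batch can be corrected and retried as a whole.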

remaining(stream)

@spec remaining(t()) :: non_neg_integer()

Returns the number of unread events (from cursor to end).

take(stream, count)

Returns the next count events and advances the cursor.

Returns {:ok, events, updated_stream}.
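
One way to consume a stream in chunks is to loop on take/2 until remaining/1 reaches zero. This is a sketch under stated assumptions: the package is available as a dependency, take/2 returns fewer than count events when fewer remain, and it always returns the {:ok, events, stream} shape (no spec is given for it, so a different return value would raise a MatchError here). DrainExample and drain/3 are hypothetical names.

```elixir
defmodule DrainExample do
  alias AgentSessionManager.Core.EventStream

  # Hypothetical helper: consume unread events in chunks of `chunk_size`,
  # call `handler` on each chunk, and return the advanced stream.
  def drain(stream, chunk_size, handler) when chunk_size > 0 do
    if EventStream.remaining(stream) == 0 do
      stream
    else
      # take/2 advances the cursor, so each pass shrinks remaining/1.
      {:ok, events, stream} = EventStream.take(stream, chunk_size)
      handler.(events)
      drain(stream, chunk_size, handler)
    end
  end
end
```

For example, DrainExample.drain(stream, 50, &IO.inspect/1) would print the unread events in chunks of at most 50 and return a stream whose cursor sits at the end.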

to_enumerable(stream)

@spec to_enumerable(t()) :: Enumerable.t()

Returns an enumerable view of the stream's remaining events.

This allows using Enum functions on the stream's events.
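
For instance, the view can be piped straight into Enum. The sketch below assumes the package is available as a dependency and that each event exposes the sequence_number field mentioned under push/2. EnumExample and unread_sequence_numbers/1 are hypothetical names.

```elixir
defmodule EnumExample do
  alias AgentSessionManager.Core.EventStream

  # Hypothetical helper: sequence numbers of the events not yet consumed.
  def unread_sequence_numbers(stream) do
    stream
    |> EventStream.to_enumerable()
    |> Enum.map(& &1.sequence_number)
  end
end
```

Enumerating the view does not change the stream value you hold, since the struct is immutable; use take/2 or advance_cursor/2 when the cursor should actually move.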