AgentSessionManager.Ports.SessionStore behaviour (AgentSessionManager v0.8.0)


Port (interface) for session storage operations.

This behaviour defines the contract that all session store implementations must fulfill. It follows the ports and adapters pattern, allowing different storage backends (in-memory, PostgreSQL, Redis, etc.) to be swapped without changing the core business logic.

Design Principles

  • Append-only event log semantics: Events are immutable once stored
  • Idempotent writes: Saving the same entity multiple times is safe
  • Read-after-write consistency: Active run queries reflect latest writes
  • Concurrent access safety: All operations must be safe to call from multiple processes at once

Implementation Requirements

Implementations must:

  1. Handle concurrent access without race conditions
  2. Provide idempotent write operations (save_session, save_run)
  3. Maintain event append order
  4. Deduplicate events by ID
  5. Return proper error tuples for not-found cases

Usage

The SessionStore is typically accessed through a store instance, such as a GenServer pid, a registered process name, or a {module, context} tuple (see the store() type):

# Using the behaviour directly with a store instance
alias AgentSessionManager.Ports.SessionStore

{:ok, store} = InMemorySessionStore.start_link([])
# `session` is a previously built Session struct
:ok = SessionStore.save_session(store, session)
{:ok, session} = SessionStore.get_session(store, session.id)

Summary

Callbacks

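append_event(store, t)
Appends an event to the event log.

append_event_with_sequence(store, t)
Appends an event and atomically assigns a per-session sequence number.

append_events(store, list)
Appends a batch of events with atomic, per-session sequence assignment.

delete_session(store, session_id)
Deletes a session by its ID.

flush(store, execution_result)
Atomically persists an execution result (session, run, events).

get_active_run(store, session_id)
Gets the currently active (running) run for a session.

get_events(store, session_id, filter_opts)
Retrieves events for a session with optional filtering.

get_latest_sequence(store, session_id)
Gets the latest assigned event sequence number for a session.

get_run(store, run_id)
Retrieves a run by its ID.

get_session(store, session_id)
Retrieves a session by its ID.

list_runs(store, session_id, filter_opts)
Lists all runs for a given session.

list_sessions(store, filter_opts)
Lists sessions with optional filtering.

save_run(store, t)
Saves a run to the store.

save_session(store, t)
Saves a session to the store.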

Functions

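append_event(store, event)
Appends an event to the store.

append_event_with_sequence(store, event)
Appends an event and returns the persisted event with assigned sequence_number.

append_events(store, events)
Appends a batch of events and returns persisted events with assigned sequence numbers.

delete_session(store, session_id)
Deletes a session by ID.

flush(store, execution_result)
Atomically persists an execution result payload.

get_active_run(store, session_id)
Gets the active run for a session.

get_events(store, session_id, opts \\ [])
Gets events for a session with optional filtering.

get_latest_sequence(store, session_id)
Gets the highest assigned sequence number for the session, or 0 when empty.

get_run(store, run_id)
Retrieves a run by ID.

get_session(store, session_id)
Retrieves a session by ID.

list_runs(store, session_id, opts \\ [])
Lists runs for a session.

list_sessions(store, opts \\ [])
Lists sessions with optional filtering.

save_run(store, run)
Saves a run to the store.

save_session(store, session)
Saves a session to the store.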

Types

context()

@type context() :: term()

execution_result()

@type execution_result() :: %{
  session: AgentSessionManager.Core.Session.t(),
  run: AgentSessionManager.Core.Run.t(),
  events: [AgentSessionManager.Core.Event.t()],
  provider_metadata: map()
}

filter_opts()

@type filter_opts() :: keyword()

run_id()

@type run_id() :: String.t()

session_id()

@type session_id() :: String.t()

store()

@type store() :: {module(), context()} | GenServer.server() | pid() | atom()
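
The store() type admits two broad shapes: a {module, context} tuple and a process reference. As a rough illustration of how a caller could tell them apart, here is a minimal sketch. StoreDispatchSketch, its call/3 function, and the {fun, args} message format are all hypothetical and are not taken from the library's source.

```elixir
defmodule StoreDispatchSketch do
  # Hypothetical dispatcher, NOT the library's implementation.

  # Module-backed stores: call the module, passing the context first.
  def call({module, context}, fun, args) when is_atom(module) do
    apply(module, fun, [context | args])
  end

  # Process-backed stores (pid or registered name): send a synchronous
  # request. The {fun, args} message shape is an assumption.
  def call(server, fun, args) when is_pid(server) or is_atom(server) do
    GenServer.call(server, {fun, args})
  end
end
```

Note that tuple forms of GenServer.server() such as {:global, name} would also match the first clause in this sketch, so a real dispatcher would need to disambiguate them.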

Callbacks

append_event(store, t)

@callback append_event(store(), AgentSessionManager.Core.Event.t()) ::
  :ok | {:error, AgentSessionManager.Core.Error.t()}

Appends an event to the event log.

Events are immutable - once appended, they cannot be modified or deleted. This operation is idempotent - appending an event with the same ID multiple times will not create duplicates.

Events must be stored in append order and returned in that same order by get_events/3.

Parameters

  • store - The store instance
  • event - The event struct to append

Returns

  • :ok on success
  • {:error, Error.t()} on failure

Examples

{:ok, event} = Event.new(%{type: :session_created, session_id: session.id})
:ok = SessionStore.append_event(store, event)
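
The idempotency and ordering rules for append_event can be illustrated with a small, self-contained sketch. EventLogSketch and its plain-map events are hypothetical stand-ins; a real adapter would implement the behaviour's callbacks and operate on Event.t() structs.

```elixir
defmodule EventLogSketch do
  # Hypothetical in-memory log; events are plain maps with an :id key.
  use Agent

  def start_link(_opts \\ []) do
    Agent.start_link(fn -> %{ids: MapSet.new(), events: []} end)
  end

  # Appends the event unless an event with the same ID is already stored.
  def append(log, %{id: id} = event) do
    Agent.update(log, fn state ->
      if MapSet.member?(state.ids, id) do
        state
      else
        %{state | ids: MapSet.put(state.ids, id), events: [event | state.events]}
      end
    end)
  end

  # Returns events in append order (oldest first).
  def all(log), do: Agent.get(log, fn state -> Enum.reverse(state.events) end)
end
```

Because the membership check and the write happen inside one Agent callback, two processes appending the same ID concurrently cannot both insert it.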

append_event_with_sequence(store, t)

@callback append_event_with_sequence(store(), AgentSessionManager.Core.Event.t()) ::
  {:ok, AgentSessionManager.Core.Event.t()}
  | {:error, AgentSessionManager.Core.Error.t()}

Appends an event and atomically assigns a per-session sequence number.

The returned event must include a non-nil sequence_number. Duplicate event IDs must be handled idempotently and return the originally persisted event.

Parameters

  • store - The store instance
  • event - The event struct to append

Returns

  • {:ok, Event.t()} on success
  • {:error, Error.t()} on failure
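
One way to satisfy both requirements (atomic assignment and returning the original event for duplicates) is to do the lookup and the increment in a single serialized step. The sketch below uses an Agent for that. SequencedLogSketch is hypothetical, events are plain maps, and starting the sequence at 1 is an assumption chosen to fit get_latest_sequence returning 0 for an empty session.

```elixir
defmodule SequencedLogSketch do
  # Hypothetical sketch. State: session_id => %{last: integer, by_id: map}.
  use Agent

  def start_link(_opts \\ []), do: Agent.start_link(fn -> %{} end)

  # The read of `last` and the write of the new event happen in one Agent
  # callback, so concurrent callers cannot be handed the same number.
  def append_with_sequence(log, %{id: id, session_id: sid} = event) do
    Agent.get_and_update(log, fn state ->
      session = Map.get(state, sid, %{last: 0, by_id: %{}})

      case Map.fetch(session.by_id, id) do
        {:ok, stored} ->
          # Duplicate ID: return the originally persisted event unchanged.
          {{:ok, stored}, state}

        :error ->
          seq = session.last + 1
          stored = Map.put(event, :sequence_number, seq)
          session = %{session | last: seq, by_id: Map.put(session.by_id, id, stored)}
          {{:ok, stored}, Map.put(state, sid, session)}
      end
    end)
  end
end
```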

append_events(store, list)

Appends a batch of events with atomic, per-session sequence assignment.

Implementations should preserve input order for returned events and apply idempotent semantics for duplicate event IDs.
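
A batch append that preserves input order and tolerates duplicate IDs can be expressed as a single pass with Enum.map_reduce/3. This is a minimal sketch over a hypothetical per-session record of the form %{last: integer, by_id: map}. BatchAppendSketch is not part of the library, and events are plain maps.

```elixir
defmodule BatchAppendSketch do
  # Hypothetical pure helper.
  # Returns {persisted_events_in_input_order, updated_session}.
  def append_batch(session, events) do
    Enum.map_reduce(events, session, fn %{id: id} = event, acc ->
      case Map.fetch(acc.by_id, id) do
        {:ok, stored} ->
          # Already persisted (earlier call or earlier in this batch).
          {stored, acc}

        :error ->
          seq = acc.last + 1
          stored = Map.put(event, :sequence_number, seq)
          {stored, %{acc | last: seq, by_id: Map.put(acc.by_id, id, stored)}}
      end
    end)
  end
end
```

To make the batch atomic, a store would run this function inside one serialized step (for example a single GenServer callback or a database transaction) and commit the updated record only once the whole batch has been processed.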

delete_session(store, session_id)

@callback delete_session(store(), session_id()) :: :ok

Deletes a session by its ID.

This operation is idempotent - deleting a non-existent session returns :ok.

Parameters

  • store - The store instance
  • session_id - The session's unique identifier

Returns

  • :ok on success (including when session doesn't exist)

Examples

:ok = SessionStore.delete_session(store, "ses_abc123")

flush(store, execution_result)

@callback flush(store(), execution_result()) ::
  :ok | {:error, AgentSessionManager.Core.Error.t()}

Atomically persists an execution result (session, run, events).

On failure, no partial writes should be visible.
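
The "no partial writes" rule can be met by computing the complete new state first and swapping it in only if every step succeeds. The sketch below shows that shape for an in-memory state. FlushSketch, the state layout, and the two validation checks are illustrative assumptions, not the library's rules.

```elixir
defmodule FlushSketch do
  # Hypothetical state shape: %{sessions: map, runs: map, events: list}.
  # Returns {:ok, new_state} or {:error, reason}; the input state is never
  # mutated, so a failure leaves nothing half-written.
  def flush(state, %{session: session, run: run, events: events}) do
    with :ok <- check(run.session_id == session.id, :run_session_mismatch),
         :ok <- check(Enum.all?(events, &(&1.session_id == session.id)), :event_session_mismatch) do
      {:ok,
       %{
         state
         | sessions: Map.put(state.sessions, session.id, session),
           runs: Map.put(state.runs, run.id, run),
           events: state.events ++ events
       }}
    end
  end

  defp check(true, _reason), do: :ok
  defp check(false, reason), do: {:error, reason}
end
```

A process-backed store would call this inside one callback and keep the old state on {:error, reason}; a database-backed store would typically reach the same guarantee with a transaction.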

get_active_run(store, session_id)

@callback get_active_run(store(), session_id()) ::
  {:ok, AgentSessionManager.Core.Run.t() | nil}

Gets the currently active (running) run for a session.

A run is considered active if its status is :running.

This operation must provide read-after-write consistency - if a run was just saved with status :running, this function must return it immediately.

Parameters

  • store - The store instance
  • session_id - The session's unique identifier

Returns

  • {:ok, Run.t()} if there's an active run
  • {:ok, nil} if there's no active run

Examples

{:ok, active_run} = SessionStore.get_active_run(store, session.id)
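
Since {:ok, nil} is a normal result rather than an error, callers need to branch on the value inside the tuple. The following minimal sketch shows both sides, using a hypothetical ActiveRunSketch module and plain maps in place of Run.t() structs.

```elixir
defmodule ActiveRunSketch do
  # Hypothetical lookup over a list of run maps.
  # Enum.find/2 returns nil when nothing matches, which maps onto {:ok, nil}.
  def active_run(runs, session_id) do
    {:ok, Enum.find(runs, &(&1.session_id == session_id and &1.status == :running))}
  end

  # Caller-side handling of the two success shapes.
  def describe(runs, session_id) do
    case active_run(runs, session_id) do
      {:ok, nil} -> :idle
      {:ok, run} -> {:busy, run.id}
    end
  end
end
```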

get_events(store, session_id, filter_opts)

@callback get_events(store(), session_id(), filter_opts()) ::
  {:ok, [AgentSessionManager.Core.Event.t()]}

Retrieves events for a session with optional filtering.

Events are returned in append order (oldest first).

Parameters

  • store - The store instance
  • session_id - The session's unique identifier
  • opts - Optional filter options:
    • :run_id - Filter by run ID
    • :type - Filter by event type
    • :since - Events after this timestamp
    • :after - Events with sequence number strictly greater than this value
    • :before - Events with sequence number strictly less than this value
    • :limit - Maximum number of results
    • :wait_timeout_ms - When set to a positive integer and the query would return an empty list, the store may block until matching events are appended or the timeout elapses. Stores that do not support this option ignore it.

Returns

  • {:ok, [Event.t()]} - List of events in append order

Examples

{:ok, all_events} = SessionStore.get_events(store, session.id)
{:ok, run_events} = SessionStore.get_events(store, session.id, run_id: run.id)
{:ok, message_events} = SessionStore.get_events(store, session.id, type: :message_received)

# Long-poll: wait up to 5 seconds for new events
{:ok, events} = SessionStore.get_events(store, session.id,
  after: cursor, wait_timeout_ms: 5_000)
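
To make the filter semantics concrete, here is a minimal sketch of how an in-memory store might apply them to a list that is already in append order. EventFilterSketch is hypothetical and events are plain maps. The :since and :wait_timeout_ms options are left out for brevity, and applying :limit after the other filters is an assumption.

```elixir
defmodule EventFilterSketch do
  # Hypothetical pure filter. Events carry :run_id, :type, and
  # :sequence_number keys and arrive in append order.
  def filter(events, opts \\ []) do
    events
    |> Enum.filter(&matches?(&1, opts))
    |> limit(opts[:limit])
  end

  defp matches?(event, opts) do
    Enum.all?(opts, fn
      {:run_id, run_id} -> event.run_id == run_id
      {:type, type} -> event.type == type
      # :after and :before are strict bounds on the sequence number.
      {:after, seq} -> event.sequence_number > seq
      {:before, seq} -> event.sequence_number < seq
      # :limit and unrecognized options do not affect matching.
      _other -> true
    end)
  end

  defp limit(events, nil), do: events
  defp limit(events, n), do: Enum.take(events, n)
end
```

Enum.filter/2 preserves the order of its input, so the result stays oldest first without an extra sort.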

get_latest_sequence(store, session_id)

@callback get_latest_sequence(store(), session_id()) ::
  {:ok, non_neg_integer()} | {:error, AgentSessionManager.Core.Error.t()}

Gets the latest assigned event sequence number for a session.

Returns 0 when the session has no persisted events.

get_run(store, run_id)

@callback get_run(store(), run_id()) ::
  {:ok, AgentSessionManager.Core.Run.t()}
  | {:error, AgentSessionManager.Core.Error.t()}

Retrieves a run by its ID.

Parameters

  • store - The store instance
  • run_id - The run's unique identifier

Returns

  • {:ok, Run.t()} if found
  • {:error, %Error{code: :run_not_found}} if not found

Examples

{:ok, run} = SessionStore.get_run(store, "run_abc123")

get_session(store, session_id)

@callback get_session(store(), session_id()) ::
  {:ok, AgentSessionManager.Core.Session.t()}
  | {:error, AgentSessionManager.Core.Error.t()}

Retrieves a session by its ID.

Parameters

  • store - The store instance
  • session_id - The session's unique identifier

Returns

  • {:ok, Session.t()} if found
  • {:error, %Error{code: :session_not_found}} if not found

Examples

{:ok, session} = SessionStore.get_session(store, "ses_abc123")
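
Matching directly on {:ok, session} raises a MatchError when the session is missing, so code that expects misses usually branches on the error code instead. The sketch below is self-contained: NotFoundSketch and its nested Error struct are stand-ins for the real modules, and only the :code field is assumed.

```elixir
defmodule NotFoundSketch do
  defmodule Error do
    # Stand-in for AgentSessionManager.Core.Error; only :code is assumed.
    defstruct [:code]
  end

  # Hypothetical lookup over a map of id => session.
  def get_session(sessions, id) do
    case Map.fetch(sessions, id) do
      {:ok, session} -> {:ok, session}
      :error -> {:error, %Error{code: :session_not_found}}
    end
  end

  # Caller side: fall back to a default instead of crashing on a miss.
  def fetch_or_default(sessions, id, default) do
    case get_session(sessions, id) do
      {:ok, session} -> session
      {:error, %Error{code: :session_not_found}} -> default
    end
  end
end
```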

list_runs(store, session_id, filter_opts)

@callback list_runs(store(), session_id(), filter_opts()) ::
  {:ok, [AgentSessionManager.Core.Run.t()]}

Lists all runs for a given session.

Parameters

  • store - The store instance
  • session_id - The session's unique identifier
  • opts - Optional filter options:
    • :status - Filter by run status
    • :limit - Maximum number of results

Returns

  • {:ok, [Run.t()]} - List of runs for the session

Examples

{:ok, runs} = SessionStore.list_runs(store, session.id)
{:ok, completed_runs} = SessionStore.list_runs(store, session.id, status: :completed)

list_sessions(store, filter_opts)

@callback list_sessions(store(), filter_opts()) ::
  {:ok, [AgentSessionManager.Core.Session.t()]}

Lists sessions with optional filtering.

Parameters

  • store - The store instance
  • opts - Optional filter options:
    • :status - Filter by session status (e.g., :active, :pending)
    • :agent_id - Filter by agent ID
    • :limit - Maximum number of results
    • :offset - Number of results to skip

Returns

  • {:ok, [Session.t()]} - List of matching sessions

Examples

{:ok, all_sessions} = SessionStore.list_sessions(store)
{:ok, active_sessions} = SessionStore.list_sessions(store, status: :active)
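
The :limit and :offset options together allow simple pagination. As a minimal sketch of the intended semantics, the hypothetical SessionListSketch below filters first, then skips :offset results, then takes :limit. That ordering is an assumption, as is the use of plain maps for sessions.

```elixir
defmodule SessionListSketch do
  # Hypothetical pure implementation over a list of session maps.
  def list(sessions, opts \\ []) do
    sessions
    |> filter_by(:status, opts[:status])
    |> filter_by(:agent_id, opts[:agent_id])
    |> Enum.drop(opts[:offset] || 0)
    |> take(opts[:limit])
  end

  # A nil option means "do not filter on this key".
  defp filter_by(sessions, _key, nil), do: sessions
  defp filter_by(sessions, key, value), do: Enum.filter(sessions, &(Map.get(&1, key) == value))

  defp take(sessions, nil), do: sessions
  defp take(sessions, limit), do: Enum.take(sessions, limit)
end
```

Offset-based paging only gives stable pages if the store returns sessions in a consistent order between calls.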

save_run(store, t)

@callback save_run(store(), AgentSessionManager.Core.Run.t()) ::
  :ok | {:error, AgentSessionManager.Core.Error.t()}

Saves a run to the store.

This operation is idempotent - saving the same run multiple times should not create duplicates. If a run with the same ID exists, it will be updated.

Parameters

  • store - The store instance
  • run - The run struct to save

Returns

  • :ok on success
  • {:error, Error.t()} on failure

Examples

{:ok, run} = Run.new(%{session_id: session.id})
:ok = SessionStore.save_run(store, run)

save_session(store, t)

@callback save_session(store(), AgentSessionManager.Core.Session.t()) ::
  :ok | {:error, AgentSessionManager.Core.Error.t()}

Saves a session to the store.

This operation is idempotent - saving the same session multiple times should not create duplicates. If a session with the same ID exists, it will be updated.

Parameters

  • store - The store instance
  • session - The session struct to save

Returns

  • :ok on success
  • {:error, Error.t()} on failure

Examples

{:ok, session} = Session.new(%{agent_id: "agent-1"})
:ok = SessionStore.save_session(store, session)
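
The "save is an upsert" rule falls out naturally when entities are keyed by ID. A minimal sketch, using a hypothetical UpsertSketch module and plain maps rather than Session.t() structs:

```elixir
defmodule UpsertSketch do
  # Hypothetical store state: a map keyed by entity ID.
  # Map.put/3 inserts a new key or overwrites an existing one, so saving
  # the same entity twice never creates a duplicate.
  def save(entities, %{id: id} = entity), do: Map.put(entities, id, entity)
end
```

The same shape would apply to save_run, with runs keyed by run ID.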

Functions

append_event(store, event)

@spec append_event(store(), AgentSessionManager.Core.Event.t()) ::
  :ok | {:error, AgentSessionManager.Core.Error.t()}

Appends an event to the store.

append_event_with_sequence(store, event)

@spec append_event_with_sequence(store(), AgentSessionManager.Core.Event.t()) ::
  {:ok, AgentSessionManager.Core.Event.t()}
  | {:error, AgentSessionManager.Core.Error.t()}

Appends an event and returns the persisted event with assigned sequence_number.

append_events(store, events)

Appends a batch of events and returns persisted events with assigned sequence numbers.

delete_session(store, session_id)

@spec delete_session(store(), session_id()) :: :ok

Deletes a session by ID.

flush(store, execution_result)

@spec flush(store(), execution_result()) ::
  :ok | {:error, AgentSessionManager.Core.Error.t()}

Atomically persists an execution result payload.

get_active_run(store, session_id)

@spec get_active_run(store(), session_id()) ::
  {:ok, AgentSessionManager.Core.Run.t() | nil}

Gets the active run for a session.

get_events(store, session_id, opts \\ [])

@spec get_events(store(), session_id(), filter_opts()) ::
  {:ok, [AgentSessionManager.Core.Event.t()]}

Gets events for a session with optional filtering.

When wait_timeout_ms is provided and the store supports it, the call may block up to that duration waiting for matching events.

get_latest_sequence(store, session_id)

@spec get_latest_sequence(store(), session_id()) ::
  {:ok, non_neg_integer()} | {:error, AgentSessionManager.Core.Error.t()}

Gets the highest assigned sequence number for the session, or 0 when empty.

get_run(store, run_id)

@spec get_run(store(), run_id()) ::
  {:ok, AgentSessionManager.Core.Run.t()}
  | {:error, AgentSessionManager.Core.Error.t()}

Retrieves a run by ID.

get_session(store, session_id)

@spec get_session(store(), session_id()) ::
  {:ok, AgentSessionManager.Core.Session.t()}
  | {:error, AgentSessionManager.Core.Error.t()}

Retrieves a session by ID.

list_runs(store, session_id, opts \\ [])

@spec list_runs(store(), session_id(), filter_opts()) ::
  {:ok, [AgentSessionManager.Core.Run.t()]}

Lists runs for a session.

list_sessions(store, opts \\ [])

@spec list_sessions(store(), filter_opts()) ::
  {:ok, [AgentSessionManager.Core.Session.t()]}

Lists sessions with optional filtering.

save_run(store, run)

@spec save_run(store(), AgentSessionManager.Core.Run.t()) ::
  :ok | {:error, AgentSessionManager.Core.Error.t()}

Saves a run to the store.

save_session(store, session)

@spec save_session(store(), AgentSessionManager.Core.Session.t()) ::
  :ok | {:error, AgentSessionManager.Core.Error.t()}

Saves a session to the store.