AgentSessionManager.Ports.SessionStore behaviour
(AgentSessionManager v0.8.0)
Port (interface) for session storage operations.
This behaviour defines the contract that all session store implementations must fulfill. It follows the ports and adapters pattern, allowing different storage backends (in-memory, PostgreSQL, Redis, etc.) to be swapped without changing the core business logic.
Design Principles
- Append-only event log semantics: Events are immutable once stored
- Idempotent writes: Saving the same entity multiple times is safe
- Read-after-write consistency: Active run queries reflect latest writes
- Concurrent access safety: All operations must be safe when called concurrently from multiple processes
Implementation Requirements
Implementations must:
- Handle concurrent access without race conditions
- Provide idempotent write operations (save_session, save_run)
- Maintain event append order
- Deduplicate events by ID
- Return proper error tuples for not-found cases
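To make the ordering and deduplication requirements concrete, here is a minimal sketch of a toy event log. `ToyEventLog` and its plain-map events are hypothetical and are not part of AgentSessionManager; the sketch assumes a single Agent process is an acceptable way to serialize writes.

```elixir
defmodule ToyEventLog do
  # Hypothetical toy store, not part of AgentSessionManager.
  # One Agent process handles every write, so concurrent callers
  # cannot interleave inside an update.
  use Agent

  def start_link(_opts \\ []) do
    Agent.start_link(fn -> %{events: [], seen_ids: MapSet.new()} end)
  end

  # Appends an event map that carries an :id key. A repeated ID is a no-op.
  def append_event(store, %{id: id} = event) do
    Agent.update(store, fn state ->
      if MapSet.member?(state.seen_ids, id) do
        state
      else
        %{state | events: [event | state.events], seen_ids: MapSet.put(state.seen_ids, id)}
      end
    end)
  end

  # Returns events in append order (oldest first).
  def get_events(store) do
    {:ok, Agent.get(store, fn state -> Enum.reverse(state.events) end)}
  end
end

{:ok, store} = ToyEventLog.start_link()
:ok = ToyEventLog.append_event(store, %{id: "evt_1", type: :session_created})
:ok = ToyEventLog.append_event(store, %{id: "evt_2", type: :message_received})
# Appending "evt_1" again must not create a second entry.
:ok = ToyEventLog.append_event(store, %{id: "evt_1", type: :session_created})
{:ok, events} = ToyEventLog.get_events(store)
```

A real adapter would return `{:error, Error.t()}` on failure and would persist to its backend; the sketch only illustrates the idempotency and ordering rules.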
Usage
The SessionStore is typically accessed through a store instance (e.g., a GenServer pid or an Agent reference):
# Using the behaviour directly with a store instance
{:ok, store} = InMemorySessionStore.start_link([])
{:ok, session} = Session.new(%{agent_id: "agent-1"})
:ok = SessionStore.save_session(store, session)
{:ok, fetched} = SessionStore.get_session(store, session.id)
Summary
Callbacks
- append_event/2: Appends an event to the event log.
- append_event_with_sequence/2: Appends an event and atomically assigns a per-session sequence number.
- append_events/2: Appends a batch of events with atomic, per-session sequence assignment.
- delete_session/2: Deletes a session by its ID.
- flush/2: Atomically persists an execution result (session, run, events).
- get_active_run/2: Gets the currently active (running) run for a session.
- get_events/3: Retrieves events for a session with optional filtering.
- get_latest_sequence/2: Gets the latest assigned event sequence number for a session.
- get_run/2: Retrieves a run by its ID.
- get_session/2: Retrieves a session by its ID.
- list_runs/3: Lists all runs for a given session.
- list_sessions/2: Lists sessions with optional filtering.
- save_run/2: Saves a run to the store.
- save_session/2: Saves a session to the store.
Functions
- append_event/2: Appends an event to the store.
- append_event_with_sequence/2: Appends an event and returns the persisted event with assigned sequence_number.
- append_events/2: Appends a batch of events and returns persisted events with assigned sequence numbers.
- delete_session/2: Deletes a session by ID.
- flush/2: Atomically persists an execution result payload.
- get_active_run/2: Gets the active run for a session.
- get_events/3: Gets events for a session with optional filtering.
- get_latest_sequence/2: Gets the highest assigned sequence number for the session, or 0 when empty.
- get_run/2: Retrieves a run by ID.
- get_session/2: Retrieves a session by ID.
- list_runs/3: Lists runs for a session.
- list_sessions/2: Lists sessions with optional filtering.
- save_run/2: Saves a run to the store.
- save_session/2: Saves a session to the store.
Types
@type context() :: term()
@type execution_result() :: %{ session: AgentSessionManager.Core.Session.t(), run: AgentSessionManager.Core.Run.t(), events: [AgentSessionManager.Core.Event.t()], provider_metadata: map() }
@type filter_opts() :: keyword()
@type run_id() :: String.t()
@type session_id() :: String.t()
@type store() :: {module(), context()} | GenServer.server() | pid() | atom()
Callbacks
@callback append_event(store(), AgentSessionManager.Core.Event.t()) :: :ok | {:error, AgentSessionManager.Core.Error.t()}
Appends an event to the event log.
Events are immutable - once appended, they cannot be modified or deleted. This operation is idempotent - appending an event with the same ID multiple times will not create duplicates.
Events must be stored in append order and returned in that same order
by get_events/3.
Parameters
- store - The store instance
- event - The event struct to append
Returns
- :ok on success
- {:error, Error.t()} on failure
Examples
{:ok, event} = Event.new(%{type: :session_created, session_id: session.id})
:ok = SessionStore.append_event(store, event)
@callback append_event_with_sequence(store(), AgentSessionManager.Core.Event.t()) :: {:ok, AgentSessionManager.Core.Event.t()} | {:error, AgentSessionManager.Core.Error.t()}
Appends an event and atomically assigns a per-session sequence number.
The returned event must include a non-nil sequence_number. Duplicate event IDs
must be handled idempotently and return the originally persisted event.
Parameters
- store - The store instance
- event - The event struct to append
Returns
- {:ok, Event.t()} on success
- {:error, Error.t()} on failure
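The sequencing contract can be illustrated with a small pure-functional sketch. `ToySequencer` is a hypothetical module, not the library's implementation, and it assumes sequence numbers start at 1 (the source only states that 0 means "no events"). A real store would run this logic inside a single process or a database transaction so that the assignment is atomic.

```elixir
defmodule ToySequencer do
  # Hypothetical sketch. State shape:
  #   %{by_id: %{event_id => persisted_event}, latest: %{session_id => last_seq}}
  def new, do: %{by_id: %{}, latest: %{}}

  def append_with_sequence(state, %{id: id, session_id: session_id} = event) do
    case Map.fetch(state.by_id, id) do
      {:ok, persisted} ->
        # Duplicate ID: return the originally persisted event, state unchanged.
        {{:ok, persisted}, state}

      :error ->
        # Assumption: the first event of a session gets sequence number 1.
        seq = Map.get(state.latest, session_id, 0) + 1
        persisted = Map.put(event, :sequence_number, seq)

        state = %{
          state
          | by_id: Map.put(state.by_id, id, persisted),
            latest: Map.put(state.latest, session_id, seq)
        }

        {{:ok, persisted}, state}
    end
  end

  # Mirrors the documented "0 when empty" behaviour.
  def latest_sequence(state, session_id) do
    {:ok, Map.get(state.latest, session_id, 0)}
  end
end

state = ToySequencer.new()
{{:ok, first}, state} = ToySequencer.append_with_sequence(state, %{id: "evt_1", session_id: "ses_a"})
{{:ok, second}, state} = ToySequencer.append_with_sequence(state, %{id: "evt_2", session_id: "ses_a"})
{{:ok, other}, state} = ToySequencer.append_with_sequence(state, %{id: "evt_3", session_id: "ses_b"})
# Re-appending "evt_1" returns the event persisted the first time.
{{:ok, again}, state} = ToySequencer.append_with_sequence(state, %{id: "evt_1", session_id: "ses_a"})
```

Each session keeps its own counter, so `other` receives its sequence number independently of the events in `"ses_a"`.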
@callback append_events(store(), [AgentSessionManager.Core.Event.t()]) :: {:ok, [AgentSessionManager.Core.Event.t()]} | {:error, AgentSessionManager.Core.Error.t()}
Appends a batch of events with atomic, per-session sequence assignment.
Implementations should preserve input order for returned events and apply idempotent semantics for duplicate event IDs.
@callback delete_session(store(), session_id()) :: :ok
Deletes a session by its ID.
This operation is idempotent - deleting a non-existent session returns :ok.
Parameters
- store - The store instance
- session_id - The session's unique identifier
Returns
- :ok on success (including when the session doesn't exist)
Examples
:ok = SessionStore.delete_session(store, "ses_abc123")
@callback flush(store(), execution_result()) :: :ok | {:error, AgentSessionManager.Core.Error.t()}
Atomically persists an execution result (session, run, events).
On failure, no partial writes should be visible.
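One way to get all-or-nothing behaviour in an in-process store is to build the complete new state first and swap it in with a single state transition. The sketch below assumes that approach; `ToyFlush`, its validation rule (every event needs a binary `:id`), and the `:invalid_event` error atom are hypothetical placeholders, where a real adapter would return an `Error.t()` and typically rely on a database transaction.

```elixir
defmodule ToyFlush do
  # Hypothetical toy store, not part of AgentSessionManager.
  use Agent

  def start_link(_opts \\ []) do
    Agent.start_link(fn -> %{sessions: %{}, runs: %{}, events: []} end)
  end

  # Applies session, run, and events in one state transition, or none of them.
  def flush(store, %{session: session, run: run, events: events}) do
    Agent.get_and_update(store, fn state ->
      # Hypothetical validation: every event must carry a binary :id.
      if Enum.all?(events, &is_binary(&1[:id])) do
        new_state = %{
          sessions: Map.put(state.sessions, session.id, session),
          runs: Map.put(state.runs, run.id, run),
          events: state.events ++ events
        }

        {:ok, new_state}
      else
        # Reject the whole payload and keep the previous state untouched.
        {{:error, :invalid_event}, state}
      end
    end)
  end

  def snapshot(store), do: Agent.get(store, & &1)
end

{:ok, store} = ToyFlush.start_link()
good = %{session: %{id: "ses_1"}, run: %{id: "run_1"}, events: [%{id: "evt_1"}]}
bad = %{session: %{id: "ses_2"}, run: %{id: "run_2"}, events: [%{id: nil}]}

:ok = ToyFlush.flush(store, good)
snapshot_before = ToyFlush.snapshot(store)
# The bad payload fails validation, so neither its session nor its run is stored.
{:error, :invalid_event} = ToyFlush.flush(store, bad)
```

Because the Agent callback returns either the fully updated state or the original one, readers never observe a session without its run or events.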
@callback get_active_run(store(), session_id()) :: {:ok, AgentSessionManager.Core.Run.t() | nil}
Gets the currently active (running) run for a session.
A run is considered active if its status is :running.
This operation must provide read-after-write consistency - if a run
was just saved with status :running, this function must return it
immediately.
Parameters
- store - The store instance
- session_id - The session's unique identifier
Returns
- {:ok, Run.t()} if there's an active run
- {:ok, nil} if there's no active run
Examples
{:ok, active_run} = SessionStore.get_active_run(store, session.id)
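Since the absence of an active run is reported as `{:ok, nil}` rather than as an error tuple, callers have to handle both shapes. The following sketch shows the lookup rule over a plain list of run maps; `ToyRuns` and the map fields are hypothetical stand-ins for a real store and `Run.t()` structs.

```elixir
defmodule ToyRuns do
  # Hypothetical helper over a plain list of run maps; not the library's code.
  def get_active_run(runs, session_id) do
    active =
      Enum.find(runs, fn run ->
        run.session_id == session_id and run.status == :running
      end)

    # Enum.find/2 returns nil when nothing matches, which maps onto {:ok, nil}.
    {:ok, active}
  end
end

runs = [
  %{id: "run_1", session_id: "ses_a", status: :completed},
  %{id: "run_2", session_id: "ses_a", status: :running},
  %{id: "run_3", session_id: "ses_b", status: :completed}
]

{:ok, active} = ToyRuns.get_active_run(runs, "ses_a")
{:ok, none} = ToyRuns.get_active_run(runs, "ses_b")
```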
@callback get_events(store(), session_id(), filter_opts()) :: {:ok, [AgentSessionManager.Core.Event.t()]}
Retrieves events for a session with optional filtering.
Events are returned in append order (oldest first).
Parameters
- store - The store instance
- session_id - The session's unique identifier
- opts - Optional filter options:
  - :run_id - Filter by run ID
  - :type - Filter by event type
  - :since - Events after this timestamp
  - :after - Events with sequence number strictly greater than this value
  - :before - Events with sequence number strictly less than this value
  - :limit - Maximum number of results
  - :wait_timeout_ms - When set to a positive integer and the query would return an empty list, the store may block until matching events are appended or the timeout elapses. Stores that do not support this option ignore it.
Returns
- {:ok, [Event.t()]} - List of events in append order
Examples
{:ok, all_events} = SessionStore.get_events(store, session.id)
{:ok, run_events} = SessionStore.get_events(store, session.id, run_id: run.id)
{:ok, message_events} = SessionStore.get_events(store, session.id, type: :message_received)
# Long-poll: wait up to 5 seconds for new events
{:ok, events} = SessionStore.get_events(store, session.id,
  after: cursor, wait_timeout_ms: 5_000)
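The sequence-based options combine naturally into cursor-style paging. As a rough illustration of how `:after`, `:before`, `:run_id`, `:type`, and `:limit` might be applied, here is a sketch over an in-memory list. `ToyFilter` is hypothetical, events are plain maps, and `:since` and `:wait_timeout_ms` are left out for brevity.

```elixir
defmodule ToyFilter do
  # Hypothetical helper; events are maps with :sequence_number, :run_id, :type
  # and are assumed to already be in append order.
  def filter(events, opts \\ []) do
    events
    |> Enum.filter(fn event ->
      matches?(event.run_id, opts[:run_id]) and
        matches?(event.type, opts[:type]) and
        (is_nil(opts[:after]) or event.sequence_number > opts[:after]) and
        (is_nil(opts[:before]) or event.sequence_number < opts[:before])
    end)
    |> take(opts[:limit])
  end

  # A nil option means "no constraint".
  defp matches?(_value, nil), do: true
  defp matches?(value, expected), do: value == expected

  defp take(events, nil), do: events
  defp take(events, limit), do: Enum.take(events, limit)
end

events =
  for n <- 1..5 do
    %{sequence_number: n, run_id: "run_1", type: :message_received}
  end

# Next page after cursor 2, at most two events.
page = ToyFilter.filter(events, after: 2, limit: 2)
# Both bounds are strict, so this window excludes 1 and 4.
window = ToyFilter.filter(events, after: 1, before: 4)
```

A caller would take the last `sequence_number` of `page` as the next cursor and pass it as `:after` on the following request.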
@callback get_latest_sequence(store(), session_id()) :: {:ok, non_neg_integer()} | {:error, AgentSessionManager.Core.Error.t()}
Gets the latest assigned event sequence number for a session.
Returns 0 when the session has no persisted events.
@callback get_run(store(), run_id()) :: {:ok, AgentSessionManager.Core.Run.t()} | {:error, AgentSessionManager.Core.Error.t()}
Retrieves a run by its ID.
Parameters
- store - The store instance
- run_id - The run's unique identifier
Returns
- {:ok, Run.t()} if found
- {:error, %Error{code: :run_not_found}} if not found
Examples
{:ok, run} = SessionStore.get_run(store, "run_abc123")
@callback get_session(store(), session_id()) :: {:ok, AgentSessionManager.Core.Session.t()} | {:error, AgentSessionManager.Core.Error.t()}
Retrieves a session by its ID.
Parameters
- store - The store instance
- session_id - The session's unique identifier
Returns
- {:ok, Session.t()} if found
- {:error, %Error{code: :session_not_found}} if not found
Examples
{:ok, session} = SessionStore.get_session(store, "ses_abc123")
@callback list_runs(store(), session_id(), filter_opts()) :: {:ok, [AgentSessionManager.Core.Run.t()]}
Lists all runs for a given session.
Parameters
- store - The store instance
- session_id - The session's unique identifier
- opts - Optional filter options:
  - :status - Filter by run status
  - :limit - Maximum number of results
Returns
- {:ok, [Run.t()]} - List of runs for the session
Examples
{:ok, runs} = SessionStore.list_runs(store, session.id)
{:ok, completed_runs} = SessionStore.list_runs(store, session.id, status: :completed)
@callback list_sessions(store(), filter_opts()) :: {:ok, [AgentSessionManager.Core.Session.t()]}
Lists sessions with optional filtering.
Parameters
- store - The store instance
- opts - Optional filter options:
  - :status - Filter by session status (e.g., :active, :pending)
  - :agent_id - Filter by agent ID
  - :limit - Maximum number of results
  - :offset - Number of results to skip
Returns
- {:ok, [Session.t()]} - List of matching sessions
Examples
{:ok, all_sessions} = SessionStore.list_sessions(store)
{:ok, active_sessions} = SessionStore.list_sessions(store, status: :active)
@callback save_run(store(), AgentSessionManager.Core.Run.t()) :: :ok | {:error, AgentSessionManager.Core.Error.t()}
Saves a run to the store.
This operation is idempotent - saving the same run multiple times should not create duplicates. If a run with the same ID exists, it will be updated.
Parameters
- store - The store instance
- run - The run struct to save
Returns
- :ok on success
- {:error, Error.t()} on failure
Examples
{:ok, run} = Run.new(%{session_id: session.id})
:ok = SessionStore.save_run(store, run)
@callback save_session(store(), AgentSessionManager.Core.Session.t()) :: :ok | {:error, AgentSessionManager.Core.Error.t()}
Saves a session to the store.
This operation is idempotent - saving the same session multiple times should not create duplicates. If a session with the same ID exists, it will be updated.
Parameters
- store - The store instance
- session - The session struct to save
Returns
- :ok on success
- {:error, Error.t()} on failure
Examples
{:ok, session} = Session.new(%{agent_id: "agent-1"})
:ok = SessionStore.save_session(store, session)
Functions
@spec append_event(store(), AgentSessionManager.Core.Event.t()) :: :ok | {:error, AgentSessionManager.Core.Error.t()}
Appends an event to the store.
@spec append_event_with_sequence(store(), AgentSessionManager.Core.Event.t()) :: {:ok, AgentSessionManager.Core.Event.t()} | {:error, AgentSessionManager.Core.Error.t()}
Appends an event and returns the persisted event with assigned sequence_number.
@spec append_events(store(), [AgentSessionManager.Core.Event.t()]) :: {:ok, [AgentSessionManager.Core.Event.t()]} | {:error, AgentSessionManager.Core.Error.t()}
Appends a batch of events and returns persisted events with assigned sequence numbers.
@spec delete_session(store(), session_id()) :: :ok
Deletes a session by ID.
@spec flush(store(), execution_result()) :: :ok | {:error, AgentSessionManager.Core.Error.t()}
Atomically persists an execution result payload.
@spec get_active_run(store(), session_id()) :: {:ok, AgentSessionManager.Core.Run.t() | nil}
Gets the active run for a session.
@spec get_events(store(), session_id(), filter_opts()) :: {:ok, [AgentSessionManager.Core.Event.t()]}
Gets events for a session with optional filtering.
When wait_timeout_ms is provided and the store supports it,
the call may block up to that duration waiting for matching events.
@spec get_latest_sequence(store(), session_id()) :: {:ok, non_neg_integer()} | {:error, AgentSessionManager.Core.Error.t()}
Gets the highest assigned sequence number for the session, or 0 when empty.
@spec get_run(store(), run_id()) :: {:ok, AgentSessionManager.Core.Run.t()} | {:error, AgentSessionManager.Core.Error.t()}
Retrieves a run by ID.
@spec get_session(store(), session_id()) :: {:ok, AgentSessionManager.Core.Session.t()} | {:error, AgentSessionManager.Core.Error.t()}
Retrieves a session by ID.
@spec list_runs(store(), session_id(), filter_opts()) :: {:ok, [AgentSessionManager.Core.Run.t()]}
Lists runs for a session.
@spec list_sessions(store(), filter_opts()) :: {:ok, [AgentSessionManager.Core.Session.t()]}
Lists sessions with optional filtering.
@spec save_run(store(), AgentSessionManager.Core.Run.t()) :: :ok | {:error, AgentSessionManager.Core.Error.t()}
Saves a run to the store.
@spec save_session(store(), AgentSessionManager.Core.Session.t()) :: :ok | {:error, AgentSessionManager.Core.Error.t()}
Saves a session to the store.