PhoenixAI.Store (PhoenixAI.Store v0.1.0)

Copy Markdown View Source

Supervisor and public API facade for PhoenixAI conversation storage.

PhoenixAI.Store is both a Supervisor (managing adapter-specific children and an Instance GenServer) and the public API facade that delegates to the configured adapter.

Starting a store

{:ok, _pid} = PhoenixAI.Store.start_link(
  name: :my_store,
  adapter: PhoenixAI.Store.Adapters.ETS
)

Using the API

{:ok, conv} = PhoenixAI.Store.save_conversation(conversation, store: :my_store)
{:ok, conv} = PhoenixAI.Store.load_conversation(conv.id, store: :my_store)

Summary

Functions

Adds a message to a conversation. Generates a UUID v7 if id is nil and injects inserted_at.

Applies a memory pipeline to a conversation's messages.

Runs guardrail policies against a request, with store adapter injection.

Returns a specification to start this module under a supervisor.

Checks whether a conversation with the given ID exists.

Sends a user message to an AI provider within a persisted conversation.

Counts conversations matching the given filters.

Counts events matching the given filters.

Deletes a conversation by ID.

Deletes a specific fact by key for a user.

Deletes the profile for a user.

Extracts new facts from a conversation's unprocessed messages and persists them.

Returns all cost records for a conversation.

Returns all stored facts for a user.

Gets all messages for a conversation, ordered by inserted_at.

Loads the profile for a user by ID.

Lists conversations matching the given filters.

Lists events matching the given filters.

Loads a conversation by ID, including its messages.

Logs an event through the EventLog orchestrator.

Records the cost of a single AI provider call.

Saves a conversation. Generates a UUID v7 if id is nil and injects timestamps.

Persists a long-term memory fact for a user.

Persists a user profile.

Starts the store supervisor with the given options.

Aggregates cost across records matching the given filters.

Logs a custom event through the EventLog using a simplified map API.

Regenerates and saves a user profile summary from their stored facts.

Functions

add_message(conversation_id, msg, opts \\ [])

@spec add_message(String.t(), PhoenixAI.Store.Message.t(), keyword()) ::
  {:ok, PhoenixAI.Store.Message.t()} | {:error, term()}

Adds a message to a conversation. Generates a UUID v7 if id is nil and injects inserted_at.

apply_memory(conversation_id, pipeline, opts \\ [])

@spec apply_memory(String.t(), PhoenixAI.Store.Memory.Pipeline.t(), keyword()) ::
  {:ok, [PhoenixAI.Message.t()]} | {:error, term()}

Applies a memory pipeline to a conversation's messages.

Fetches raw messages from the adapter, runs the pipeline (which handles pinned message extraction, strategy sorting/execution, and re-injection), then converts the result to %PhoenixAI.Message{} structs.

Options

  • :store - the store name (default: :phoenix_ai_store_default)
  • :model - model override for strategy context
  • :provider - provider override for strategy context
  • :max_tokens - token budget override
  • :token_counter - token counter module override

check_guardrails(request, policies, opts \\ [])

Runs guardrail policies against a request, with store adapter injection.

Resolves the adapter from opts, injects it into request.assigns (so stateful policies like TokenBudget can query the store), then delegates to PhoenixAI.Guardrails.Pipeline.run/2.

Example

request = %Request{
  messages: messages,
  conversation_id: conv_id,
  user_id: user_id
}

policies = [
  {PhoenixAI.Store.Guardrails.TokenBudget, [max: 100_000, scope: :conversation]},
  {PhoenixAI.Guardrails.Policies.JailbreakDetection, [threshold: 0.7]}
]

case Store.check_guardrails(request, policies, store: :my_store) do
  {:ok, request} -> AI.chat(request.messages, opts)
  {:error, violation} -> handle_violation(violation)
end

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

conversation_exists?(id, opts \\ [])

@spec conversation_exists?(
  String.t(),
  keyword()
) :: {:ok, boolean()} | {:error, term()}

Checks whether a conversation with the given ID exists.

converse(conversation_id, message, opts \\ [])

@spec converse(String.t(), String.t(), keyword()) ::
  {:ok, PhoenixAI.Response.t()} | {:error, term()}

Sends a user message to an AI provider within a persisted conversation.

Resolves the adapter, merges per-call options over config-level :converse defaults, and delegates to ConversePipeline.run/3 which handles:

  1. Saving the user message
  2. Loading conversation history
  3. Applying memory pipeline (if configured)
  4. Running guardrail checks (if configured)
  5. Calling the AI provider
  6. Saving the assistant response
  7. Recording cost (if cost tracking enabled)
  8. Extracting LTM facts (if enabled)

Options

  • :store — store instance name (default: :phoenix_ai_store_default)
  • :provider — AI provider atom (e.g. :openai, :test)
  • :model — model string (e.g. "gpt-4o")
  • :api_key — API key for the provider
  • :system — system prompt
  • :tools — tool definitions for function calling
  • :memory_pipeline%Pipeline{} for memory management
  • :guardrails — list of guardrail policy entries
  • :user_id — user identifier
  • :extract_facts — whether to auto-extract LTM facts (default from config)

count_conversations(filters \\ [], opts \\ [])

@spec count_conversations(keyword(), keyword()) ::
  {:ok, non_neg_integer()} | {:error, term()}

Counts conversations matching the given filters.

count_events(filters \\ [], opts \\ [])

@spec count_events(keyword(), keyword()) ::
  {:ok, non_neg_integer()} | {:error, term()}

Counts events matching the given filters.

Delegates to adapter.count_events/2 if the adapter supports EventStore.

delete_conversation(id, opts \\ [])

@spec delete_conversation(
  String.t(),
  keyword()
) :: :ok | {:error, :not_found | term()}

Deletes a conversation by ID.

delete_fact(user_id, key, opts \\ [])

@spec delete_fact(String.t(), String.t(), keyword()) :: :ok | {:error, term()}

Deletes a specific fact by key for a user.

delete_profile(user_id, opts \\ [])

@spec delete_profile(
  String.t(),
  keyword()
) :: :ok | {:error, term()}

Deletes the profile for a user.

extract_facts(conversation_id, opts \\ [])

@spec extract_facts(
  String.t(),
  keyword()
) ::
  {:ok, [PhoenixAI.Store.LongTermMemory.Fact.t()]}
  | {:ok, :async}
  | {:error, term()}

Extracts new facts from a conversation's unprocessed messages and persists them.

get_cost_records(conversation_id, opts \\ [])

@spec get_cost_records(
  String.t(),
  keyword()
) :: {:ok, [PhoenixAI.Store.CostTracking.CostRecord.t()]} | {:error, term()}

Returns all cost records for a conversation.

Delegates to adapter.get_cost_records/2 if the adapter supports CostStore.

get_facts(user_id, opts \\ [])

@spec get_facts(
  String.t(),
  keyword()
) :: {:ok, [PhoenixAI.Store.LongTermMemory.Fact.t()]} | {:error, term()}

Returns all stored facts for a user.

get_messages(conversation_id, opts \\ [])

@spec get_messages(
  String.t(),
  keyword()
) :: {:ok, [PhoenixAI.Store.Message.t()]} | {:error, term()}

Gets all messages for a conversation, ordered by inserted_at.

get_profile(user_id, opts \\ [])

@spec get_profile(
  String.t(),
  keyword()
) ::
  {:ok, PhoenixAI.Store.LongTermMemory.Profile.t()}
  | {:error, :not_found | term()}

Loads the profile for a user by ID.

list_conversations(filters \\ [], opts \\ [])

@spec list_conversations(keyword(), keyword()) ::
  {:ok, [PhoenixAI.Store.Conversation.t()]} | {:error, term()}

Lists conversations matching the given filters.

list_events(filters \\ [], opts \\ [])

@spec list_events(keyword(), keyword()) ::
  {:ok,
   %{
     events: [PhoenixAI.Store.EventLog.Event.t()],
     next_cursor: String.t() | nil
   }}
  | {:error, term()}

Lists events matching the given filters.

Delegates to adapter.list_events/2 if the adapter supports EventStore.

load_conversation(id, opts \\ [])

@spec load_conversation(
  String.t(),
  keyword()
) :: {:ok, PhoenixAI.Store.Conversation.t()} | {:error, :not_found | term()}

Loads a conversation by ID, including its messages.

log_event(event, opts \\ [])

@spec log_event(
  PhoenixAI.Store.EventLog.Event.t(),
  keyword()
) :: {:ok, PhoenixAI.Store.EventLog.Event.t()} | {:error, term()}

Logs an event through the EventLog orchestrator.

Resolves the adapter, injects redact_fn from config, and delegates to EventLog.log/3.

record_cost(conversation_id, response, opts \\ [])

@spec record_cost(String.t(), PhoenixAI.Response.t(), keyword()) ::
  {:ok, PhoenixAI.Store.CostTracking.CostRecord.t()} | {:error, term()}

Records the cost of a single AI provider call.

Resolves the adapter and pricing provider from the store config, delegates to CostTracking.record/3, and wraps the call in a telemetry span [:phoenix_ai_store, :cost, :record].

Options

  • :store — the store name (default: :phoenix_ai_store_default)
  • :user_id — user to attribute cost to
  • :pricing_provider — module override for pricing lookup
  • :metadata — extra metadata map

save_conversation(conv, opts \\ [])

@spec save_conversation(
  PhoenixAI.Store.Conversation.t(),
  keyword()
) :: {:ok, PhoenixAI.Store.Conversation.t()} | {:error, term()}

Saves a conversation. Generates a UUID v7 if id is nil and injects timestamps.

save_fact(fact, opts \\ [])

Persists a long-term memory fact for a user.

save_profile(profile, opts \\ [])

Persists a user profile.

start_link(opts)

@spec start_link(keyword()) :: Supervisor.on_start()

Starts the store supervisor with the given options.

sum_cost(filters \\ [], opts \\ [])

@spec sum_cost(keyword(), keyword()) :: {:ok, Decimal.t()} | {:error, term()}

Aggregates cost across records matching the given filters.

Delegates to adapter.sum_cost/2 if the adapter supports CostStore.

Filters

  • :user_id — filter by user
  • :conversation_id — filter by conversation
  • :provider — filter by provider atom (e.g. :openai)
  • :model — filter by model string (e.g. "gpt-4o")
  • :after — include only records with recorded_at >= dt
  • :before — include only records with recorded_at <= dt

track(params)

@spec track(map()) :: {:ok, PhoenixAI.Store.EventLog.Event.t()} | {:error, term()}

Logs a custom event through the EventLog using a simplified map API.

Builds an %Event{} from the given map and delegates to log_event/2.

Required keys

  • :type — event type atom

Optional keys

  • :data — event data map (default: %{})
  • :conversation_id — associated conversation ID
  • :user_id — associated user ID
  • :store — store instance name (default: :phoenix_ai_store_default)

Example

Store.track(%{type: :user_feedback, data: %{rating: 5}, user_id: "u1"})

update_profile(user_id, opts \\ [])

@spec update_profile(
  String.t(),
  keyword()
) :: {:ok, PhoenixAI.Store.LongTermMemory.Profile.t()} | {:error, term()}

Regenerates and saves a user profile summary from their stored facts.