Spector (Spector v0.6.0)
View SourceCQRS-style event sourcing for Ecto schemas.
Spector records all changes to your Ecto schemas as events in a separate event log table. This enables full audit trails, temporal queries, and the ability to replay history. For tamper-evident logs, enable optional hash chain integrity.
Quick Start
1. Define an Events Table
defmodule MyApp.Events do
use Spector.Events,
table: "events",
schemas: [MyApp.User, MyApp.Post],
repo: MyApp.Repo
end2. Mark Schemas as Evented
defmodule MyApp.User do
use Spector.Evented, events: MyApp.Events
use Ecto.Schema
schema "users" do
field :name, :string
field :email, :string
end
def changeset(changeset, attrs) do
changeset
|> Ecto.Changeset.cast(attrs, [:name, :email])
|> Ecto.Changeset.validate_required([:name, :email])
end
end3. Create Migrations
defmodule MyApp.Repo.Migrations.CreateEvents do
use Ecto.Migration
def up, do: Spector.Migration.up(table: "events")
def down, do: Spector.Migration.down(table: "events")
end4. Use Spector Instead of Repo
# Insert
{:ok, user} = Spector.insert(MyApp.User, %{name: "Alice", email: "alice@example.com"})
# Update
{:ok, user} = Spector.update(user, %{name: "Alice Smith"})
# Delete
{:ok, user} = Spector.delete(user)How It Works
When you update or execute an action on a record, Spector "rolls forward" by
replaying all stored events through your schema's changeset/2 function. This means:
- Your changeset function handles both new operations AND historical replay
- Schema migrations happen automatically during replay (using version guards)
- The current state is always reconstructed from the event log
- Stale in-memory records are never a problem
This design lets you evolve your schema over time while maintaining full compatibility with historical events.
Guides
For complete examples of building applications with Spector:
- Building an AI Chat Log - Conversation branching with tree-structured message history
- Building a Basic Chat - Simple chat with edit history tracking
Features
Custom Actions
Define domain-specific actions beyond insert/update/delete:
defmodule MyApp.Item do
use Spector.Evented, events: MyApp.Events, actions: [:archive]
def changeset(changeset, attrs) when changeset.action == :archive do
Ecto.Changeset.change(changeset, archived_at: Spector.get_attr(attrs, :archived_at))
end
def changeset(changeset, attrs) do
Ecto.Changeset.cast(changeset, attrs, [:name, :value])
end
end
# Execute custom action
{:ok, item} = Spector.execute(item, :archive, %{archived_at: DateTime.utc_now()})Schema Versioning
Handle schema migrations with version guards:
defmodule MyApp.User do
use Spector.Evented, events: MyApp.Events, version: 1
# Migrate v0 events (with :title) to v1 (with :name)
def changeset(changeset, attrs) when version_is(attrs, 0) do
attrs = Map.put(attrs, "name", Spector.get_attr(attrs, :title))
do_changeset(changeset, attrs)
end
def changeset(changeset, attrs), do: do_changeset(changeset, attrs)
endHash Chain Integrity
Enable tamper-evident event logs with cryptographic hashing:
defmodule MyApp.Events do
use Spector.Events,
table: "events",
schemas: [MyApp.User],
repo: MyApp.Repo,
hashed: true
endExplicit Schema Indexing
Ensure stability when adding/removing schemas:
schemas: [MyApp.User, MyApp.Post, {MyApp.Comment, 10}]Action Aliases
Maintain backwards compatibility when renaming actions:
use Spector.Events,
aliases: [soft_delete: :archive]Reserved Attributes
Spector injects reserved attributes into the attrs map passed to your
changeset/2 function. These provide metadata about the event being applied:
:__version__- The schema version when the event was created. Use withversion_is/2guards to handle schema migrations during replay.:__event_id__- The unique ID of the event being applied. UseSpector.changeset_put_event_id/3to assign this to a field:def changeset(message, attrs) do message |> Ecto.Changeset.cast(attrs, [:content]) |> Spector.changeset_put_event_id(attrs) # puts :__event_id__ into :id field end
These attributes are also stored in the event payload for reference.
Database Support
Spector works with any database supported by Ecto for basic functionality.
Note: Hashed event tables (hashed: true) currently require PostgreSQL.
The hash chain integrity feature uses LOCK TABLE ... IN EXCLUSIVE MODE which
is PostgreSQL-specific.
Summary
Types
An action atom (e.g., :insert, :update, :delete, or custom actions)
Attributes map passed to changesets
Options for bringup/2
An Ecto schema module that uses Spector.Evented
A struct instance of an evented schema
Functions
Returns all events for a record from the beginning.
Import existing database records into the event log.
Assigns the current event ID to a changeset field.
Delete a record, creating a delete event in the event log.
Execute an action on a record, creating an event in the event log.
Fetches a field from attrs, checking both atom and string keys.
Fetches a field from attrs, checking both atom and string keys.
Retrieve the current state of a record by replaying its events.
Gets a field from attrs, checking both atom and string keys.
Insert a new record, creating an event in the event log.
Returns all events up to and including the given event.
Returns events starting from the most recent savepoint.
Create a savepoint event capturing the current state of a record.
Update an existing record, creating an event in the event log.
Types
@type action() :: atom()
An action atom (e.g., :insert, :update, :delete, or custom actions)
@type attrs() :: map()
Attributes map passed to changesets
@type bringup_opts() :: [ action: action(), attr_fn: (evented_struct() -> attrs()), transfer: (evented_struct(), evented_struct() -> any()) ]
Options for bringup/2
@type evented_schema() :: module()
An Ecto schema module that uses Spector.Evented
@type evented_struct() :: struct()
A struct instance of an evented schema
Functions
@spec all_events(evented_schema(), Ecto.UUID.t()) :: [struct()]
Returns all events for a record from the beginning.
Events are returned in insertion order.
Example
events = Spector.all_events(MyApp.User, user_id)
@spec bringup(evented_schema(), bringup_opts()) :: {:ok, [evented_struct()]}
Import existing database records into the event log.
Reads all rows from the schema's table and migrates each to Spector management: deletes the original record and creates a new one with a UUIDv7 ID and corresponding event. The entire operation runs in a single transaction.
Records that are already tracked by Spector (have existing events) are skipped.
Options
:action- The action to use for the event (default::insert). Use a custom action like:importto trigger different changeset behavior during migration.:attr_fn- A function that takes a record and returns the attributes map to insert (default: extracts all schema fields except the primary key). Use this to transform or augment data during migration.:transfer- A function that receives the old record and the new record before the old record is deleted. Use this to update associations or perform other transfer operations (default: no-op).
Returns {:ok, [struct]} on success or {:error, reason} on failure.
Timestamps
By default, bringup creates new records, so inserted_at and updated_at timestamps
will be set to the current time. To preserve original timestamps from the source
records, include them in the attr_fn and ensure your changeset accepts them.
If you don't want your regular changeset to accept timestamp fields, use a custom
action like :import to handle them separately:
# Register :import as a custom action
use Spector.Evented, events: MyApp.Events, actions: [:import]
# Handle :import with timestamp support
def changeset(changeset, attrs) when changeset.action == :import do
changeset
|> cast(attrs, [:name, :inserted_at, :updated_at])
|> validate_required([:name])
end
# Regular changeset doesn't accept timestamps
def changeset(changeset, attrs) do
changeset
|> cast(attrs, [:name])
|> validate_required([:name])
end
# Pass timestamps in attr_fn
attr_fn = fn record ->
%{name: record.name, inserted_at: record.inserted_at, updated_at: record.updated_at}
end
{:ok, users} = Spector.bringup(MyApp.User, action: :import, attr_fn: attr_fn)Examples
Basic usage migrates all untracked records (timestamps reset to now):
{:ok, users} = Spector.bringup(MyApp.User)Use a custom attr_fn to transform data during migration:
{:ok, users} = Spector.bringup(MyApp.User, attr_fn: fn record ->
%{
name: String.upcase(record.name),
value: record.value || 0 # provide defaults for nil values
}
end)Use a transfer function to update associations before the old record is deleted:
{:ok, users} = Spector.bringup(MyApp.User, transfer: fn old, new ->
Repo.update_all(
from(p in Post, where: p.user_id == ^old.id),
set: [user_id: new.id]
)
end)
@spec changeset_put_event_id(Ecto.Changeset.t(), attrs(), atom()) :: Ecto.Changeset.t()
Assigns the current event ID to a changeset field.
When Spector calls your changeset/2 function, it includes an :__event_id__
key in the attrs map. This function extracts that ID and assigns it to
the specified field in your changeset.
This is useful for embedded schemas and {:array, :map} rollup records
where you want each record to have a unique ID that matches its creation event.
Parameters
changeset- The changeset to modifyattrs- The attrs map passed tochangeset/2(contains:__event_id__)field- The field to assign the event ID to (default::id)
Example
def changeset(message, attrs) do
message
|> Ecto.Changeset.cast(attrs, [:content, :role])
|> Spector.changeset_put_event_id(attrs)
|> Ecto.Changeset.validate_required([:id, :content, :role])
end
@spec delete(evented_struct()) :: {:ok, evented_struct()} | {:error, Ecto.Changeset.t()}
Delete a record, creating a delete event in the event log.
Returns {:ok, struct} on success or {:error, changeset} on failure.
The delete event is recorded in the event log before the record is removed from the database, providing a complete audit trail.
Example
{:ok, user} = Spector.delete(user)
@spec execute(evented_struct(), action(), attrs()) :: {:ok, evented_struct()} | {:error, Ecto.Changeset.t()}
Execute an action on a record, creating an event in the event log.
This is the general-purpose function for applying any action to a record,
including custom actions defined in the schema's :actions option.
Returns {:ok, struct} on success or {:error, changeset} on failure.
The function rolls forward from the event log to reconstruct current state,
applies the action through the schema's changeset/2 function, and records
the new event.
Example
# Using a custom :archive action
{:ok, item} = Spector.execute(item, :archive, %{archived_at: DateTime.utc_now()})
# The :update action (same as Spector.update/2)
{:ok, user} = Spector.execute(user, :update, %{name: "New Name"})
Fetches a field from attrs, checking both atom and string keys.
Returns {:ok, value} if the key exists, or :error if not found.
This is useful in changesets where attrs may come with string keys (from JSON) or atom keys (from internal calls).
Example
def changeset(record, attrs) do
case Spector.fetch_attr(attrs, :parent_id) do
{:ok, parent_id} -> # handle parent_id
:error -> # no parent_id provided
end
end
Fetches a field from attrs, checking both atom and string keys.
Returns the value if the key exists, or raises KeyError if not found.
Example
def changeset(record, attrs) do
parent_id = Spector.fetch_attr!(attrs, :parent_id)
# use parent_id
end
@spec get(evented_schema(), String.t()) :: evented_struct() | nil
Retrieve the current state of a record by replaying its events.
Returns the struct if found, or nil if no events exist for the given ID.
This is useful for embedded schemas (without database tables) or when you want to reconstruct state purely from the event log.
Example
user = Spector.get(MyApp.User, "019ac640-dfc0-7407-8238-39a9c45e8813")
Gets a field from attrs, checking both atom and string keys.
Returns the value if the key exists, or default if not found.
Example
def changeset(record, attrs) do
parent_id = Spector.get_attr(attrs, :parent_id, nil)
# use parent_id, which may be nil
end
@spec insert(evented_schema(), attrs(), action()) :: {:ok, evented_struct()} | {:error, Ecto.Changeset.t()}
Insert a new record, creating an event in the event log.
Returns {:ok, struct} on success or {:error, changeset} on failure.
Example
{:ok, user} = Spector.insert(MyApp.User, %{name: "Alice", email: "alice@example.com"})The inserted struct will have a new UUIDv7 id assigned.
Returns all events up to and including the given event.
Events are returned in insertion order. The specified event is included as the last element in the result.
Note
This function does not take into account savepoints.
Example
events = Spector.previous_events(event)
@spec recent_events(evented_schema(), Ecto.UUID.t()) :: [struct()]
Returns events starting from the most recent savepoint.
If no savepoint exists, returns all events from the beginning. The savepoint event itself is included as the first element.
Events are returned in insertion order.
Use of this function over all_events/2 is preferable; if the
schema does not support savepoints, the two functions behave
identically.
Example
events = Spector.recent_events(MyApp.User, user_id)
Create a savepoint event capturing the current state of a record.
Returns {:ok, struct} on success or raises on failure.
Savepoints store the complete state of a record at a point in time, allowing event replay to start from the savepoint instead of replaying all events from the beginning. This is useful for records with long event histories.
The schema must implement the savepoint/2 callback to define how the
current state is converted to an attrs map:
@behaviour Spector.Evented
@impl true
def savepoint(record, _version) do
%{name: record.name, email: record.email}
endForms
There are two ways to create a savepoint:
From a record (savepoint/1)
Pass the record directly. This verifies that the record matches the current state in the event log (replayed events must produce the same field values). This guards against creating savepoints from stale records:
{:ok, user} = Spector.savepoint(user)If the record is stale (e.g., another process updated it), this raises an error.
From schema and ID (savepoint/2)
Pass the schema module and record ID. This replays events to determine current state without verification:
{:ok, user} = Spector.savepoint(MyApp.User, user_id)Use this form when you don't have the record in memory or don't need stale record detection.
@spec update(evented_struct(), attrs()) :: {:ok, evented_struct()} | {:error, Ecto.Changeset.t()}
Update an existing record, creating an event in the event log.
This is a convenience function that calls execute(record, :update, attrs).
Returns {:ok, struct} on success or {:error, changeset} on failure.
Example
{:ok, user} = Spector.update(user, %{name: "Alice Smith"})