Spector (Spector v0.6.0)

CQRS-style event sourcing for Ecto schemas.

Spector records all changes to your Ecto schemas as events in a separate event log table. This enables full audit trails, temporal queries, and the ability to replay history. For tamper-evident logs, enable optional hash chain integrity.

Quick Start

1. Define an Events Table

defmodule MyApp.Events do
  use Spector.Events,
    table: "events",
    schemas: [MyApp.User, MyApp.Post],
    repo: MyApp.Repo
end

2. Mark Schemas as Evented

defmodule MyApp.User do
  use Spector.Evented, events: MyApp.Events
  use Ecto.Schema

  schema "users" do
    field :name, :string
    field :email, :string
  end

  def changeset(changeset, attrs) do
    changeset
    |> Ecto.Changeset.cast(attrs, [:name, :email])
    |> Ecto.Changeset.validate_required([:name, :email])
  end
end

3. Create Migrations

defmodule MyApp.Repo.Migrations.CreateEvents do
  use Ecto.Migration

  def up, do: Spector.Migration.up(table: "events")
  def down, do: Spector.Migration.down(table: "events")
end

4. Use Spector Instead of Repo

# Insert
{:ok, user} = Spector.insert(MyApp.User, %{name: "Alice", email: "alice@example.com"})

# Update
{:ok, user} = Spector.update(user, %{name: "Alice Smith"})

# Delete
{:ok, user} = Spector.delete(user)

How It Works

When you update or execute an action on a record, Spector "rolls forward" by replaying all stored events through your schema's changeset/2 function. This means:

  • Your changeset function handles both new operations AND historical replay
  • Schema migrations happen automatically during replay (using version guards)
  • The current state is always reconstructed from the event log
  • Stale in-memory records are never a problem

This design lets you evolve your schema over time while maintaining full compatibility with historical events.
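
The roll-forward idea can be sketched without Ecto or a database. The following is a minimal illustration under stated assumptions, not Spector's implementation: the ReplaySketch module, the shape of its event maps, and apply_event/2 (standing in for a schema's changeset/2) are all hypothetical.

```elixir
defmodule ReplaySketch do
  # Hypothetical stand-in for a schema's changeset/2: a :delete event
  # clears the state, :insert and :update merge their payload into it.
  def apply_event(_state, %{action: :delete}), do: nil

  def apply_event(state, %{action: action, payload: payload})
      when action in [:insert, :update] do
    Map.merge(state || %{}, payload)
  end

  # "Roll forward": fold every stored event, oldest first, into a state.
  def roll_forward(events) do
    Enum.reduce(events, nil, fn event, state -> apply_event(state, event) end)
  end
end

events = [
  %{action: :insert, payload: %{name: "Alice", email: "alice@example.com"}},
  %{action: :update, payload: %{name: "Alice Smith"}}
]

ReplaySketch.roll_forward(events)
# => %{email: "alice@example.com", name: "Alice Smith"}
```

Because the state is always derived from the log, a caller holding an outdated copy of the record cannot influence the result.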

Guides

For complete examples of building applications with Spector:

Features

Custom Actions

Define domain-specific actions beyond insert/update/delete:

defmodule MyApp.Item do
  use Spector.Evented, events: MyApp.Events, actions: [:archive]

  def changeset(changeset, attrs) when changeset.action == :archive do
    Ecto.Changeset.change(changeset, archived_at: Spector.get_attr(attrs, :archived_at))
  end

  def changeset(changeset, attrs) do
    Ecto.Changeset.cast(changeset, attrs, [:name, :value])
  end
end

# Execute custom action
{:ok, item} = Spector.execute(item, :archive, %{archived_at: DateTime.utc_now()})

Schema Versioning

Handle schema migrations with version guards:

defmodule MyApp.User do
  use Spector.Evented, events: MyApp.Events, version: 1

  # Migrate v0 events (with :title) to v1 (with :name)
  def changeset(changeset, attrs) when version_is(attrs, 0) do
    attrs = Map.put(attrs, "name", Spector.get_attr(attrs, :title))
    do_changeset(changeset, attrs)
  end

  def changeset(changeset, attrs), do: do_changeset(changeset, attrs)
end
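
To make the guard mechanism concrete, here is one way a guard like version_is/2 could be written with defguard. This is a sketch, not Spector's definition: VersionSketch, its guard, and normalize/1 are hypothetical, and it assumes the reserved :__version__ key arrives as an atom key.

```elixir
defmodule VersionSketch do
  # Hypothetical guard: true when attrs carries the given version
  # under the atom key :__version__.
  defguard version_is(attrs, version)
           when is_map_key(attrs, :__version__) and
                  :erlang.map_get(:__version__, attrs) == version

  # v0 events stored the name under :title; rename it before use.
  def normalize(attrs) when version_is(attrs, 0) do
    {title, rest} = Map.pop(attrs, :title)
    Map.put(rest, :name, title)
  end

  # Events from any other version pass through unchanged.
  def normalize(attrs), do: attrs
end

VersionSketch.normalize(%{__version__: 0, title: "Alice"})
# => %{__version__: 0, name: "Alice"}
```

Clause order matters here: the guarded clause must come first, since the catch-all clause would otherwise match every event.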

Hash Chain Integrity

Enable tamper-evident event logs with cryptographic hashing:

defmodule MyApp.Events do
  use Spector.Events,
    table: "events",
    schemas: [MyApp.User],
    repo: MyApp.Repo,
    hashed: true
end
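
The general idea behind a hash chain is that each event's hash covers the previous event's hash, so editing any earlier event invalidates everything after it. The sketch below illustrates that idea with Erlang's :crypto module and SHA-256. The module name, the event shape, the all-zero starting hash, and the use of :erlang.term_to_binary/1 for serialization are assumptions made for illustration; the source does not describe how Spector computes its hashes.

```elixir
defmodule HashChainSketch do
  # Assumed starting value for the first link: 32 zero bytes.
  @genesis <<0::256>>

  # Hash of one link: SHA-256 over the previous hash plus the payload.
  defp link_hash(prev_hash, payload) do
    :crypto.hash(:sha256, [prev_hash, :erlang.term_to_binary(payload)])
  end

  # Append a payload, chaining it to the hash of the last event.
  def append(chain, payload) do
    prev_hash =
      case List.last(chain) do
        nil -> @genesis
        %{hash: hash} -> hash
      end

    chain ++ [%{payload: payload, hash: link_hash(prev_hash, payload)}]
  end

  # Recompute every hash from the start; a modified payload breaks the chain.
  def valid?(chain) do
    result =
      Enum.reduce_while(chain, @genesis, fn %{payload: payload, hash: hash}, prev ->
        if link_hash(prev, payload) == hash, do: {:cont, hash}, else: {:halt, :invalid}
      end)

    result != :invalid
  end
end

chain =
  []
  |> HashChainSketch.append(%{action: :insert, name: "Alice"})
  |> HashChainSketch.append(%{action: :update, name: "Alice Smith"})

HashChainSketch.valid?(chain)
# => true

# Tampering with an earlier payload invalidates the chain.
tampered =
  List.update_at(chain, 0, fn event ->
    %{event | payload: %{action: :insert, name: "Mallory"}}
  end)

HashChainSketch.valid?(tampered)
# => false
```

Each append depends on the hash of the last event, so concurrent writers have to be serialized; this is consistent with the table-locking note under Database Support.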

Explicit Schema Indexing

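Pin a schema to an explicit index with a {schema, index} tuple so that its index stays stable when other schemas are added to or removed from the list: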

schemas: [MyApp.User, MyApp.Post, {MyApp.Comment, 10}]
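
The source does not spell out how indices are assigned, so the following is only a sketch of why pinning helps. It assumes that a bare module gets its index from its position in the list, while a {module, index} tuple keeps the index it was given. SchemaIndexSketch and the bare User, Post, and Comment atoms are hypothetical.

```elixir
defmodule SchemaIndexSketch do
  # Entries are either a bare module (index = list position, by assumption)
  # or {module, index} with an explicit, pinned index.
  def index(schemas) do
    schemas
    |> Enum.with_index()
    |> Map.new(fn
      {{schema, explicit}, _position} -> {schema, explicit}
      {schema, position} -> {schema, position}
    end)
  end
end

SchemaIndexSketch.index([User, Post, {Comment, 10}])
# => %{Comment => 10, Post => 1, User => 0}

# Removing User shifts Post's positional index, but Comment stays at 10.
SchemaIndexSketch.index([Post, {Comment, 10}])
# => %{Comment => 10, Post => 0}
```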

Action Aliases

Maintain backwards compatibility when renaming actions:

use Spector.Events,
  aliases: [soft_delete: :archive]

Reserved Attributes

Spector injects reserved attributes into the attrs map passed to your changeset/2 function. These provide metadata about the event being applied:

  • :__version__ - The schema version when the event was created. Use with version_is/2 guards to handle schema migrations during replay.

  • :__event_id__ - The unique ID of the event being applied. Use Spector.changeset_put_event_id/3 to assign this to a field:

    def changeset(message, attrs) do
      message
      |> Ecto.Changeset.cast(attrs, [:content])
      |> Spector.changeset_put_event_id(attrs)  # puts :__event_id__ into :id field
    end

These attributes are also stored in the event payload for reference.

Database Support

Spector works with any database supported by Ecto for basic functionality.

Note: Hashed event tables (hashed: true) currently require PostgreSQL, because the hash chain integrity feature uses LOCK TABLE ... IN EXCLUSIVE MODE, which is PostgreSQL-specific.

Summary

Types

  • action() - An action atom (e.g., :insert, :update, :delete, or custom actions)
  • attrs() - Attributes map passed to changesets
  • evented_schema() - An Ecto schema module that uses Spector.Evented
  • evented_struct() - A struct instance of an evented schema

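Functions

  • all_events(schema, parent_id) - Returns all events for a record from the beginning.
  • bringup(schema, opts \\ []) - Import existing database records into the event log.
  • changeset_put_event_id(changeset, attrs, field \\ :id) - Assigns the current event ID to a changeset field.
  • delete(record) - Delete a record, creating a delete event in the event log.
  • execute(record, action, attrs) - Execute an action on a record, creating an event in the event log.
  • fetch_attr(attrs, key) - Fetches a field from attrs, checking both atom and string keys.
  • fetch_attr!(attrs, key) - Fetches a field from attrs, checking both atom and string keys.
  • get(schema, parent_id) - Retrieve the current state of a record by replaying its events.
  • get_attr(attrs, key, default \\ nil) - Gets a field from attrs, checking both atom and string keys.
  • insert(schema, attrs, action \\ :insert) - Insert a new record, creating an event in the event log.
  • previous_events(event) - Returns all events up to and including the given event.
  • recent_events(schema, parent_id) - Returns events starting from the most recent savepoint.
  • savepoint(record) - Create a savepoint event capturing the current state of a record.
  • update(record, attrs) - Update an existing record, creating an event in the event log.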

Types

action()

@type action() :: atom()

An action atom (e.g., :insert, :update, :delete, or custom actions)

attrs()

@type attrs() :: map()

Attributes map passed to changesets

bringup_opts()

@type bringup_opts() :: [
  action: action(),
  attr_fn: (evented_struct() -> attrs()),
  transfer: (evented_struct(), evented_struct() -> any())
]

Options for bringup/2

evented_schema()

@type evented_schema() :: module()

An Ecto schema module that uses Spector.Evented

evented_struct()

@type evented_struct() :: struct()

A struct instance of an evented schema

Functions

all_events(schema, parent_id)

@spec all_events(evented_schema(), Ecto.UUID.t()) :: [struct()]

Returns all events for a record from the beginning.

Events are returned in insertion order.

Example

events = Spector.all_events(MyApp.User, user_id)

bringup(schema, opts \\ [])

@spec bringup(evented_schema(), bringup_opts()) :: {:ok, [evented_struct()]}

Import existing database records into the event log.

Reads all rows from the schema's table and migrates each to Spector management: deletes the original record and creates a new one with a UUIDv7 ID and corresponding event. The entire operation runs in a single transaction.

Records that are already tracked by Spector (have existing events) are skipped.

Options

  • :action - The action to use for the event (default: :insert). Use a custom action like :import to trigger different changeset behavior during migration.

  • :attr_fn - A function that takes a record and returns the attributes map to insert (default: extracts all schema fields except the primary key). Use this to transform or augment data during migration.

  • :transfer - A function that receives the old record and the new record before the old record is deleted. Use this to update associations or perform other transfer operations (default: no-op).

Returns {:ok, [struct]} on success or {:error, reason} on failure.

Timestamps

By default, bringup creates new records, so inserted_at and updated_at timestamps will be set to the current time. To preserve original timestamps from the source records, include them in the attr_fn and ensure your changeset accepts them.

If you don't want your regular changeset to accept timestamp fields, use a custom action like :import to handle them separately:

# Register :import as a custom action
use Spector.Evented, events: MyApp.Events, actions: [:import]
import Ecto.Changeset  # provides cast/3 and validate_required/2 used below

# Handle :import with timestamp support
def changeset(changeset, attrs) when changeset.action == :import do
  changeset
  |> cast(attrs, [:name, :inserted_at, :updated_at])
  |> validate_required([:name])
end

# Regular changeset doesn't accept timestamps
def changeset(changeset, attrs) do
  changeset
  |> cast(attrs, [:name])
  |> validate_required([:name])
end

# Pass timestamps in attr_fn
attr_fn = fn record ->
  %{name: record.name, inserted_at: record.inserted_at, updated_at: record.updated_at}
end

{:ok, users} = Spector.bringup(MyApp.User, action: :import, attr_fn: attr_fn)

Examples

Basic usage migrates all untracked records (timestamps reset to now):

{:ok, users} = Spector.bringup(MyApp.User)

Use a custom attr_fn to transform data during migration:

{:ok, users} = Spector.bringup(MyApp.User, attr_fn: fn record ->
  %{
    name: String.upcase(record.name),
    value: record.value || 0  # provide defaults for nil values
  }
end)

Use a transfer function to update associations before the old record is deleted:

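import Ecto.Query, only: [from: 2]

# Assumes MyApp.Repo and MyApp.Post are aliased as Repo and Post
{:ok, users} = Spector.bringup(MyApp.User, transfer: fn old, new ->
  # Re-point posts at the new record before the old one is deleted
  Repo.update_all(
    from(p in Post, where: p.user_id == ^old.id),
    set: [user_id: new.id]
  )
end)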

changeset_put_event_id(changeset, attrs, field \\ :id)

@spec changeset_put_event_id(Ecto.Changeset.t(), attrs(), atom()) ::
  Ecto.Changeset.t()

Assigns the current event ID to a changeset field.

When Spector calls your changeset/2 function, it includes an :__event_id__ key in the attrs map. This function extracts that ID and assigns it to the specified field in your changeset.

This is useful for embedded schemas and {:array, :map} rollup records where you want each record to have a unique ID that matches its creation event.

Parameters

  • changeset - The changeset to modify
  • attrs - The attrs map passed to changeset/2 (contains :__event_id__)
  • field - The field to assign the event ID to (default: :id)

Example

def changeset(message, attrs) do
  message
  |> Ecto.Changeset.cast(attrs, [:content, :role])
  |> Spector.changeset_put_event_id(attrs)
  |> Ecto.Changeset.validate_required([:id, :content, :role])
end

delete(record)

@spec delete(evented_struct()) ::
  {:ok, evented_struct()} | {:error, Ecto.Changeset.t()}

Delete a record, creating a delete event in the event log.

Returns {:ok, struct} on success or {:error, changeset} on failure.

The delete event is recorded in the event log before the record is removed from the database, providing a complete audit trail.

Example

{:ok, user} = Spector.delete(user)

execute(record, action, attrs)

@spec execute(evented_struct(), action(), attrs()) ::
  {:ok, evented_struct()} | {:error, Ecto.Changeset.t()}

Execute an action on a record, creating an event in the event log.

This is the general-purpose function for applying any action to a record, including custom actions defined in the schema's :actions option.

Returns {:ok, struct} on success or {:error, changeset} on failure.

The function rolls forward from the event log to reconstruct current state, applies the action through the schema's changeset/2 function, and records the new event.
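
The three steps described above (replay, apply, record) can be sketched in memory as follows. This is an illustration under stated assumptions rather than Spector's code: ExecuteSketch, the event map shape, and apply_action/3 (a stand-in for changeset/2 that requires a non-empty :name) are hypothetical.

```elixir
defmodule ExecuteSketch do
  # Hypothetical stand-in for a schema's changeset/2: merges attrs into the
  # state and rejects the result if :name would be missing or empty.
  def apply_action(state, _action, attrs) do
    new_state = Map.merge(state, attrs)

    if Map.get(new_state, :name) in [nil, ""] do
      {:error, :name_required}
    else
      {:ok, new_state}
    end
  end

  # 1. Replay the log. 2. Apply the new action. 3. Append the event on success.
  def execute(log, action, attrs) do
    current =
      Enum.reduce(log, %{}, fn event, state ->
        # Stored events are assumed to have been valid when they were recorded.
        {:ok, next} = apply_action(state, event.action, event.attrs)
        next
      end)

    case apply_action(current, action, attrs) do
      {:ok, new_state} -> {:ok, new_state, log ++ [%{action: action, attrs: attrs}]}
      {:error, reason} -> {:error, reason}
    end
  end
end

log = [%{action: :insert, attrs: %{name: "Alice"}}]

{:ok, state, log} = ExecuteSketch.execute(log, :archive, %{archived: true})
# state => %{archived: true, name: "Alice"}
# length(log) => 2

ExecuteSketch.execute(log, :update, %{name: ""})
# => {:error, :name_required}
```

A failed action leaves the log untouched, which mirrors the {:error, changeset} return described above.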

Example

# Using a custom :archive action
{:ok, item} = Spector.execute(item, :archive, %{archived_at: DateTime.utc_now()})

# The :update action (same as Spector.update/2)
{:ok, user} = Spector.execute(user, :update, %{name: "New Name"})

fetch_attr(attrs, key)

@spec fetch_attr(attrs(), atom()) :: {:ok, any()} | :error

Fetches a field from attrs, checking both atom and string keys.

Returns {:ok, value} if the key exists, or :error if not found.

This is useful in changesets where attrs may come with string keys (from JSON) or atom keys (from internal calls).

Example

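def changeset(record, attrs) do
  case Spector.fetch_attr(attrs, :parent_id) do
    {:ok, parent_id} ->
      # parent_id was provided: set it on the record
      Ecto.Changeset.change(record, parent_id: parent_id)

    :error ->
      # no parent_id provided: leave the record unchanged
      Ecto.Changeset.change(record)
  end
end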
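
The dual-key lookup itself is simple to sketch. The version below is a hypothetical reimplementation for illustration (AttrSketch is not part of Spector), and the choice to check the atom key before the string key is an assumption; the source only says both are checked.

```elixir
defmodule AttrSketch do
  # Look up `key` as an atom first, then as its string form.
  def fetch_attr(attrs, key) when is_map(attrs) and is_atom(key) do
    case Map.fetch(attrs, key) do
      {:ok, value} -> {:ok, value}
      :error -> Map.fetch(attrs, Atom.to_string(key))
    end
  end

  # Same lookup, falling back to `default` when neither key is present.
  def get_attr(attrs, key, default \\ nil) do
    case fetch_attr(attrs, key) do
      {:ok, value} -> value
      :error -> default
    end
  end
end

AttrSketch.fetch_attr(%{"parent_id" => 7}, :parent_id)
# => {:ok, 7}

AttrSketch.fetch_attr(%{name: "Alice"}, :parent_id)
# => :error

AttrSketch.get_attr(%{name: "Alice"}, :parent_id, 0)
# => 0
```

The fetch form distinguishes a key that is present with a nil value ({:ok, nil}) from a key that is absent (:error), which the get form cannot do.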

fetch_attr!(attrs, key)

@spec fetch_attr!(attrs(), atom()) :: any()

Fetches a field from attrs, checking both atom and string keys.

Returns the value if the key exists, or raises KeyError if not found.

Example

def changeset(record, attrs) do
  parent_id = Spector.fetch_attr!(attrs, :parent_id)
  # use parent_id
end

get(schema, parent_id)

@spec get(evented_schema(), String.t()) :: evented_struct() | nil

Retrieve the current state of a record by replaying its events.

Returns the struct if found, or nil if no events exist for the given ID.

This is useful for embedded schemas (without database tables) or when you want to reconstruct state purely from the event log.

Example

user = Spector.get(MyApp.User, "019ac640-dfc0-7407-8238-39a9c45e8813")

get_attr(attrs, key, default \\ nil)

@spec get_attr(attrs(), atom(), any()) :: any()

Gets a field from attrs, checking both atom and string keys.

Returns the value if the key exists, or default if not found.

Example

def changeset(record, attrs) do
  parent_id = Spector.get_attr(attrs, :parent_id, nil)
  # use parent_id, which may be nil
end

insert(schema, attrs, action \\ :insert)

@spec insert(evented_schema(), attrs(), action()) ::
  {:ok, evented_struct()} | {:error, Ecto.Changeset.t()}

Insert a new record, creating an event in the event log.

Returns {:ok, struct} on success or {:error, changeset} on failure.

Example

{:ok, user} = Spector.insert(MyApp.User, %{name: "Alice", email: "alice@example.com"})

The inserted struct will have a new UUIDv7 id assigned.

previous_events(event)

Returns all events up to and including the given event.

Events are returned in insertion order. The specified event is included as the last element in the result.

Note

This function does not take into account savepoints.

Example

events = Spector.previous_events(event)

recent_events(schema, parent_id)

@spec recent_events(evented_schema(), Ecto.UUID.t()) :: [struct()]

Returns events starting from the most recent savepoint.

If no savepoint exists, returns all events from the beginning. The savepoint event itself is included as the first element.

Events are returned in insertion order.

Prefer this function over all_events/2. If the schema does not support savepoints, the two functions behave identically.

Example

events = Spector.recent_events(MyApp.User, user_id)
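
The selection rule described above can be sketched over a plain list. RecentEventsSketch is hypothetical, and identifying savepoint events by an :action of :savepoint is an assumption; the real function queries the event table.

```elixir
defmodule RecentEventsSketch do
  # Given events in insertion order, keep the last savepoint and everything
  # after it; with no savepoint, keep the whole list.
  def recent(events) do
    case Enum.find_index(Enum.reverse(events), &(&1.action == :savepoint)) do
      nil -> events
      from_end -> Enum.take(events, -(from_end + 1))
    end
  end
end

events = [
  %{action: :insert},
  %{action: :update},
  %{action: :savepoint},
  %{action: :update}
]

RecentEventsSketch.recent(events)
# => [%{action: :savepoint}, %{action: :update}]
```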

savepoint(record)

Create a savepoint event capturing the current state of a record.

Returns {:ok, struct} on success or raises on failure.

Savepoints store the complete state of a record at a point in time, allowing event replay to start from the savepoint instead of replaying all events from the beginning. This is useful for records with long event histories.

The schema must implement the savepoint/2 callback to define how the current state is converted to an attrs map:

@behaviour Spector.Evented

@impl true
def savepoint(record, _version) do
  %{name: record.name, email: record.email}
end
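
To see why a savepoint can shorten replay, consider this in-memory sketch. It assumes a savepoint event carries the full attrs map returned by the callback and replaces the state wholesale during replay. SavepointSketch and the event shape are hypothetical, not Spector's internals.

```elixir
defmodule SavepointSketch do
  # Hypothetical replay: a :savepoint event replaces the state wholesale,
  # every other event merges its attrs into it.
  def replay(events) do
    Enum.reduce(events, %{}, fn
      %{action: :savepoint, attrs: attrs}, _state -> attrs
      %{attrs: attrs}, state -> Map.merge(state, attrs)
    end)
  end
end

full_log = [
  %{action: :insert, attrs: %{name: "Alice", email: "alice@example.com"}},
  %{action: :update, attrs: %{name: "Alice Smith"}},
  %{action: :savepoint, attrs: %{name: "Alice Smith", email: "alice@example.com"}},
  %{action: :update, attrs: %{email: "smith@example.com"}}
]

# Replaying only the tail that starts at the savepoint gives the same state.
SavepointSketch.replay(full_log) == SavepointSketch.replay(Enum.drop(full_log, 2))
# => true
```

This equivalence only holds if the savepoint attrs capture every field that later events depend on, which is why the callback should return the complete state.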

Forms

There are two ways to create a savepoint:

From a record (savepoint/1)

Pass the record directly. This verifies that the record matches the current state in the event log (replayed events must produce the same field values). This guards against creating savepoints from stale records:

{:ok, user} = Spector.savepoint(user)

If the record is stale (e.g., another process updated it), this raises an error.

From schema and ID (savepoint/2)

Pass the schema module and record ID. This replays events to determine current state without verification:

{:ok, user} = Spector.savepoint(MyApp.User, user_id)

Use this form when you don't have the record in memory or don't need stale record detection.

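savepoint(schema, id)

Creates a savepoint from a schema module and record ID, without stale-record verification. See "From schema and ID (savepoint/2)" under savepoint(record) above.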

update(record, attrs)

@spec update(evented_struct(), attrs()) ::
  {:ok, evented_struct()} | {:error, Ecto.Changeset.t()}

Update an existing record, creating an event in the event log.

This is a convenience function that calls execute(record, :update, attrs).

Returns {:ok, struct} on success or {:error, changeset} on failure.

Example

{:ok, user} = Spector.update(user, %{name: "Alice Smith"})