Append-only event ledger API (D-13, D-14, D-15, D-16).
Every state mutation in Phase 2+ emits a corresponding row in
accrue_events in the SAME transaction as the mutation. This module
provides two entry points:
record/1— for use insideAccrue.Repo.transact/1blocks.record_multi/3— for use insideEcto.Multipipelines.
Both paths go through the same Accrue.Events.Event.changeset/1, and
both honor the idempotency guarantee: a duplicate idempotency_key
collapses to the existing row via on_conflict: :nothing plus a
manual fetch fallback, so webhook replays are no-ops.
Actor + trace_id auto-capture
record/1 reads Accrue.Actor.current/0 and
Accrue.Telemetry.current_trace_id/0 from the process dictionary so
upstream plugs (Accrue.Plug.PutActor, Phase 2) and Oban worker
middleware can stamp events without the call site passing anything
explicitly. Callers override either by passing :actor / :trace_id
in the attrs map.
Security
⚠️ The
datajsonb column is not automatically sanitized. Callers MUST NOT put payment-method PII or secrets intodata. A redactor may land in Phase 6; Phase 1 deliberately accepts this risk (T-EVT-03) and documents it here.
Immutability is enforced at the Postgres layer by a
BEFORE UPDATE OR DELETE trigger raising SQLSTATE 45A01. This
module translates the resulting Postgrex.Error into
Accrue.EventLedgerImmutableError via pattern-match on the
pg_code field — never by parsing the error message string (D-11).
Summary
Functions
Buckets events by date_trunc'd inserted_at for the given filter.
Records a single event, returning {:ok, %Event{}} on success or
propagating the underlying error on failure.
Appends an event insert to an Ecto.Multi pipeline. Downstream plans
(Phase 2 billing context) use this to commit a state mutation and its
event record in the same transaction.
Reconstructs the projected state map for a subject as of a past
timestamp by folding all events with inserted_at <= ts.
Returns events scoped to a single subject, ordered by inserted_at
ascending. Each row is routed through the upcaster chain to the
current schema version before returning (Pitfall 10).
Types
@type attrs() :: %{ optional(:type) => String.t(), optional(:subject_type) => String.t(), optional(:subject_id) => String.t(), optional(:schema_version) => integer(), optional(:actor) => Accrue.Actor.t() | nil, optional(:actor_type) => String.t() | atom(), optional(:actor_id) => String.t() | nil, optional(:data) => map(), optional(:trace_id) => String.t() | nil, optional(:idempotency_key) => String.t() | nil, optional(:caused_by_event_id) => integer() | nil, optional(:caused_by_webhook_event_id) => Ecto.UUID.t() | nil }
Functions
@spec bucket_by( keyword(), :day | :week | :month ) :: [{DateTime.t(), non_neg_integer()}]
Buckets events by date_trunc'd inserted_at for the given filter.
Returns a list of {bucket_datetime, count} tuples ordered by bucket.
Filters
:type— single string or list of strings:since/:until— DateTime bounds:subject_type— string
Bucket sizes
:day,:week,:month
@spec record(attrs()) :: {:ok, Accrue.Events.Event.t()} | {:error, term()}
Records a single event, returning {:ok, %Event{}} on success or
propagating the underlying error on failure.
Immutability violations (attempting to insert a row whose primary key
collides with an existing row that the trigger then rejects on
internal retry) are translated to Accrue.EventLedgerImmutableError
via the Postgrex SQLSTATE 45A01 pattern-match — this is mostly
defensive; record/1 itself never updates or deletes. The stronger
guarantee is that Accrue.Repo.update/2 on an Event raises this
error, which is what the immutability test asserts.
Examples
iex> Accrue.Events.record(%{
...> type: "subscription.created",
...> subject_type: "Subscription",
...> subject_id: "sub_123"
...> })
{:ok, %Accrue.Events.Event{type: "subscription.created", ...}}
@spec record_multi(Ecto.Multi.t(), atom(), attrs()) :: Ecto.Multi.t()
Appends an event insert to an Ecto.Multi pipeline. Downstream plans
(Phase 2 billing context) use this to commit a state mutation and its
event record in the same transaction.
Examples
Ecto.Multi.new()
|> Ecto.Multi.insert(:subscription, subscription_changeset)
|> Accrue.Events.record_multi(:event, %{
type: "subscription.created",
subject_type: "Subscription",
subject_id: "sub_123"
})
|> Accrue.Repo.transact()
@spec state_as_of(String.t(), String.t(), DateTime.t()) :: %{ state: map(), event_count: non_neg_integer(), last_event_at: DateTime.t() | nil }
Reconstructs the projected state map for a subject as of a past
timestamp by folding all events with inserted_at <= ts.
Returns a map with :state, :event_count, and :last_event_at.
Each row is routed through the upcaster chain BEFORE folding (Pitfall 10).
@spec timeline_for(String.t(), String.t(), keyword()) :: [Accrue.Events.Event.t()]
Returns events scoped to a single subject, ordered by inserted_at
ascending. Each row is routed through the upcaster chain to the
current schema version before returning (Pitfall 10).
Options
:limit— max rows to return (default1_000)