Tamper-evident audit ledger for Accrue billing events.
Every state mutation in the billing system emits a corresponding row in
accrue_events in the same database transaction as the mutation. This
gives you an ordered, immutable record of what happened, who did it, and
when — without a separate event store.
When you reach for this module

- Recording a billing lifecycle change — call `record/1` inside a `Repo.transact/1` block, or `record_multi/3` inside an `Ecto.Multi` pipeline.
- Auditing what happened to a subscription or customer — use `timeline_for/3` to fetch all events for a subject in chronological order.
- Reconstructing state at a point in time — use `state_as_of/3` to fold events up to a timestamp into a projected state map.
- Charting event volume over time — use `bucket_by/2` for dashboard aggregations (daily/weekly/monthly).
Key functions

- `record/1` — insert a single event; use inside `Repo.transact/1`.
- `record_multi/3` — append an event step to an `Ecto.Multi` pipeline.
- `timeline_for/3` — list events for a subject, oldest first.
- `state_as_of/3` — reconstruct a subject's projected state at a past moment.
- `bucket_by/2` — count events by day/week/month for analytics.
Idempotency
Both record/1 and record_multi/3 accept an optional :idempotency_key.
A duplicate key collapses to the existing row via on_conflict: :nothing
plus a fallback fetch — webhook replays and Oban retries are safe no-ops.
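The collapse semantics can be sketched with an in-memory table standing in for `accrue_events` (the module and field names below are illustrative, not part of the real API):

```elixir
# Hypothetical sketch of the duplicate-key collapse: attempt an insert that
# does nothing on conflict, then fall back to fetching the existing row.
defmodule IdempotencySketch do
  # `table` is a map of idempotency_key => event row, standing in for the DB.
  def record(table, %{idempotency_key: key} = attrs) do
    case Map.fetch(table, key) do
      # conflict: the new attrs are discarded and the existing row is returned
      {:ok, existing} ->
        {{:ok, existing}, table}

      # no conflict: insert the row and return it
      :error ->
        row = Map.put(attrs, :id, map_size(table) + 1)
        {{:ok, row}, Map.put(table, key, row)}
    end
  end
end
```

Replaying the same webhook therefore yields the row created by the first delivery rather than a duplicate.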
Actor and trace ID auto-capture
record/1 reads Accrue.Actor.current/0 and
Accrue.Telemetry.current_trace_id/0 from the process dictionary so
request-scoped plugs and Oban worker middleware can stamp events without
the call site passing anything explicitly. Override either by passing
:actor or :trace_id in the attrs map.
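A minimal sketch of how process-dictionary stamping behaves (the storage key and helper names here are hypothetical; the real module reads `Accrue.Actor.current/0` and `Accrue.Telemetry.current_trace_id/0`):

```elixir
# Sketch: a plug or Oban middleware stores the actor in the process
# dictionary; event attrs pick it up unless an explicit :actor is passed.
defmodule StampSketch do
  def put_actor(actor), do: Process.put(:accrue_actor, actor)

  def stamp(attrs) do
    # an explicit :actor in attrs wins over the process-scoped default
    Map.put_new(attrs, :actor, Process.get(:accrue_actor))
  end
end
```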
Schema versioning (upcasting)
Each event row carries a schema_version integer. When you read events
back via timeline_for/3 or state_as_of/3, each row is automatically
migrated forward through any registered upcasters to the current schema
version. This means you can evolve what an event's data map looks like
over time without rewriting historical rows.
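The forward-migration mechanics can be sketched as a chain of version-to-version steps folded until the row reaches the current version (the versions and field renames below are invented for illustration):

```elixir
# Sketch of upcasting: each step migrates schema_version n to n + 1, and
# reads fold a row forward until it reaches @current.
defmodule UpcastSketch do
  @current 3

  def upcast(%{schema_version: v} = event) when v >= @current, do: event

  def upcast(%{schema_version: v, data: data} = event) do
    upcast(%{event | schema_version: v + 1, data: step(v, data)})
  end

  # v1 -> v2: rename :amount to :amount_cents (illustrative)
  defp step(1, data), do: data |> Map.put(:amount_cents, data.amount) |> Map.delete(:amount)
  # v2 -> v3: backfill a :currency default (illustrative)
  defp step(2, data), do: Map.put_new(data, :currency, "USD")
end
```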
Immutability
Events are append-only. A PostgreSQL BEFORE UPDATE OR DELETE trigger
raises SQLSTATE 45A01 on any attempt to modify or delete an event row.
This module translates the resulting Postgrex.Error into
Accrue.EventLedgerImmutableError by pattern-matching on pg_code —
never by parsing the error message string.
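The translation can be sketched as a pattern match on the `pg_code` field of the `postgres` map carried by `Postgrex.Error` (a plain map stands in for the exception here, and the returned atom is illustrative; the real module raises `Accrue.EventLedgerImmutableError`). Matching on the code rather than the message keeps the check stable across Postgres versions and locales:

```elixir
# Sketch: detect the custom SQLSTATE raised by the immutability trigger.
defmodule TranslateSketch do
  # the trigger raises SQLSTATE 45A01; match it structurally, never by message
  def translate(%{postgres: %{pg_code: "45A01"}}), do: {:error, :event_ledger_immutable}
  # anything else passes through untranslated
  def translate(err), do: {:error, err}
end
```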
Security
⚠️ The `data` jsonb column is not automatically sanitized. Callers must not store payment-method PII or secrets in `data`.
Summary

Functions

- `bucket_by/2` — Buckets events by `date_trunc`'d `inserted_at` for the given filter.
- `record/1` — Records a single event, returning `{:ok, %Event{}}` on success or propagating the underlying error on failure.
- `record_multi/3` — Appends an event insert to an `Ecto.Multi` pipeline, committing the state mutation and its audit record in the same transaction.
- `state_as_of/3` — Reconstructs the projected state map for a subject as of a past timestamp by folding all events with `inserted_at <= ts`.
- `timeline_for/3` — Returns events scoped to a single subject, ordered by `inserted_at` ascending, with each row migrated through the upcaster chain to the current schema version before being returned.
Types
@type attrs() :: %{
        optional(:type) => String.t(),
        optional(:subject_type) => String.t(),
        optional(:subject_id) => String.t(),
        optional(:schema_version) => integer(),
        optional(:actor) => Accrue.Actor.t() | nil,
        optional(:actor_type) => String.t() | atom(),
        optional(:actor_id) => String.t() | nil,
        optional(:data) => map(),
        optional(:trace_id) => String.t() | nil,
        optional(:idempotency_key) => String.t() | nil,
        optional(:caused_by_event_id) => integer() | nil,
        optional(:caused_by_webhook_event_id) => Ecto.UUID.t() | nil
      }
Functions
@spec bucket_by(keyword(), :day | :week | :month) :: [{DateTime.t(), non_neg_integer()}]
Buckets events by date_trunc'd inserted_at for the given filter.
Returns a list of {bucket_datetime, count} tuples ordered by bucket.
Filters
- `:type` — single string or list of strings
- `:since` / `:until` — `DateTime` bounds
- `:subject_type` — string
Bucket sizes
`:day`, `:week`, `:month`
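An in-memory sketch of the `:day` bucket semantics (the real query uses Postgres `date_trunc`; the result shape is the same list of `{bucket_datetime, count}` tuples):

```elixir
# Sketch: truncate each timestamp to midnight, group, count, and sort by bucket.
defmodule BucketSketch do
  def bucket_by_day(datetimes) do
    datetimes
    |> Enum.group_by(&%{&1 | hour: 0, minute: 0, second: 0, microsecond: {0, 0}})
    |> Enum.map(fn {bucket, rows} -> {bucket, length(rows)} end)
    |> Enum.sort_by(&elem(&1, 0), DateTime)
  end
end
```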
@spec record(attrs()) :: {:ok, Accrue.Events.Event.t()} | {:error, term()}
Records a single event, returning {:ok, %Event{}} on success or
propagating the underlying error on failure.
Immutability violations (e.g. an insert whose primary key collides with an
existing row and is then rejected by the trigger on internal retry) are
translated to Accrue.EventLedgerImmutableError via the Postgrex SQLSTATE
45A01 pattern match. This is mostly defensive; record/1 itself never
updates or deletes. The stronger guarantee is that Accrue.Repo.update/2 on
an Event raises this error, which is what the immutability test asserts.
Examples
iex> Accrue.Events.record(%{
...> type: "subscription.created",
...> subject_type: "Subscription",
...> subject_id: "sub_123"
...> })
{:ok, %Accrue.Events.Event{type: "subscription.created", ...}}
@spec record_multi(Ecto.Multi.t(), atom(), attrs()) :: Ecto.Multi.t()
Appends an event insert to an Ecto.Multi pipeline, committing the state
mutation and its audit record in the same transaction.
Examples
Ecto.Multi.new()
|> Ecto.Multi.insert(:subscription, subscription_changeset)
|> Accrue.Events.record_multi(:event, %{
type: "subscription.created",
subject_type: "Subscription",
subject_id: "sub_123"
})
|> Accrue.Repo.transact()
@spec state_as_of(String.t(), String.t(), DateTime.t()) ::
        %{state: map(), event_count: non_neg_integer(), last_event_at: DateTime.t() | nil}
Reconstructs the projected state map for a subject as of a past
timestamp by folding all events with inserted_at <= ts.
Returns a map with :state, :event_count, and :last_event_at.
Each row is migrated through the upcaster chain before folding, so the resulting state reflects the current schema regardless of when the events were recorded.
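The fold itself can be sketched over plain maps (a last-write-wins merge is assumed here; the real projection logic may differ):

```elixir
# Sketch: keep events at or before the cutoff, then merge their data maps
# oldest-to-newest into a projected state.
defmodule StateSketch do
  def state_as_of(events, ts) do
    applicable =
      events
      |> Enum.filter(&(DateTime.compare(&1.inserted_at, ts) != :gt))
      |> Enum.sort_by(& &1.inserted_at, DateTime)

    %{
      state: Enum.reduce(applicable, %{}, fn e, acc -> Map.merge(acc, e.data) end),
      event_count: length(applicable),
      last_event_at: applicable |> List.last() |> then(&(&1 && &1.inserted_at))
    }
  end
end
```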
@spec timeline_for(String.t(), String.t(), keyword()) :: [Accrue.Events.Event.t()]
Returns events scoped to a single subject, ordered by inserted_at
ascending. Each row is automatically migrated through the upcaster chain
to the current schema version before being returned.
Options
- `:limit` — max rows to return (default `1_000`)