Accrue.Events (accrue v1.0.0)

Tamper-evident audit ledger for Accrue billing events.

Every state mutation in the billing system emits a corresponding row in accrue_events in the same database transaction as the mutation. This gives you an ordered, immutable record of what happened, who did it, and when — without a separate event store.

When you reach for this module

  • Recording a billing lifecycle change — call record/1 inside a Repo.transact/1 block, or record_multi/3 inside an Ecto.Multi pipeline.
  • Auditing what happened to a subscription or customer — use timeline_for/3 to fetch all events for a subject in chronological order.
  • Reconstructing state at a point in time — use state_as_of/3 to fold events up to a timestamp into a projected state map.
  • Charting event volume over time — use bucket_by/2 for dashboard aggregations (daily/weekly/monthly).

Key functions

Idempotency

Both record/1 and record_multi/3 accept an optional :idempotency_key. A duplicate key collapses to the existing row: the insert uses on_conflict: :nothing and, when nothing is inserted, falls back to fetching the row already stored under that key. Webhook replays and Oban retries are therefore safe no-ops.
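The collapse-to-existing-row behaviour can be illustrated with a small in-memory sketch. This is not Accrue's implementation: IdempotentLedgerSketch and its map-based ledger are hypothetical stand-ins for the accrue_events table, used only to show what a caller observes when the same key is recorded twice.

```elixir
defmodule IdempotentLedgerSketch do
  # Hypothetical in-memory stand-in for the accrue_events table:
  # `ledger` is a map of idempotency_key => event attrs.

  # Stores `attrs` unless its :idempotency_key is already present.
  # On a duplicate key nothing is written and the stored event is returned.
  def record(ledger, %{idempotency_key: key} = attrs) when is_binary(key) do
    case Map.fetch(ledger, key) do
      {:ok, existing} ->
        # Conflict: skip the insert and fall back to the existing row.
        {:ok, existing, ledger}

      :error ->
        {:ok, attrs, Map.put(ledger, key, attrs)}
    end
  end
end

attrs = %{
  type: "invoice.paid",
  subject_type: "Invoice",
  subject_id: "in_1",
  idempotency_key: "evt_abc"
}

{:ok, first, ledger} = IdempotentLedgerSketch.record(%{}, attrs)

# A replay with the same key, even with different attrs, returns the first event.
{:ok, second, ledger} =
  IdempotentLedgerSketch.record(ledger, %{attrs | type: "invoice.replayed"})
```

In this sketch the replay leaves the ledger unchanged and hands back the original event, which is the property that makes retries safe.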

Actor and trace ID auto-capture

record/1 reads Accrue.Actor.current/0 and Accrue.Telemetry.current_trace_id/0 from the process dictionary so request-scoped plugs and Oban worker middleware can stamp events without the call site passing anything explicitly. Override either by passing :actor or :trace_id in the attrs map.
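As a rough illustration of the auto-capture pattern, the sketch below stamps attrs from the process dictionary unless the caller already supplied a value. The module name, the dictionary keys, and the stamp/1 helper are all hypothetical; Accrue's own storage keys and internals are not documented here.

```elixir
defmodule AutoCaptureSketch do
  # Hypothetical process-dictionary keys, chosen for this sketch only.
  @actor_key :sketch_current_actor
  @trace_key :sketch_current_trace_id

  # What a request plug or worker middleware might do once per process.
  def put_actor(actor), do: Process.put(@actor_key, actor)
  def put_trace_id(id), do: Process.put(@trace_key, id)

  # Fills :actor and :trace_id from the process dictionary,
  # but only when the caller did not pass them explicitly.
  def stamp(attrs) do
    attrs
    |> Map.put_new_lazy(:actor, fn -> Process.get(@actor_key) end)
    |> Map.put_new_lazy(:trace_id, fn -> Process.get(@trace_key) end)
  end
end

AutoCaptureSketch.put_actor(%{type: "admin", id: "user_42"})
AutoCaptureSketch.put_trace_id("trace-1")

# No explicit actor or trace ID: both come from the process dictionary.
stamped = AutoCaptureSketch.stamp(%{type: "subscription.canceled"})

# An explicit :trace_id in attrs wins over the captured value.
overridden =
  AutoCaptureSketch.stamp(%{type: "subscription.canceled", trace_id: "trace-override"})
```

Because the values live in the process dictionary, they are visible only to code running in the same process that set them.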

Schema versioning (upcasting)

Each event row carries a schema_version integer. When you read events back via timeline_for/3 or state_as_of/3, each row is automatically migrated forward through any registered upcasters to the current schema version. This means you can evolve what an event's data map looks like over time without rewriting historical rows.
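A minimal sketch of an upcaster chain follows, assuming each upcaster migrates a data map from version N to N + 1. UpcasterSketch, its version numbers, and the field names are invented for illustration; how upcasters are actually registered with Accrue is not described in this page.

```elixir
defmodule UpcasterSketch do
  # Hypothetical current schema version for this sketch.
  @current_version 3

  # v1 -> v2: default the currency when it is missing.
  defp upcast(1, data), do: Map.put_new(data, "currency", "usd")

  # v2 -> v3: replace "amount" (whole units) with "amount_cents".
  defp upcast(2, %{"amount" => amount} = data) do
    data |> Map.delete("amount") |> Map.put("amount_cents", amount * 100)
  end

  defp upcast(2, data), do: data

  # Rows already at the current version are returned untouched.
  def migrate(%{schema_version: v} = event) when v >= @current_version, do: event

  # Otherwise apply one upcaster and recurse until the row is current.
  def migrate(%{schema_version: v, data: data} = event) do
    migrate(%{event | schema_version: v + 1, data: upcast(v, data)})
  end
end

old = %{schema_version: 1, data: %{"amount" => 12}}
migrated = UpcasterSketch.migrate(old)
```

The stored row is never rewritten in this model; migration happens on the in-memory copy at read time, which is consistent with the append-only ledger.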

Immutability

Events are append-only. A PostgreSQL BEFORE UPDATE OR DELETE trigger raises SQLSTATE 45A01 on any attempt to modify or delete an event row. This module translates the resulting Postgrex.Error into Accrue.EventLedgerImmutableError by pattern-matching on pg_code — never by parsing the error message string.
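Matching on the SQLSTATE rather than the message might look like the sketch below. ImmutabilitySketch and classify/1 are hypothetical names. The pattern is a plain map match, so it would also match a %Postgrex.Error{} struct, assuming the SQLSTATE is exposed under postgres.pg_code as the text above indicates.

```elixir
defmodule ImmutabilitySketch do
  # Custom SQLSTATE raised by the trigger, as documented above.
  @immutable_sqlstate "45A01"

  # Match on the structured SQLSTATE only; the message text is never inspected.
  def classify(%{postgres: %{pg_code: @immutable_sqlstate}}), do: :ledger_immutable
  def classify(_other), do: :other
end

# Plain maps stand in for database errors so the sketch runs without a database.
trigger_error = %{postgres: %{pg_code: "45A01"}, message: "wording may change"}
unique_violation = %{postgres: %{pg_code: "23505"}, message: "duplicate key"}

trigger_result = ImmutabilitySketch.classify(trigger_error)
other_result = ImmutabilitySketch.classify(unique_violation)
```

Keying on the code keeps the translation stable even if the trigger's error message is reworded or localized.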

Security

⚠️ The data jsonb column is not automatically sanitized. Callers must not store payment-method PII or secrets in data.

Summary

Functions

bucket_by(filter, bucket)

Counts events matching the given filter, grouped by inserted_at truncated (via date_trunc) to the requested bucket size.

record(attrs)

Records a single event, returning {:ok, %Event{}} on success or propagating the underlying error on failure.

record_multi(multi, name, attrs)

Appends an event insert to an Ecto.Multi pipeline, committing the state mutation and its audit record in the same transaction.

state_as_of(subject_type, subject_id, ts)

Reconstructs the projected state map for a subject as of a past timestamp by folding all events with inserted_at <= ts.

timeline_for(subject_type, subject_id, opts \\ [])

Returns events scoped to a single subject, ordered by inserted_at ascending. Each row is automatically migrated through the upcaster chain to the current schema version before being returned.

Types

attrs()

@type attrs() :: %{
  optional(:type) => String.t(),
  optional(:subject_type) => String.t(),
  optional(:subject_id) => String.t(),
  optional(:schema_version) => integer(),
  optional(:actor) => Accrue.Actor.t() | nil,
  optional(:actor_type) => String.t() | atom(),
  optional(:actor_id) => String.t() | nil,
  optional(:data) => map(),
  optional(:trace_id) => String.t() | nil,
  optional(:idempotency_key) => String.t() | nil,
  optional(:caused_by_event_id) => integer() | nil,
  optional(:caused_by_webhook_event_id) => Ecto.UUID.t() | nil
}

Functions

bucket_by(filter, bucket)

@spec bucket_by(
  keyword(),
  :day | :week | :month
) :: [{DateTime.t(), non_neg_integer()}]

Counts events matching the given filter, grouped by inserted_at truncated (via date_trunc) to the requested bucket size.

Returns a list of {bucket_datetime, count} tuples ordered by bucket.

Filters

  • :type — single string or list of strings
  • :since / :until — DateTime bounds
  • :subject_type — string

Bucket sizes

  • :day, :week, :month
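To make the return shape concrete, here is an in-memory sketch of :day bucketing over a list of timestamps. It only mimics the documented {bucket_datetime, count} output; BucketSketch is a hypothetical module, and the real function presumably performs the truncation in PostgreSQL rather than in Elixir.

```elixir
defmodule BucketSketch do
  # Truncates a DateTime to the start of its day, like date_trunc('day', ...).
  defp day_bucket(%DateTime{} = dt) do
    %{dt | hour: 0, minute: 0, second: 0, microsecond: {0, 0}}
  end

  # Returns [{bucket_datetime, count}] ordered by bucket.
  def bucket_by_day(timestamps) do
    timestamps
    |> Enum.frequencies_by(&day_bucket/1)
    |> Enum.sort_by(fn {bucket, _count} -> bucket end, DateTime)
  end
end

timestamps = [
  ~U[2024-03-02 09:15:00Z],
  ~U[2024-03-01 23:59:59Z],
  ~U[2024-03-02 18:40:00Z]
]

buckets = BucketSketch.bucket_by_day(timestamps)
```

With this input the sketch yields one event in the 2024-03-01 bucket and two in the 2024-03-02 bucket, in that order.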

record(attrs)

@spec record(attrs()) :: {:ok, Accrue.Events.Event.t()} | {:error, term()}

Records a single event, returning {:ok, %Event{}} on success or propagating the underlying error on failure.

Immutability violations are translated to Accrue.EventLedgerImmutableError by pattern-matching on the Postgrex SQLSTATE 45A01. For record/1 this is mostly defensive, because record/1 itself never updates or deletes: the only case it covers is an insert whose primary key collides with an existing row and is then rejected by the trigger on an internal retry. The stronger guarantee is that Accrue.Repo.update/2 on an Event raises this error, which is what the immutability test asserts.

Examples

iex> Accrue.Events.record(%{
...>   type: "subscription.created",
...>   subject_type: "Subscription",
...>   subject_id: "sub_123"
...> })
{:ok, %Accrue.Events.Event{type: "subscription.created", ...}}

record_multi(multi, name, attrs)

@spec record_multi(Ecto.Multi.t(), atom(), attrs()) :: Ecto.Multi.t()

Appends an event insert to an Ecto.Multi pipeline, committing the state mutation and its audit record in the same transaction.

Examples

Ecto.Multi.new()
|> Ecto.Multi.insert(:subscription, subscription_changeset)
|> Accrue.Events.record_multi(:event, %{
  type: "subscription.created",
  subject_type: "Subscription",
  subject_id: "sub_123"
})
|> Accrue.Repo.transact()

state_as_of(subject_type, subject_id, ts)

@spec state_as_of(String.t(), String.t(), DateTime.t()) :: %{
  state: map(),
  event_count: non_neg_integer(),
  last_event_at: DateTime.t() | nil
}

Reconstructs the projected state map for a subject as of a past timestamp by folding all events with inserted_at <= ts.

Returns a map with :state, :event_count, and :last_event_at.

Each row is migrated through the upcaster chain before folding, so the resulting state reflects the current schema regardless of when the events were recorded.
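The fold can be sketched in memory as follows. The reducer here is an assumption: it shallow-merges each event's data map into the state, whereas the projection Accrue actually applies is not specified in this page. StateAsOfSketch and the sample events are hypothetical, and upcasting is left out for brevity.

```elixir
defmodule StateAsOfSketch do
  # Folds all events with inserted_at <= ts into the documented result shape.
  # Assumption: the projection is a shallow merge of each event's data map.
  def state_as_of(events, %DateTime{} = ts) do
    events
    |> Enum.filter(&(DateTime.compare(&1.inserted_at, ts) != :gt))
    |> Enum.sort_by(& &1.inserted_at, DateTime)
    |> Enum.reduce(%{state: %{}, event_count: 0, last_event_at: nil}, fn event, acc ->
      %{
        state: Map.merge(acc.state, event.data),
        event_count: acc.event_count + 1,
        last_event_at: event.inserted_at
      }
    end)
  end
end

events = [
  %{inserted_at: ~U[2024-01-01 00:00:00Z], data: %{"status" => "trialing", "plan" => "basic"}},
  %{inserted_at: ~U[2024-01-15 00:00:00Z], data: %{"status" => "active"}},
  %{inserted_at: ~U[2024-02-01 00:00:00Z], data: %{"status" => "canceled"}}
]

# The February cancellation falls after the cutoff and is excluded.
snapshot = StateAsOfSketch.state_as_of(events, ~U[2024-01-20 00:00:00Z])
```

A subject with no events before the timestamp produces an empty state, a count of zero, and a nil last_event_at, matching the DateTime.t() | nil in the spec.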

timeline_for(subject_type, subject_id, opts \\ [])

@spec timeline_for(String.t(), String.t(), keyword()) :: [Accrue.Events.Event.t()]

Returns events scoped to a single subject, ordered by inserted_at ascending. Each row is automatically migrated through the upcaster chain to the current schema version before being returned.

Options

  • :limit — max rows to return (default 1_000)