Accrue.Events (accrue v0.3.0)

Copy Markdown View Source

Append-only event ledger API (D-13, D-14, D-15, D-16).

Every state mutation in Phase 2+ emits a corresponding row in accrue_events in the SAME transaction as the mutation. This module provides two entry points:

Both paths go through the same Accrue.Events.Event.changeset/1, and both honor the idempotency guarantee: a duplicate idempotency_key collapses to the existing row via on_conflict: :nothing plus a manual fetch fallback, so webhook replays are no-ops.

Actor + trace_id auto-capture

record/1 reads Accrue.Actor.current/0 and Accrue.Telemetry.current_trace_id/0 from the process dictionary so upstream plugs (Accrue.Plug.PutActor, Phase 2) and Oban worker middleware can stamp events without the call site passing anything explicitly. Callers override either by passing :actor / :trace_id in the attrs map.

Security

⚠️ The data jsonb column is not automatically sanitized. Callers MUST NOT put payment-method PII or secrets into data. A redactor may land in Phase 6; Phase 1 deliberately accepts this risk (T-EVT-03) and documents it here.

Immutability is enforced at the Postgres layer by a BEFORE UPDATE OR DELETE trigger raising SQLSTATE 45A01. This module translates the resulting Postgrex.Error into Accrue.EventLedgerImmutableError via pattern-match on the pg_code field — never by parsing the error message string (D-11).

Summary

Functions

Buckets events by date_trunc'd inserted_at for the given filter.

Records a single event, returning {:ok, %Event{}} on success or propagating the underlying error on failure.

Appends an event insert to an Ecto.Multi pipeline. Downstream plans (Phase 2 billing context) use this to commit a state mutation and its event record in the same transaction.

Reconstructs the projected state map for a subject as of a past timestamp by folding all events with inserted_at <= ts.

Returns events scoped to a single subject, ordered by inserted_at ascending. Each row is routed through the upcaster chain to the current schema version before returning (Pitfall 10).

Types

attrs()

@type attrs() :: %{
  optional(:type) => String.t(),
  optional(:subject_type) => String.t(),
  optional(:subject_id) => String.t(),
  optional(:schema_version) => integer(),
  optional(:actor) => Accrue.Actor.t() | nil,
  optional(:actor_type) => String.t() | atom(),
  optional(:actor_id) => String.t() | nil,
  optional(:data) => map(),
  optional(:trace_id) => String.t() | nil,
  optional(:idempotency_key) => String.t() | nil,
  optional(:caused_by_event_id) => integer() | nil,
  optional(:caused_by_webhook_event_id) => Ecto.UUID.t() | nil
}

Functions

bucket_by(filter, bucket)

@spec bucket_by(
  keyword(),
  :day | :week | :month
) :: [{DateTime.t(), non_neg_integer()}]

Buckets events by date_trunc'd inserted_at for the given filter.

Returns a list of {bucket_datetime, count} tuples ordered by bucket.

Filters

  • :type — single string or list of strings
  • :since / :until — DateTime bounds
  • :subject_type — string

Bucket sizes

  • :day, :week, :month

record(attrs)

@spec record(attrs()) :: {:ok, Accrue.Events.Event.t()} | {:error, term()}

Records a single event, returning {:ok, %Event{}} on success or propagating the underlying error on failure.

Immutability violations (attempting to insert a row whose primary key collides with an existing row that the trigger then rejects on internal retry) are translated to Accrue.EventLedgerImmutableError via the Postgrex SQLSTATE 45A01 pattern-match — this is mostly defensive; record/1 itself never updates or deletes. The stronger guarantee is that Accrue.Repo.update/2 on an Event raises this error, which is what the immutability test asserts.

Examples

iex> Accrue.Events.record(%{
...>   type: "subscription.created",
...>   subject_type: "Subscription",
...>   subject_id: "sub_123"
...> })
{:ok, %Accrue.Events.Event{type: "subscription.created", ...}}

record_multi(multi, name, attrs)

@spec record_multi(Ecto.Multi.t(), atom(), attrs()) :: Ecto.Multi.t()

Appends an event insert to an Ecto.Multi pipeline. Downstream plans (Phase 2 billing context) use this to commit a state mutation and its event record in the same transaction.

Examples

Ecto.Multi.new()
|> Ecto.Multi.insert(:subscription, subscription_changeset)
|> Accrue.Events.record_multi(:event, %{
  type: "subscription.created",
  subject_type: "Subscription",
  subject_id: "sub_123"
})
|> Accrue.Repo.transact()

state_as_of(subject_type, subject_id, ts)

@spec state_as_of(String.t(), String.t(), DateTime.t()) :: %{
  state: map(),
  event_count: non_neg_integer(),
  last_event_at: DateTime.t() | nil
}

Reconstructs the projected state map for a subject as of a past timestamp by folding all events with inserted_at <= ts.

Returns a map with :state, :event_count, and :last_event_at.

Each row is routed through the upcaster chain BEFORE folding (Pitfall 10).

timeline_for(subject_type, subject_id, opts \\ [])

@spec timeline_for(String.t(), String.t(), keyword()) :: [Accrue.Events.Event.t()]

Returns events scoped to a single subject, ordered by inserted_at ascending. Each row is routed through the upcaster chain to the current schema version before returning (Pitfall 10).

Options

  • :limit — max rows to return (default 1_000)