Forja.Event (Forja v0.4.0)

Ecto schema for events persisted in the forja_events table.

Events are the core data model in Forja. Each record represents a domain event that flows through the dual-path processing pipeline (PubSub delivery plus Oban guarantees).

Fields

  • id - UUID binary primary key (auto-generated)
  • type - String identifier for the event type (required)
  • payload - Map containing the event data (default: %{})
  • meta - Map containing event metadata (default: %{})
  • source - String identifying the event source
  • processed_at - UTC datetime when the event was processed
  • idempotency_key - Unique key for idempotent event handling
  • reconciliation_attempts - Number of reconciliation attempts (default: 0)
  • schema_version - Integer identifying the event schema version (default: 1)
  • correlation_id - UUID grouping all events in the same logical transaction
  • causation_id - UUID of the event that directly caused this event
  • inserted_at - UTC datetime when the event was created (auto-managed)
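
The field list above could be declared roughly as in the following sketch. It is not Forja's actual source: the module name EventSketch, the Ecto field types, and the timestamp options are assumptions inferred from the descriptions and defaults listed here.

```elixir
# Hypothetical sketch of a schema matching the documented fields.
# Not Forja's source; the types and options below are assumptions.
Mix.install([{:ecto, "~> 3.10"}])

defmodule EventSketch do
  use Ecto.Schema

  # UUID binary primary key, generated automatically on insert.
  @primary_key {:id, :binary_id, autogenerate: true}

  schema "forja_events" do
    field :type, :string
    field :payload, :map, default: %{}
    field :meta, :map, default: %{}
    field :source, :string
    field :processed_at, :utc_datetime_usec
    field :idempotency_key, :string
    field :reconciliation_attempts, :integer, default: 0
    field :schema_version, :integer, default: 1
    field :correlation_id, Ecto.UUID
    field :causation_id, Ecto.UUID

    # Only inserted_at is tracked; the documented field list has no updated_at.
    timestamps(type: :utc_datetime_usec, updated_at: false)
  end
end

# Defaults are filled in when the struct is built.
event = %EventSketch{type: "user.created", payload: %{"user_id" => 123}}
IO.inspect({event.meta, event.reconciliation_attempts, event.schema_version})
```

Fields without a default, such as source and processed_at, start as nil, which matches the `| nil` alternatives in the type specification.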

Examples

iex> %Forja.Event{type: "user.created", payload: %{"user_id" => 123}}
%Forja.Event{type: "user.created", payload: %{"user_id" => 123}, ...}

Summary

Functions

changeset(event, attrs)

Creates a changeset for the given event with the given attributes.

increment_reconciliation_changeset(event)

Creates a changeset that increments the reconciliation_attempts counter.

mark_processed_changeset(event)

Creates a changeset that marks the event as processed by setting processed_at.

Types

t()

@type t() :: %Forja.Event{
  __meta__: term(),
  causation_id: binary() | nil,
  correlation_id: binary() | nil,
  id: binary(),
  idempotency_key: String.t() | nil,
  inserted_at: DateTime.t(),
  meta: map(),
  payload: map(),
  processed_at: DateTime.t() | nil,
  reconciliation_attempts: integer(),
  schema_version: pos_integer(),
  source: String.t() | nil,
  type: String.t() | nil
}

Functions

changeset(event, attrs)

@spec changeset(t() | Ecto.Schema.t(), map()) :: Ecto.Changeset.t()

Creates a changeset for the given event with the given attributes.

Required fields: :type

Optional fields: :payload, :meta, :source, :idempotency_key

Examples

iex> changeset(%Forja.Event{}, %{type: "user.created"})
#Ecto.Changeset<...>

iex> changeset(%Forja.Event{}, %{type: ""})
#Ecto.Changeset<errors: [type: {"can't be blank", ...}]>
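
The required/optional split can be illustrated with a schemaless Ecto changeset. This is a minimal sketch, assuming the function is built from `cast/3` and `validate_required/2`; the module name ChangesetSketch and the field types are hypothetical, and the real implementation may add further validations.

```elixir
# Hypothetical sketch; assumes cast + validate_required, not Forja's source.
Mix.install([{:ecto, "~> 3.10"}])

defmodule ChangesetSketch do
  import Ecto.Changeset

  # Assumed types for the castable fields.
  @types %{
    type: :string,
    payload: :map,
    meta: :map,
    source: :string,
    idempotency_key: :string
  }
  @required [:type]
  @optional [:payload, :meta, :source, :idempotency_key]

  def changeset(data, attrs) do
    {data, @types}
    |> cast(attrs, @required ++ @optional)
    |> validate_required(@required)
  end
end

ok = ChangesetSketch.changeset(%{}, %{type: "user.created", source: "accounts"})
blank = ChangesetSketch.changeset(%{}, %{type: ""})

IO.inspect(ok.valid?)
IO.inspect(blank.errors)
```

By default `cast/3` treats an empty string as an empty value, which is why `type: ""` fails the required check with "can't be blank".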

increment_reconciliation_changeset(event)

@spec increment_reconciliation_changeset(t() | Ecto.Schema.t()) :: Ecto.Changeset.t()

Creates a changeset that increments the reconciliation_attempts counter.

Examples

iex> event = %Forja.Event{reconciliation_attempts: 0}
iex> increment_reconciliation_changeset(event)
#Ecto.Changeset<changes: %{reconciliation_attempts: 1}>
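
One plausible way to build such a changeset is to read the current counter and put the incremented value with `Ecto.Changeset.change/2`. The sketch below assumes that approach and uses a schemaless changeset over a plain map; ReconciliationSketch and its `increment/1` function are hypothetical names, not part of Forja.

```elixir
# Hypothetical sketch; the real function may be implemented differently.
Mix.install([{:ecto, "~> 3.10"}])

defmodule ReconciliationSketch do
  import Ecto.Changeset

  @types %{reconciliation_attempts: :integer}

  # Reads the current counter and records counter + 1 as a change.
  def increment(%{reconciliation_attempts: attempts} = event) do
    change({event, @types}, reconciliation_attempts: attempts + 1)
  end
end

changeset = ReconciliationSketch.increment(%{reconciliation_attempts: 0})
IO.inspect(changeset.changes)
```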

mark_processed_changeset(event)

@spec mark_processed_changeset(t() | Ecto.Schema.t()) :: Ecto.Changeset.t()

Creates a changeset that marks the event as processed by setting processed_at.

Examples

iex> event = %Forja.Event{type: "user.created"}
iex> mark_processed_changeset(event)
#Ecto.Changeset<changes: %{processed_at: ~U[2024-01-01 00:00:00Z]}>
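
The timestamp in the output above depends on when the function runs. A minimal sketch of the idea follows, assuming the changeset simply puts the current UTC time into processed_at; the module ProcessedSketch, the `:utc_datetime_usec` type, and the optional `now` argument (added here only to make the result reproducible) are assumptions, since the documented function takes a single argument.

```elixir
# Hypothetical sketch; the `now` parameter is an illustration aid only.
Mix.install([{:ecto, "~> 3.10"}])

defmodule ProcessedSketch do
  import Ecto.Changeset

  @types %{processed_at: :utc_datetime_usec}

  # Records the given time (default: the current UTC time) as processed_at.
  def mark_processed(event, now \\ DateTime.utc_now()) do
    change({event, @types}, processed_at: now)
  end
end

now = ~U[2024-01-01 00:00:00.000000Z]
changeset = ProcessedSketch.mark_processed(%{processed_at: nil}, now)
IO.inspect(changeset.changes)
```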