Forja (Forja v0.4.0)
Event bus with Oban-backed processing for Elixir.
Forja persists events in the database, enqueues Oban jobs for processing, and broadcasts committed events through PubSub for subscribers that want emission notifications.
Event processing relies on an Oban unique constraint plus the processed_at
safety net to prevent duplicate handling.
Usage
children = [
  {Forja,
   name: :my_app,
   repo: MyApp.Repo,
   pubsub: MyApp.PubSub,
   handlers: [MyApp.Events.OrderHandler],
   dead_letter: MyApp.Events.DeadLetterHandler,
   reconciliation: [enabled: true, interval_minutes: 60, threshold_minutes: 15, max_retries: 3]}
]
Supervisor.start_link(children, strategy: :one_for_one)
Emitting events
Forja.emit(:my_app, MyApp.Events.OrderCreated,
  payload: %{order_id: order.id, amount_cents: order.total},
  source: "orders"
)
See Forja.Event.Schema for how to define typed event schemas.
Idempotent emission
Forja.emit(:my_app, MyApp.Events.OrderCreated,
  payload: %{order_id: order.id, amount_cents: order.total},
  source: "orders",
  idempotency_key: "order-created-#{order.id}"
)
Transactional emission
Ecto.Multi.new()
|> Ecto.Multi.insert(:order, order_changeset)
|> Forja.emit_multi(:my_app, MyApp.Events.OrderCreated,
  payload_fn: fn %{order: order} -> %{order_id: order.id, amount_cents: order.total} end,
  source: "orders"
)
|> Forja.transaction(:my_app)
Summary
Functions
Loads an event by ID and broadcasts it to Forja subscribers.
Returns a specification to start this module under a supervisor.
Emits an event atomically.
Adds event emission steps to an existing Ecto.Multi.
Starts the Forja instance as part of a supervision tree.
Executes an Ecto.Multi and broadcasts any emitted Forja events after commit.
Functions
Loads an event by ID and broadcasts it to Forja subscribers.
Returns a specification to start this module under a supervisor.
See Supervisor.
@spec emit(atom(), module(), keyword()) ::
        {:ok, Forja.Event.t()}
        | {:ok, :already_processed}
        | {:ok, :retrying, String.t()}
        | {:error, Ecto.Changeset.t()}
        | {:error, Forja.ValidationError.t()}
Emits an event atomically.
The type argument must be a module that uses Forja.Event.Schema.
The payload is validated against the Zoi schema before persistence.
Invalid payloads return {:error, %Forja.ValidationError{}} immediately
without persisting.
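As an illustration of the return shapes listed in the @spec above, the following sketch maps each one to a tag a caller could act on. `EmitResultSketch` and `classify/1` are hypothetical names, not part of Forja, and the module only pattern-matches on result tuples; it does not call the library.

```elixir
defmodule EmitResultSketch do
  # Hypothetical helper, not part of Forja: tags each documented
  # return shape of Forja.emit/3.

  # An event with the same idempotency key was already processed.
  def classify({:ok, :already_processed}), do: :duplicate

  # An unprocessed event with the same key exists; its job was re-enqueued.
  def classify({:ok, :retrying, event_id}) when is_binary(event_id),
    do: {:retrying, event_id}

  # The payload failed schema validation; nothing was persisted.
  def classify({:error, error}) when is_struct(error, Forja.ValidationError),
    do: {:invalid_payload, error}

  # Any other error is, per the spec, the changeset from the failed insert.
  def classify({:error, changeset}), do: {:insert_failed, changeset}

  # Anything else under :ok is the newly persisted event.
  def classify({:ok, event}), do: {:emitted, event}
end
```

The `{:ok, event}` clause comes last so that the more specific `{:ok, :already_processed}` shape is matched first.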
Inside an Ecto.Multi transaction:
- Inserts the event into the forja_events table
- Inserts the Oban ProcessEventWorker job
After the commit:
- Broadcasts via PubSub
Options
- :payload - Map with event data (default: %{})
- :meta - Map with metadata (default: %{})
- :source - String identifying the origin (default: nil)
- :idempotency_key - Optional string key for deduplication (default: nil)
- :correlation_id - UUID grouping related events; auto-generated if not provided
- :causation_id - UUID of the event that caused this event (default: nil)
Schema-based emission
Forja.emit(:my_app, MyApp.Events.OrderCreated,
  payload: %{user_id: "uuid-123", amount_cents: 500},
  source: "checkout"
)
Idempotency
When :idempotency_key is provided, the function checks for an existing event
with the same key before inserting:
- If found with processed_at set: returns {:ok, :already_processed}
- If found with processed_at: nil: re-enqueues the Oban job and returns {:ok, :retrying, existing_event_id}
- If not found: emits normally
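The three cases above can be modelled as a small pure function. This is only a sketch of the documented decision, not Forja's implementation: `IdempotencySketch`, `decide/1`, and the `:emit` tag are hypothetical, and `existing` stands in for whatever the lookup by idempotency key returns (assumed here to be `nil` or a map with `:id` and `:processed_at`).

```elixir
defmodule IdempotencySketch do
  # Hypothetical model of the documented idempotency decision.

  # No event with this key: proceed with a normal emission.
  def decide(nil), do: :emit

  # Event found but never processed: the job is re-enqueued.
  def decide(%{id: id, processed_at: nil}), do: {:ok, :retrying, id}

  # Event found with processed_at set: nothing more to do.
  def decide(%{processed_at: _processed_at}), do: {:ok, :already_processed}
end
```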
@spec emit_multi(Ecto.Multi.t(), atom(), module(), keyword()) :: Ecto.Multi.t()
Adds event emission steps to an existing Ecto.Multi.
Like emit/3, the type argument must be a module that uses
Forja.Event.Schema. The payload is validated inside the Multi step.
If validation fails, the transaction is rolled back with
{:error, step_key, %Forja.ValidationError{}, changes}.
The caller is responsible for executing the transaction on the returned Multi.
Use Forja.transaction/2 when you want emitted events in the Multi to be
broadcast after a successful commit:
Ecto.Multi.new()
|> Ecto.Multi.insert(:order, order_changeset)
|> Forja.emit_multi(:my_app, MyApp.Events.OrderCreated,
  payload_fn: fn %{order: order} -> %{order_id: order.id, amount_cents: order.total} end,
  source: "orders"
)
|> Forja.transaction(:my_app)
Options
- :payload_fn - Function (map() -> map()) that receives the previous Multi results and returns the payload (required if the payload depends on previous results)
- :payload - Static map with event data (alternative to :payload_fn)
- :meta - Map with metadata (default: %{})
- :source - String identifying the origin (default: nil)
- :idempotency_key - Optional string key for deduplication (default: nil)
- :correlation_id - UUID grouping related events; auto-generated if not provided
- :causation_id - UUID of the event that caused this event (default: nil)
Idempotency
When :idempotency_key is provided and an event with the same key already exists,
the Multi step short-circuits:
- If found with processed_at set: returns {:ok, :already_processed}
- If found with processed_at: nil: re-enqueues the Oban job and returns {:ok, {:retrying, existing_event_id}}
- If not found: inserts normally
Example
Ecto.Multi.new()
|> Ecto.Multi.insert(:order, order_changeset)
|> Forja.emit_multi(:billing, MyApp.Events.OrderCreated,
  payload_fn: fn %{order: order} -> %{order_id: order.id, amount_cents: order.total} end,
  source: "orders",
  idempotency_key: "order-created-#{order_ref}"
)
|> Repo.transaction()
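A caller running such a Multi will typically want to distinguish a validation failure in the emit step from other rollbacks. The sketch below matches on the standard Ecto.Multi result tuples, including the `{:error, step_key, %Forja.ValidationError{}, changes}` shape described above. `MultiResultSketch` and `handle/1` are hypothetical names, and it is an assumption that Forja.transaction/2 returns the same shapes as Repo.transaction/1.

```elixir
defmodule MultiResultSketch do
  # Hypothetical helper: interprets the result of running a Multi that
  # contains a Forja.emit_multi/4 step.

  # Committed: `changes` maps each step key to that step's result.
  def handle({:ok, changes}) when is_map(changes), do: {:committed, changes}

  # Rolled back because the event payload failed validation.
  def handle({:error, step, error, _changes})
      when is_struct(error, Forja.ValidationError),
      do: {:invalid_event_payload, step, error}

  # Rolled back by some other step, for example the order insert.
  def handle({:error, step, reason, _changes}), do: {:rolled_back, step, reason}
end
```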
@spec start_link(keyword()) :: Supervisor.on_start()
Starts the Forja instance as part of a supervision tree.
Options
- :name - Atom identifier for the instance (required)
- :repo - Ecto.Repo module (required)
- :pubsub - Phoenix.PubSub module (required)
- :oban_name - Oban instance name (default: Oban)
- :default_queue - Default Oban queue for event jobs (default: :events)
- :event_topic_prefix - Prefix for PubSub topics (default: "forja")
- :handlers - List of Forja.Handler modules (default: [])
- :dead_letter - Module implementing Forja.DeadLetter (default: nil)
- :reconciliation - Keyword list for reconciliation settings (default: see Forja.Config)
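For reference, here is a child spec that spells out the optional keys with the defaults listed above. It is a configuration sketch only: MyApp.Repo and MyApp.PubSub are placeholder module names taken from the usage example, and :reconciliation is left out so that it falls back to the defaults in Forja.Config.

```elixir
children = [
  {Forja,
   # Required keys.
   name: :my_app,
   repo: MyApp.Repo,
   pubsub: MyApp.PubSub,
   # Optional keys, written out with their documented defaults.
   oban_name: Oban,
   default_queue: :events,
   event_topic_prefix: "forja",
   handlers: [],
   dead_letter: nil}
]
```

Passing only the three required keys should be equivalent, since every other value shown is the documented default.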
Executes an Ecto.Multi and broadcasts any emitted Forja events after commit.