Architecture
View SourceForja uses Oban as the sole processing engine. Events are persisted and enqueued atomically, with PubSub broadcasts for real-time notifications. This guide explains the internal architecture.
Event lifecycle
When you call Forja.emit/3, two things happen inside a single database transaction:
- The event is inserted into the
forja_eventstable - An Oban job (
ProcessEventWorker) is enqueued
After the transaction commits:
- A PubSub broadcast sends
{:forja_event_emitted, %Event{}}(best-effort,try/rescueprotected)
+-------------------+
| Forja.emit/3 |
+--------+----------+
|
INSERT event + Oban job
(single Ecto.Multi transaction)
|
+--------------+--------------+
| |
PubSub broadcast Oban polls
(best-effort notify) |
| +-------v--------+
v | ProcessEvent |
{:forja_event_emitted, | Worker |
%Event{}} +-------+--------+
|
+-------v--------+
| Processor |
| (shared core) |
+-------+--------+
|
v
{:forja_event_processed,
%Event{}}PubSub is notification-only. It is not a processing trigger and never affects delivery guarantees.
Idempotent processing
Two layers prevent duplicate processing:
1. Oban unique constraint
ProcessEventWorker uses unique: [keys: [:event_id], period: 900] to prevent duplicate jobs for the same event within a 15-minute window.
2. processed_at safety net
The Processor loads the event and checks processed_at before dispatching handlers. If the event was already processed, it returns early. After all handlers run, the Processor marks the event via a non-bang Repo.update/1.
Supervision tree
Forja.Supervisor (strategy: :one_for_one)
+-- Forja.ObanListenerObanListener
A GenServer that attaches a telemetry handler to [:oban, :job, :stop]. When a ProcessEventWorker job is discarded by Oban (all retry attempts exhausted), it triggers the dead letter flow.
The telemetry callback resolves the listener by its registered name (not PID) to survive process restarts.
Oban workers
Two Oban workers run outside the supervision tree (managed by Oban's own supervisor):
ProcessEventWorker
- Queue:
:forja_events(default), or per-event via thequeuemacro inEvent.Schema - Max attempts: 3
- Unique: by
event_id, 900-second period - Calls
Forja.Processor.process/3with the:obanpath
ReconciliationWorker
- Queue:
:forja_reconciliation - Max attempts: 1 (cron job, retries handled internally)
- Runs periodically via Oban crontab
- Skips events that have active Oban jobs before attempting processing
Scans for events where processed_at IS NULL and inserted_at exceeds the threshold. For each stale event, attempts processing via the Processor. Tracks reconciliation_attempts and triggers dead letter handling when the limit is reached.
Per-event queue routing
Event schema modules can declare a custom Oban queue using the :queue option. Forja prefixes the name with forja_ internally:
defmodule MyApp.Events.PaymentReceived do
use Forja.Event.Schema,
event_type: "payment:received",
queue: :payments # routes to :forja_payments
payload do
field :amount_cents, Zoi.integer() |> Zoi.positive()
end
endEvents without a queue declaration use the instance's :default_queue (defaults to :events, i.e. :forja_events).
Transactional emission
Forja.transaction/2 wraps an Ecto.Multi that contains emit_multi steps. After a successful commit, it automatically broadcasts {:forja_event_emitted, %Event{}} for each emitted event:
Ecto.Multi.new()
|> Ecto.Multi.insert(:order, order_changeset)
|> Forja.emit_multi(:my_app, MyApp.Events.OrderCreated,
payload_fn: fn %{order: order} -> %{order_id: order.id} end,
source: "orders"
)
|> Forja.transaction(:my_app)Forja.broadcast_event/2 allows manually broadcasting a previously emitted event by its ID.
Dead letter handling
An event becomes a "dead letter" when all processing attempts are exhausted. This can happen through two paths:
- Oban discards the job -- The
ObanListenerdetects this via telemetry and emits[:forja, :event, :dead_letter] - Reconciliation exhausts retries -- The
ReconciliationWorkeremits[:forja, :event, :abandoned]
In both cases, if a Forja.DeadLetter module is configured, its handle_dead_letter/2 callback is invoked.
Handler dispatch
The Processor dispatches events to all matching handlers via the Registry. Handlers are matched by event type, and catch-all handlers (returning :all from event_types/0) receive every event.
Key behavior: all handlers run regardless of individual failures. If a handler returns {:error, reason} or raises an exception, the error is logged and telemetry is emitted, but remaining handlers still execute. The event is marked as processed after all handlers have been called.
This design means handlers should be idempotent and should enqueue their own retry mechanism for operations that may fail (e.g., sending an email via a separate Oban job). Handlers that define on_failure/3 receive a callback on error or exception.