For Agents
View SourceA step-by-step implementation guide for AI coding agents integrating Forja into an Elixir application.
Prerequisites checklist
Before starting, verify the target application has:
- [ ] Elixir 1.19+ (
elixir --version) - [ ] PostgreSQL as the database
- [ ] An
Ecto.Repomodule configured and working - [ ]
Phoenix.PubSubin the supervision tree - [ ]
Obaninstalled and configured (dependency + supervision tree)
If Oban is not yet installed, add it first:
# mix.exs
{:oban, "~> 2.18"}# config/config.exs
config :my_app, Oban,
repo: MyApp.Repo,
queues: [default: 10]# application.ex children list (after Repo)
{Oban, Application.fetch_env!(:my_app, Oban)}Step 1: Add the dependency
Add forja to mix.exs:
def deps do
[
{:forja, "~> 0.3.0"}
]
endRun mix deps.get.
Step 2: Generate the migration
Option A -- Igniter (preferred if available):
mix igniter.install forja
This handles steps 2-4 automatically. Skip to Step 5 if using Igniter.
Option B -- Manual:
mix forja.install
mix ecto.migrate
This creates the forja_events table with the following columns: id (UUID), type (string), payload (map), meta (map), source (string), processed_at (utc_datetime_usec), idempotency_key (string), reconciliation_attempts (integer), inserted_at (utc_datetime_usec).
Verification: Confirm the migration ran successfully and the forja_events table exists.
Step 3: Configure Oban queues
Add the Forja queues to the existing Oban configuration in config/config.exs:
config :my_app, Oban,
repo: MyApp.Repo,
queues: [
default: 10,
forja_events: 5,
forja_reconciliation: 1
],
plugins: [
{Oban.Plugins.Cron, crontab: [
{"0 * * * *", Forja.Workers.ReconciliationWorker,
args: %{forja_name: "my_app"}}
]}
]Important rules:
- Merge
forja_eventsandforja_reconciliationinto the existing:queueslist -- do not overwrite other queues. - The
forja_namearg in the crontab must match the:nameatom you will use in Step 4 (as a string). If the Forja name is:my_app, the arg is"my_app". - If
Oban.Plugins.Cronis already configured, merge the new crontab entry into the existing list. - The reconciliation crontab is optional but recommended for production.
Verification: Run mix compile to check for syntax errors in config files.
Step 4: Add Forja to the supervision tree
In application.ex, add Forja to the children list after Repo, PubSub, and Oban:
children = [
MyApp.Repo,
{Phoenix.PubSub, name: MyApp.PubSub},
{Oban, Application.fetch_env!(:my_app, Oban)},
{Forja,
name: :my_app,
repo: MyApp.Repo,
pubsub: MyApp.PubSub,
handlers: []}
]Important rules:
- The ordering matters. Forja depends on Repo, PubSub, and Oban being started first.
:nameis an atom identifier -- use the application name by convention.:repomust be the actual Repo module, not a string.:pubsubmust be the PubSub module registered in the supervision tree.- Start with
handlers: []-- you will add handlers after creating them.
Verification: Run mix compile. Start the app with iex -S mix and confirm no startup errors.
Step 5: Create an event handler
Create a handler module implementing the Forja.Handler behaviour:
defmodule MyApp.Events.SomeHandler do
@behaviour Forja.Handler
@impl Forja.Handler
def event_types, do: ["some:event_type"]
@impl Forja.Handler
def handle_event(%Forja.Event{type: "some:event_type"} = event, _meta) do
# Process the event here
:ok
end
endRules for handlers:
- Must return
:okor{:error, reason}-- any other return value is treated as an error. - Must be idempotent -- the same event may be delivered more than once in edge cases (e.g., Oban retry after mark_processed failure). Design handlers so that processing the same event twice produces the same result.
- Do not perform long-running side effects inline -- for operations that can fail independently (sending emails, calling external APIs), enqueue a separate Oban job from within the handler rather than doing it inline.
- Use pattern matching on event type -- if the handler subscribes to multiple event types, use function clause pattern matching on
event.type. - Event types use
"namespace:action"convention -- e.g.,"order:created","user:registered","payment:refunded". - Use
:allto handle every event -- return:allfromevent_types/0for catch-all handlers like audit loggers. - Payload is a map with string keys -- always access payload fields with string keys:
event.payload["field"], notevent.payload.fieldorevent.payload[:field].
Handler file location
Place handler modules in a context-appropriate namespace:
lib/my_app/events/order_notifier.ex
lib/my_app/events/analytics_tracker.ex
lib/my_app/events/audit_logger.exRegister the handler
Add the handler module to the :handlers list in the Forja supervision config:
{Forja,
name: :my_app,
repo: MyApp.Repo,
pubsub: MyApp.PubSub,
handlers: [MyApp.Events.SomeHandler]}Verification: Run mix compile to confirm the handler module is valid.
Step 6: Emit events
Simple emission
Forja.emit(:my_app, MyApp.Events.OrderCreated,
payload: %{"order_id" => order.id, "total" => order.total},
source: "orders"
)Return values:
{:ok, %Forja.Event{}}-- event emitted successfully{:ok, :already_processed}-- idempotency key exists and event was already processed{:ok, :retrying, event_id}-- idempotency key exists but event hasn't been processed yet (re-enqueued)
Transactional emission with Ecto.Multi
When the event must be atomic with a domain operation:
Ecto.Multi.new()
|> Ecto.Multi.insert(:order, Order.changeset(%Order{}, attrs))
|> Forja.emit_multi(:my_app, MyApp.Events.OrderCreated,
payload_fn: fn %{order: order} ->
%{"order_id" => order.id, "total" => order.total}
end,
source: "orders"
)
|> Forja.transaction(:my_app)Important rules:
- Use
emit_multi/4when the event must succeed or fail with the domain operation. payload_fnreceives the results of all previous Multi steps.- The Oban job and event insert happen inside the same transaction.
- Call
Forja.transaction/2on the Multi -- pass the Forja name as second argument.
Idempotent emission
Use idempotency_key to prevent duplicate event processing:
Forja.emit(:my_app, "payment:received",
payload: %{"payment_id" => payment.id},
idempotency_key: "payment-#{payment.id}"
)Use idempotency keys when the same business event could trigger emit/3 more than once (e.g., webhook retries, user double-clicks).
Step 7: Add dead letter handling (optional but recommended)
Create a dead letter handler for events that exhaust all retry attempts:
defmodule MyApp.Events.DeadLetterHandler do
@behaviour Forja.DeadLetter
@impl Forja.DeadLetter
def handle_dead_letter(event, reason) do
require Logger
Logger.error("Dead letter: event #{event.id} (#{event.type}), reason: #{inspect(reason)}")
:ok
end
endRegister it in the Forja config:
{Forja,
name: :my_app,
repo: MyApp.Repo,
pubsub: MyApp.PubSub,
handlers: [MyApp.Events.SomeHandler],
dead_letter: MyApp.Events.DeadLetterHandler}Step 8: Set up telemetry (optional)
Quick setup -- default logger
Add to application.ex before the children list:
def start(_type, _args) do
Forja.Telemetry.attach_default_logger(level: :info)
children = [
# ...
]
Supervisor.start_link(children, strategy: :one_for_one)
endLevel options: :debug (everything), :info (lifecycle + problems), :warning (problems only), :error (critical only).
Custom metrics
For StatsD, Prometheus, or other metrics backends, attach to telemetry events directly:
:telemetry.attach_many(
"forja-metrics",
[
[:forja, :event, :emitted],
[:forja, :event, :processed],
[:forja, :event, :failed],
[:forja, :event, :dead_letter]
],
&MyApp.ForjaMetrics.handle_event/4,
nil
)Step 9: Write tests
Import Forja.Testing in test modules:
defmodule MyApp.OrderTest do
use MyApp.DataCase
import Forja.Testing
test "emitting order event" do
{:ok, _order} = MyApp.Orders.create_order(%{total: 5000})
assert_event_emitted(:my_app, "order:created", %{"total" => 5000})
end
test "process and verify side effects" do
Forja.emit(:my_app, "order:created", payload: %{"order_id" => "123"})
process_all_pending(:my_app)
# Assert on handler side effects after synchronous processing
end
test "handler in isolation" do
result = invoke_handler(
MyApp.Events.OrderNotifier,
"order:created",
%{"order_id" => "123"}
)
assert result == :ok
end
endAvailable test helpers:
| Helper | Purpose |
|---|---|
assert_event_emitted/3 | Verify an event was persisted (supports partial payload matching) |
refute_event_emitted/2 | Verify no event of a type was emitted |
process_all_pending/1 | Synchronously process all unprocessed events |
assert_event_deduplicated/2 | Verify idempotency key has exactly one event |
invoke_handler/4 | Call a handler directly without persistence |
Common mistakes to avoid
String keys in payload, always. Payloads are stored as JSON. Use
%{"order_id" => id}, not%{order_id: id}. Atom keys will be converted to strings on storage and break pattern matches.Don't forget to register handlers. Creating a handler module without adding it to the
:handlerslist means it will never receive events.Forja must start after its dependencies. In the supervision tree, Forja must come after Repo, PubSub, and Oban. Starting it earlier will crash.
The Forja name must be consistent. The
:nameused inForjasupervision config,Forja.emit/3,ReconciliationWorkerargs, and test helpers must all match.Handlers must be idempotent. In edge cases (e.g., Oban retry after a mark_processed failure), an event can be delivered to handlers more than once.
Don't use
Repo.transactioninsideemit/3. Theemit/3function already runs in a transaction. If you need a transaction that includes both a domain operation and event emission, useemit_multi/4.Don't put Forja in a test-only conditional. Forja should be in the supervision tree in all environments. In tests, use
Forja.Testinghelpers for synchronous processing.The
forja_namein ReconciliationWorker crontab is a string, not an atom. It must match the atom name as a string (:my_app->"my_app").
Quick reference
Full application.ex example
defmodule MyApp.Application do
use Application
@impl true
def start(_type, _args) do
Forja.Telemetry.attach_default_logger(level: :info)
children = [
MyApp.Repo,
{Phoenix.PubSub, name: MyApp.PubSub},
{Oban, Application.fetch_env!(:my_app, Oban)},
{Forja,
name: :my_app,
repo: MyApp.Repo,
pubsub: MyApp.PubSub,
handlers: [
MyApp.Events.OrderNotifier,
MyApp.Events.AuditLogger
],
dead_letter: MyApp.Events.DeadLetterHandler}
]
opts = [strategy: :one_for_one, name: MyApp.Supervisor]
Supervisor.start_link(children, opts)
end
endFull config/config.exs Oban section
config :my_app, Oban,
repo: MyApp.Repo,
queues: [
default: 10,
forja_events: 5,
forja_reconciliation: 1
],
plugins: [
{Oban.Plugins.Cron, crontab: [
{"0 * * * *", Forja.Workers.ReconciliationWorker,
args: %{forja_name: "my_app"}}
]}
]Emit cheatsheet
# Simple
Forja.emit(:name, "type:action", payload: %{"key" => "value"}, source: "context")
# Idempotent
Forja.emit(:name, "type:action", payload: %{...}, idempotency_key: "unique-key")
# Transactional
Multi.new()
|> Multi.insert(:record, changeset)
|> Forja.emit_multi(:name, "type:action", payload_fn: fn %{record: r} -> %{"id" => r.id} end)
|> Forja.transaction(:name)Handler template
defmodule MyApp.Events.NameHandler do
@behaviour Forja.Handler
@impl Forja.Handler
def event_types, do: ["namespace:action"]
@impl Forja.Handler
def handle_event(%Forja.Event{type: "namespace:action"} = event, _meta) do
# Access payload with string keys: event.payload["key"]
# Return :ok or {:error, reason}
:ok
end
end