Forja (Forja v0.4.0)

View Source

Event bus with Oban-backed processing for Elixir.

Forja persists events in the database, enqueues Oban jobs for processing, and broadcasts committed events through PubSub for subscribers that want emission notifications.

Event processing relies on an Oban unique constraint plus the processed_at safety net to prevent duplicate handling.

Usage

children = [
  {Forja,
   name: :my_app,
   repo: MyApp.Repo,
   pubsub: MyApp.PubSub,
   handlers: [MyApp.Events.OrderHandler],
   dead_letter: MyApp.Events.DeadLetterHandler,
   reconciliation: [enabled: true, interval_minutes: 60, threshold_minutes: 15, max_retries: 3]}
]

Supervisor.start_link(children, strategy: :one_for_one)

Emitting events

Forja.emit(:my_app, MyApp.Events.OrderCreated,
  payload: %{order_id: order.id, amount_cents: order.total},
  source: "orders"
)

See Forja.Event.Schema for how to define typed event schemas.

Idempotent emission

Forja.emit(:my_app, MyApp.Events.OrderCreated,
  payload: %{order_id: order.id, amount_cents: order.total},
  source: "orders",
  idempotency_key: "order-created-#{order.id}"
)

Transactional emission

Ecto.Multi.new()
|> Ecto.Multi.insert(:order, order_changeset)
|> Forja.emit_multi(:my_app, MyApp.Events.OrderCreated,
  payload_fn: fn %{order: order} -> %{order_id: order.id, amount_cents: order.total} end,
  source: "orders"
)
|> Forja.transaction(:my_app)

Summary

Functions

Loads an event by ID and broadcasts it to Forja subscribers.

Returns a specification to start this module under a supervisor.

Emits an event atomically.

Adds event emission steps to an existing Ecto.Multi.

Starts the Forja instance as part of a supervision tree.

Executes an Ecto.Multi and broadcasts any emitted Forja events after commit.

Functions

broadcast_event(name, event_id)

@spec broadcast_event(atom(), String.t()) :: :ok | {:error, :not_found}

Loads an event by ID and broadcasts it to Forja subscribers.

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

emit(name, type, opts \\ [])

@spec emit(atom(), module(), keyword()) ::
  {:ok, Forja.Event.t()}
  | {:ok, :already_processed}
  | {:ok, :retrying, String.t()}
  | {:error, Ecto.Changeset.t()}
  | {:error, Forja.ValidationError.t()}

Emits an event atomically.

The type argument must be a module that uses Forja.Event.Schema. The payload is validated against the Zoi schema before persistence. Invalid payloads return {:error, %Forja.ValidationError{}} immediately without persisting.

Inside an Ecto.Multi transaction:

  1. Inserts the event into the forja_events table
  2. Inserts the Oban ProcessEventWorker job

After the commit:

  1. Broadcasts via PubSub

Options

  • :payload - Map with event data (default: %{})
  • :meta - Map with metadata (default: %{})
  • :source - String identifying the origin (default: nil)
  • :idempotency_key - Optional string key for deduplication (default: nil)
  • :correlation_id - UUID grouping related events; auto-generated if not provided
  • :causation_id - UUID of the event that caused this event (default: nil)

Schema-based emission

Forja.emit(:my_app, MyApp.Events.OrderCreated,
  payload: %{user_id: "uuid-123", amount_cents: 500},
  source: "checkout"
)

Idempotency

When :idempotency_key is provided, the function checks for an existing event with the same key before inserting:

  • If found with processed_at set: returns {:ok, :already_processed}
  • If found with processed_at: nil: re-enqueues the Oban job and returns {:ok, :retrying, existing_event_id}
  • If not found: emits normally

emit_multi(multi, name, type, opts \\ [])

@spec emit_multi(Ecto.Multi.t(), atom(), module(), keyword()) :: Ecto.Multi.t()

Adds event emission steps to an existing Ecto.Multi.

Like emit/3, the type argument must be a module that uses Forja.Event.Schema. The payload is validated inside the Multi step. If validation fails, the transaction is rolled back with {:error, step_key, %Forja.ValidationError{}, changes}.

The caller is responsible for executing the transaction on the returned Multi. Use Forja.transaction/2 when you want emitted events in the Multi to be broadcast after a successful commit:

Ecto.Multi.new()
|> Ecto.Multi.insert(:order, order_changeset)
|> Forja.emit_multi(:my_app, MyApp.Events.OrderCreated,
  payload_fn: fn %{order: order} -> %{order_id: order.id, amount_cents: order.total} end,
  source: "orders"
)
|> Forja.transaction(:my_app)

Options

  • :payload_fn - Function (map() -> map()) that receives the previous Multi results and returns the payload (required if the payload depends on previous results)
  • :payload - Static map with event data (alternative to :payload_fn)
  • :meta - Map with metadata (default: %{})
  • :source - String identifying the origin (default: nil)
  • :idempotency_key - Optional string key for deduplication (default: nil)
  • :correlation_id - UUID grouping related events; auto-generated if not provided
  • :causation_id - UUID of the event that caused this event (default: nil)

Idempotency

When :idempotency_key is provided and an event with the same key already exists, the Multi step short-circuits:

  • If found with processed_at set: returns {:ok, :already_processed}
  • If found with processed_at: nil: re-enqueues the Oban job and returns {:ok, {:retrying, existing_event_id}}
  • If not found: inserts normally

Example

Ecto.Multi.new()
|> Ecto.Multi.insert(:order, order_changeset)
|> Forja.emit_multi(:billing, MyApp.Events.OrderCreated,
  payload_fn: fn %{order: order} -> %{order_id: order.id, amount_cents: order.total} end,
  source: "orders",
  idempotency_key: "order-created-#{order_ref}"
)
|> Repo.transaction()

start_link(opts)

@spec start_link(keyword()) :: Supervisor.on_start()

Starts the Forja instance as part of a supervision tree.

Options

  • :name - Atom identifier for the instance (required)
  • :repo - Ecto.Repo module (required)
  • :pubsub - Phoenix.PubSub module (required)
  • :oban_name - Oban instance name (default: Oban)
  • :default_queue - Default Oban queue for event jobs (default: :events)
  • :event_topic_prefix - Prefix for PubSub topics (default: "forja")
  • :handlers - List of Forja.Handler modules (default: [])
  • :dead_letter - Module implementing Forja.DeadLetter (default: nil)
  • :reconciliation - Keyword list for reconciliation settings (default: see Forja.Config)

transaction(multi, name)

@spec transaction(Ecto.Multi.t(), atom()) ::
  {:ok, map()} | {:error, atom(), term(), map()}

Executes an Ecto.Multi and broadcasts any emitted Forja events after commit.