View Source Carbonite (Carbonite v0.12.0)

Carbonite makes it easy to create audit trails for tables in a PostgreSQL database and integrate them into an Elixir application.

Carbonite implements the Change-Data-Capture pattern on top of a PostgreSQL database. It uses triggers to automatically record all changes applied to a database table in order to guarantee a complete audit trail of the contained data.

It is centered around the idea that the database transaction is the natural auditing unit of data mutation. Any mutation on a participating table requires the developer first to record the operation metadata within the same database transaction. The metadata record is associated to the table change records by a constraint.

On top of its database layer, Carbonite provides an API allowing developers to effortlessly retrieve, process, and purge the audit trails within the Elixir application.

How it works

Carbonite keeps a central changes table where all mutations of participating tables are recorded. On each such table a trigger is installed (after INSERT, UPDATE, and DELETE statements - TRUNCATE is not supported, see below) which calls a procedure stored within the database. This procedure captures the new or updated data within the changes table automatically. The procedure fetches its own per-table settings from another table called triggers. These settings customize the procedure's behaviour, for instance, they may exclude certain table columns from being captured.

Besides the changes table, storing information about individual database statements (or rather, their impact on the data), Carbonite's transactions table stores information on database transactions. Within your application logic, you can insert a row into this table after you begin a database transaction, and record arbitrary metadata for it.

Apart from the metadata, the transactions table houses two identifier columns: id and xact_id. While id is an ordinary, autoincrementing integer primary key, the xact_id is set to pg_current_xact_id(), PostgreSQL's internal transaction identifier. Each row in the changes table is associated to a single row in the transactions table, referencing both these identifiers.

  • The changes.transaction_id (referencing transactions.id) field is filled with the "current value" of the related sequence. This is your regular foreign key column, enough to relate a changes record with exactly one transactions record. Consistency with the transactions table is guaranteed by a foreign key constraint.
  • Additionally, the changes.transaction_xact_id field is set to pg_current_xact_id(). Together these references ensure that, not only relates each changes record to a single transactions record, but records in these tables have been inserted within the same database transaction. Consistency is ensure by a manual lookup in the trigger procedure.

This leads to the following interesting properties:

  • All changes created within a database transaction automatically and implicitly belong to the same record in the transactions table, even if they're created separately and agnostic of each other in the application logic. This gives the developer a "natural" way to group related changes into events (more on events later).
  • The entry in the transactions table must be created before any changes. Attempting to modify an audited table without prior insertion into the transactions table will result in an error.

ℹ️  Trigger vs. Write-Ahead-Log / Logical Decoding / Extensions

Existing solutions for CDC on top of a PostgreSQL database (e.g. Debezium) often tail the Write-Ahead-Log or use extensions (e.g. pgaudit) instead of using triggers to create change records. Both approaches have their own advantages and disadvantages, and one should try to understand the differences before making up their mind.

  • One of the main selling points of triggers is that they are executed as part of the normal transaction logic in the database, hence they benefit from the usual atomicity and consistency guarantees of relational databases. For audit triggers, among other things this means that data mutations and their audit trail are committed as an atomic unit.
  • At the same time, this property of triggers strongly couples the auditing logic to the business operation. For instance, if the audit trigger raises an error, the entire transaction is aborted. And vice versa, if the business operation aborts, nothing is audited.
  • Additionally, auditing unavoidably has an impact on the database performance. A common criticism of audit triggers is the performance penalty they incur on otherwise "simple" database operations, which is said to be much higher than just tailing the WAL. Carbonite tries to limit the work done in its trigger, but a few indexes and tables are nonetheless touched.
  • Another advantage of triggers is their universality: You should be able to run Carbonite's migrations on any hosted PostgreSQL instance without the need to tweak its configuration or install custom extensions before.
  • The primary advantage of triggers for Carbonite, though, is that we can make use of PostgreSQL's internal transaction id: It allows us to group related changes together and record metadata for the group. This and the simple tooling around it is not (as easily) achievable with a fully decoupled system. However, if you do not need this feature of Carbonite, please be sure to consider other solutions as well.

ℹ️  Why TRUNCATE is not supported

As the last of the 4 primary data mutating SQL commands, TRUNCATE allows to delete all data from a table or a set of tables. It can be instrumented using triggers on the statement level (FOR EACH STATEMENT instead of FOR EACH ROW), which means the trigger procedure executes only once for the statement and without any data - in contrast to, for instance, an UPDATE statement, which also mutates multiple rows but fires the procedure once for each row. While there might be value in auditing TRUNCATE statements, the behaviour of the trigger procedure would be quite different from the other commands. For a rarely used SQL command, we chose against this additional complexity and not to support TRUNCATE in Carbonite.

Installation

Requirements

Due to its use of pg_current_xact_id, Carbonite requires PostgreSQL version 13 or above. If you see an error message like the following, your PostgreSQL installation is too old:

** (Postgrex.Error) ERROR 42704 (undefined_object) type "xid8" does not exist

Hex dependency

# mix.exs
def deps do
  [
    {:carbonite, "~> 0.12.0"}
  ]
end

Getting started

As much of Carbonite's logic lives in database functions and triggers, to get started, we first need to create a migration using Ecto.

Creating the initial migration

Carbonite contains a Mix task that generates the initial migration for you. Please open the generated file and edit it according to your needs.

See Carbonite.Migrations for more information on migrations.

mix carbonite.gen.initial_migration -r MyApp.Repo

The final migration should look something like this:

# priv/repo/migrations/20240404201534_install_carbonite.exs
defmodule MyApp.Repo.Migrations.InstallCarbonite do
  use Ecto.Migration

  def up do
    Carbonite.Migrations.up(1..8)

    # For each table that you want to capture changes of, you need to install the trigger.
    Carbonite.Migrations.create_trigger(:rabbits)

    # Optionally you may configure the trigger inside the migration.
    Carbonite.Migrations.put_trigger_config(:rabbits, :excluded_columns, ["age"])

    # Optional outbox to process transactions later.
    Carbonite.Migrations.create_outbox("rabbit_holes")
  end

  def down do
    # Remove outbox again.
    Carbonite.Migrations.drop_outbox("rabbit_holes")

    # Remove all triggers before dropping the schema.
    Carbonite.Migrations.drop_trigger(:rabbits)

    # Drop the Carbonite tables.
    Carbonite.Migrations.down(8..1)
  end
end

Updates

When a new Carbonite version is released, it may contain updates to the database schema. As these are announced in the Changelog, you each time need to create a migration in your host application like the following.

# priv/repo/migrations/20210704201534_update_carbonite.exs
defmodule MyApp.Repo.Migrations.UpdateCarbonite do
  use Ecto.Migration

  def up do
    Carbonite.Migrations.up(3)
  end

  def down do
    Carbonite.Migrations.down(3)
  end
end

Applying multiple Carbonite migrations in a single host migration is fine. Note that for each of your Carbonite "partitions" (see below), you need to run each Carbonite migration.

Trigger configuration

The behaviour of the capture trigger is customizable per table by manipulating the settings in the triggers table. Often you will want to update these settings within a migration, as well, which is why Carbonite provides the small helper function Carbonite.Migrations.put_trigger_config/4 that updates the settings using plain SQL statements.

Primary Key Columns

To speed up version lookups for a specific record, Carbonite copies its primary key(s) to the table_pk column of the changes table. The table keeps an index on this column together with the table prefix and name.

By default, Carbonite will try to copy the :id column of the source table. If your table does not have a primary key, has a primary key with a different name, or has a composite primary key, you can override this using the primary_key_columns option.

# Disable PK copying
Carbonite.Migrations.put_trigger_config(:rabbits, :primary_key_columns, [])

# Different name
Carbonite.Migrations.put_trigger_config(:rabbits, :primary_key_columns, ["identifier"])

# Composite PK
Carbonite.Migrations.put_trigger_config(:rabbits, :primary_key_columns, ["house", "apartment_no"])

Since the changes table keeps versions of a multitude of different source tables, primary keys are first cast to string (the table_pk column has type VARCHAR[]). For composite primary keys, set the primary_key_columns option to an array as shown above. Each component of a compound primary key will be cast to string before the components are joined into the array.

Excluded and Filtered Columns

In case your table contains sensitive data or data otherwise undesirable for change capturing, you can exclude columns using the excluded_columns option. Excluded columns will not appear in the captured data. If an UPDATE on a table solely touches excluded columns, the entire UPDATE will not be recorded.

Carbonite.Migrations.put_trigger_config(:rabbits, :excluded_columns, ["age"])

If you still want to capture changes to a column (in the changed field), but don't need the exact data, you can make it a "filtered" column. These columns appear as [FILTERED] in the data field.

Carbonite.Migrations.put_trigger_config(:rabbits, :filtered_columns, ["age"])

Persisting replaced data on UPDATE statements

By default, the changes table will track the current version of a record in its data field and a list of changed fields in the changed column. Optionally, you can decide to store the replaced data (the "diff") in the changed_from field, by enabling the store_changed_from option for a trigger. This defaults to false.

Carbonite.Migrations.put_trigger_config(:rabbits, :store_changed_from, true)

Partitioning the Audit Trail

Carbonite can install its tables into multiple database schemas using the carbonite_prefix option. You can use this feature to "partition" your captured data.

Carbonite.Migrations.up(1, carbonite_prefix: "carbonite_animals")
Carbonite.Migrations.create_trigger(:rabbits, carbonite_prefix: "carbonite_animals")

Basically all of Carbonite's functions accept the same carbonite_prefix option to target a particular partition. If desired, tables can participate in multiple partitions by adding multiple triggers on to them. Keep in mind that each partition will need to be processed and purged separately, resulting in multiple streams of change data in your external storage.

Inserting a Transaction

In your application logic, before modifying an audited table like rabbits, you need to first create a Carbonite.Transaction record.

With Ecto.Multi

The easiest way to do so is using Carbonite.Multi.insert_transaction/2 within an Ecto.Multi operation:

Ecto.Multi.new()
|> Carbonite.Multi.insert_transaction(%{meta: %{type: "rabbit_inserted"}})
|> Ecto.Multi.insert(:rabbit, &MyApp.Rabbit.create_changeset(&1.params))
|> MyApp.Repo.transaction()

As you can see, the Carbonite.Transaction is a great place to store metadata for the operation. A type field can be used to categorize the transactions. A user_id would be a good candidate for a transaction log, as well.

Without Ecto.Multi

If you have a situation where you want to insert a transaction without Ecto.Multi, you can use Carbonite.insert_transaction/3.

MyApp.Repo.transaction(fn ->
  {:ok, _} = Carbonite.insert_transaction(MyApp.Repo, %{meta: %{type: "rabbit_inserted"}})

  params
  |> MyApp.Rabbit.create_changeset()
  |> MyApp.Repo.insert!
end)

Building a changeset for manual insertion

Another alternative is creating a changeset for a Carbonite.Transaction using Carbonite.Transaction.changeset/1:

MyApp.Repo.transaction(fn ->
  %{meta: %{type: "rabbit_inserted"}}
  |> Carbonite.Transaction.changeset()
  |> MyApp.Repo.insert!()

  # ...
end)

Setting metadata outside of the transaction

In case you do not have access to metadata you want to persist in the Carbonite.Transaction at the code site where you create it, you can use Carbonite.Transaction.put_meta/2 to store metadata in the process dictionary. This metadata is merged into the metadata given to Carbonite.Multi.insert_transaction/2.

# e.g., in a controller or plug
Carbonite.Transaction.put_meta(:user_id, ...)

Transactions in data migrations

If you manipulate data inside your transactions, as usual a Carbonite.Transaction needs to be inserted before any other statements. You can use Carbonite.Migrations.insert_migration_transaction/1 to insert a transaction with a meta attribute populated from the migration module.

import Carbonite.Migrations

def change do
  insert_migration_transaction()

  execute("UPDATE ...")
end

"Empty" transactions when no changes have been recorded

By default, Carbonite will store the Carbonite.Transaction record regardless of whether in the corresponding database transaction any changes were recorded or not. In fact, there is nothing special about the behaviour of the following code:

Ecto.Multi.new()
|> Carbonite.Multi.insert_transaction(%{meta: %{type: "rabbit_inserted"}})
|> Ecto.Multi.run(:rabbit, &maybe_insert_rabbit/2)
|> MyApp.Repo.transaction()

Depending on the behaviour of maybe_insert_rabbit/2, the transaction may result in one of these outcomes:

  1. The transaction succeeds and the rabbit is inserted. A Carbonite.Transaction is recorded alongside a Carbonite.Change.
  2. The transaction succeeds but no rabbit is inserted. The Carbonite.Transaction is recorded but will be empty, that is, not associated to any Carbonite.Change records.
  3. The transaction fails and is rolled back. The Carbonite.Transaction is not persisted.

Outcome (1) and (3) above are engrained in Carbonite's trigger logic and can not be changed.

The second outcome can be considered "intended behaviour" and there are good reasons for keeping the Carbonite.Transaction around. However, if your use-case indicates that you should not store Carbonite.Transactions when nothing was changed, you need to:

  1. Set the trigger to INITIALLY DEFERRED when it is created.
  2. Re-order your application logic.
  3. Only insert the transaction when needed.
# In the migration where the trigger is created
Carbonite.Migrations.create_trigger(:rabbits, initially: :deferred)
Ecto.Multi.new()
|> Ecto.Multi.run(:rabbit, &maybe_insert_rabbit/2)
|> Ecto.Multi.run(:carbonite_transaction, &maybe_insert_carbonite_transaction/2)
|> MyApp.Repo.transaction()

See Carbonite.Migrations.create_trigger/2 for further information.

Retrieving data

Of course, persisting the audit trail is not an end in itself. At some point you will want to read the data back and make it accessible to the user. Carbonite.Query offers a small suite of helper functions that make it easier to query the database for Transaction and Change records.

Fetching transactions

The Carbonite.Query.transactions/1 function constructs an Ecto.Query for loading Carbonite.Transaction.t/0 records from the database, optionally preloading their included changes. The query can be further refined to limit the result set.

Carbonite.Query.transactions()
|> Ecto.Query.where([t], t.inserted_at > ^earliest)
|> MyApp.Repo.all()

Fetching changes of individual records

The Carbonite.Query.changes/2 function constructs an Ecto.Query.t/0 from a schema struct, loading all changes stored for the given source record.

%MyApp.Rabbit{id: 1}
|> Carbonite.Query.changes()
|> MyApp.Repo.all()

Processing data

Storing event series in a relational database comes with the usual scaling issues, so at some point you may find it necessary to offload the captured data to an external data store. For processing, exporting, and later purging the captured transactions, Carbonite includes what is called an "Event Outbox", or in fact as many of those as you need.

Essentially, a Carbonite.Outbox is a named cursor on the ordered list of transactions stored in an audit trail. Each outbox advances by calling a user-supplied processor function on a batch of transactions. The batch can be filtered and limited in size, but is always ordered ascending by the transaction's id attribute, so transactions are processed roughly in order of their insertion (see below for a discussion on caveats of this solution). When a batch is processed, the outbox remembers its last position - the last transaction that has been processed - so the following call to the outbox can continue where the last has left of. Besides, the outbox persists a user-definable "memo", an arbitrary map that allows to feed output of the previous processor run into the next one.

Creating an Outbox

Create an outbox within a migration:

Carbonite.Migrations.create_outbox("rabbit_holes")

You can create more than one outbox if you need multiple processors, e.g. for multiple target systems for your data.

Processing

You can process an outbox by calling Carbonite.process/4 with the outbox name and a processor callback function.

Carbonite.process(MyApp.Repo, "rabbit_holes", fn transactions, _memo ->
  for transaction <- transactions do
    send_to_external_database(transaction)
  end

  :cont
end)

In practise, you will almost always want to run this within an asynchronous job processor. The following shows an exemplary Oban worker:

# config/config.exs
config :my_app, :oban,
  repo: MyApp.Repo,
  plugins: [
    {Oban.Plugins.Cron, crontab: [
      {"0 0 * * *", MyApp.PeriodicOutboxWorker, args: %{outbox: "rabbit_holes"}}
    ]
  ]

# lib/my_app/periodic_outbox_worker.ex
defmodule MyApp.PeriodicOutboxWorker do
  # Worker is scheduled every 24h, let's make it unique for 12h to be sure.
  # Retry is disabled, the next scheduled run will pick up failed outboxes.
  use Oban.Worker, queue: :default, unique: [period: 43_200], retry: false

  def perform(%Oban.Job{args: %{"outbox" => outbox}}) do
    Carbonite.process(MyApp.Repo, outbox, fn transactions, _memo ->
      for transaction <- transactions do
        send_to_external_database(transaction)
      end

      :cont
    end)
  end
end

⚠️ Transactionality & mutual exclusion

As the outbox processor is likely to call external services as part of its work, Carbonite.process/4 does not begin a transaction itself, and consequently does not acquire a lock on the outbox record. In other words, users have to ensure that they don't accidentally run Carbonite.process/4 for the same outbox concurrently, for instance making use of uniqueness options of the used job processor (e.g. unique in Oban).

The absence of transactionality also means that any exception raised within the processor function will immediately abort the Carbonite.process/4 call without writing the current outbox position to disk. As a result, transactions may be processed again in the next run. Please make sure your receiving system or database can handle duplicate messages.

⚠️ Long running / parallel transactions and the outbox order

When a Carbonite.Transaction record is created at the beginning of an operation in your application, it records the "current" sequence value in its id field. The record in the transactions table becomes visible to other transactions when the current transaction is committed.

A few observations can be made from this:

  • When ordered by their id field, the transactions will be roughly sorted by the time the corresponding operation was started, not when it was committed.
  • Two transactions running in parallel may be committed "out of order", i.e. one with the larger id may be committed before the smaller id transaction if that has a longer runtime.

The latter point is crucial: For instance, if two transactions with id=1 and id=2 run in parallel and id=2 finishes before id=1, an outside viewer can already see id=2 in the database before id=1 is committed. If this outside viewer happens to be an outbox processing job, transaction with id=1 might be skipped and never looked at again. To mitigate this issue, Carbonite.process/4 has a min_age option which excludes transaction younger than a certain from the processing batch (5 minutes by default, increase this is you expect longer running transactions).

Purging

Now that you have successfully exported your audit data, you may delete old transactions from the primary database. Carbonite.purge/2 deletes records that have been processed by all existing outboxes.

# Deletes records whose id field is less than the transaction_id on each outbox.
Carbonite.purge(MyApp.Repo)

Testing / Bypassing Carbonite

One of Carbonite's key features is that it is virtually impossible to forget to record a change to a table (due to the trigger) or to forget to insert an enclosing Carbonite.Transaction beforehand (due to the foreign key constraint between changes and transactions). However, in some circumstances it may be desirable to temporarily switch off change capturing. One such situation is the use of factories (e.g. ExMachina) inside your test suite: Inserting a transaction before each factory call quickly becomes cumbersome and will unnecessarily increase execution time. Additionally, the use of Ecto's SQL sandbox means that your factory calls are contained in the same transaction as your domain logic, which means you cannot simply insert a Carbonite.Transaction before your factories if you intend to assert on the one inserted by your domain logic.

To bypass the capture trigger, Carbonite's trigger configuration provides a toggle mechanism consisting of two fields: mode and override_xact_id. The former you set while installing the trigger on a table in a migration, while the latter allows to "override" whatever has been set, at runtime and only for the current transaction. If you are using Ecto's SQL sandbox for running transactional tests, this means the override is going to be active until the end of the test case.

As a result, you have two options:

  1. Leave the mode at the default value of :capture and turn off capturing as needed by switching to "override mode". This means for every test case / test setup block where you wish to bypass change capturing, you explicitly disable the trigger before any database calls. This approach has the benefit that you still capture all changes by default, and can't miss to test a code path that (in production) would require a Carbonite.Transaction. It is, however, pretty expensive at ~1 additional SQL call per test case.
  2. Set the mode to :ignore on all triggers in your :test environment and instead selectively turn on capturing in test cases where you want to assert on the captured data. For instance, you can set the trigger mode in your migration based on the Mix environment. This approach is cheaper as it does not require any action in your tests by default. Yet you should make sure that you test all code paths that do mutate change-captured tables, in order to assert that each of these inserts a transaction as well.

Option 1: Bypassing change capture selectively

As an example for option 1, consider the following:

# test/support/carbonite_helpers.ex
defmodule MyApp.CarboniteHelpers do
  def with_ignored_carbonite(fun) do
    Carbonite.override_mode(MyApp.Repo, to: :ignore)
    result = fun.()
    Carbonite.override_mode(MyApp.Repo, to: :capture)
    result
  end
end

# test/some_test.exs
test "my_operation/0" do
  rabbit = with_ignored_carbonite(fn -> insert_rabbit() end)

  my_operation(rabbit)

  # You can assert on the Carbonite.Transaction, e.g. like this:
  meta =
    Carbonite.Query.current_transaction()
    |> MyApp.Repo.one!()
    |> Map.fetch!(:meta)

  assert meta = %{"type" => "some_operation"}
end

test "my_other_operation/0" do
  rabbit = insert_rabbit()

  # this will raise if you forgot to insert a transaction in my_other_operation/0
  my_operation()
end

Option 2: Bypassing change capture by default

The following code snippet illustrates the second approach:

# config/config.exs
config :my_app, carbonite_mode: :capture

# config/test.exs
config :my_app, carbonite_mode: :ignore

# priv/repo/migrations/000000000000_install_carbonite.exs
defmodule MyApp.Repo.Migrations.InstallCarbonite do
  @mode Application.compile_env!(:my_app, :carbonite_mode)

  def up do
    # ...
    Carbonite.Migrations.create_trigger(:rabbits)
    Carbonite.Migrations.put_trigger_config(:rabbits, :mode, @mode)
  end
end

# test/support/carbonite_helpers.ex
defmodule MyApp.CarboniteHelpers do
  def enable_carbonite do
    Carbonite.override_mode(MyApp.Repo, to: :capture)
  end
end

# test/some_test.exs
test "my_operation/0" do
  rabbit = insert_rabbit()

  enable_carbonite()

  my_operation()

  # Assert on the Carbonite.Transaction...
end

test "my_other_operation/0" do
  rabbit = insert_rabbit()

  # this will pass regardless of whether you inserted a transaction in my_other_operation/0 (!)
  my_operation()
end

Summary

Functions

Returns the default audit trail prefix.

Fetches all changes of the current transaction from the database.

Sets the current transaction to "override mode" for all tables in the audit log.

Deletes transactions that have been fully processed.

Types

Link to this type

prefix()

View Source (since 0.1.0)
@type prefix() :: binary()
Link to this type

prefix_option()

View Source (since 0.1.0)
@type prefix_option() :: {:carbonite_prefix, prefix()}
Link to this type

process_func()

View Source (since 0.1.0)
@type process_func() ::
  ([Carbonite.Transaction.t()], Carbonite.Outbox.memo() ->
     :cont | :halt | {:cont | :halt, [process_func_option()]})

This type defines the callback function signature for Carbonite.process/3.

The processor function receives the current chunk of transactions and the memo of the last function application, and must return one of

  • :cont - continue processing
  • :halt - stop processing after this chunk
  • {:cont | :halt, opts} - cont/halt and set some options

After the process function invocation the Outbox is updated with new attributes.

Options

Returned options can be:

  • memo - memo to store on Outbox, defaults to previous memo
  • last_transaction_id - last transaction id to remember as processed, defaults to previous
                        `last_transaction_id` on `:halt`, defaults to last id in current
                        chunk when `:cont` is returned
Link to this type

process_func_option()

View Source (since 0.1.0)
@type process_func_option() ::
  {:memo, Carbonite.Outbox.memo()} | {:discard_last, boolean()}
Link to this type

process_option()

View Source (since 0.1.0)
@type process_option() ::
  Carbonite.Query.outbox_queue_option()
  | {:filter, (Ecto.Query.t() -> Ecto.Query.t())}
  | {:chunk, pos_integer()}
Link to this type

purge_option()

View Source (since 0.1.0)
@type purge_option() :: Carbonite.Query.outbox_done_option()
@type repo() :: Ecto.Repo.t()

Functions

Link to this function

default_prefix()

View Source (since 0.1.0)
@spec default_prefix() :: prefix()

Returns the default audit trail prefix.

Link to this function

fetch_changes(repo, opts \\ [])

View Source (since 0.5.0)
@spec fetch_changes(repo(), [prefix_option()]) :: {:ok, [Carbonite.Change.t()]}

Fetches all changes of the current transaction from the database.

Make sure to run this within a transaction.

Parameters

  • repo - the Ecto repository
  • opts - optional keyword list

Options

  • carbonite_prefix - defines the audit trail's schema, defaults to "carbonite_default"
Link to this function

insert_transaction(repo, params \\ %{}, opts \\ [])

View Source (since 0.4.0)
@spec insert_transaction(repo(), params :: map(), [prefix_option()]) ::
  {:ok, Carbonite.Transaction.t()} | {:error, Ecto.Changeset.t()}

Inserts a Carbonite.Transaction.t/0 into the database.

Make sure to run this within a transaction.

Parameters

  • repo - the Ecto repository
  • params - map of params for the Carbonite.Transaction (e.g., :meta)
  • opts - optional keyword list

Options

  • carbonite_prefix - defines the audit trail's schema, defaults to "carbonite_default"

Multiple inserts in the same transaction

Normally, you should have exactly one insert_transaction/3 call per database transaction. In practise, there are two scenarios in this function may be called multiple times:

  1. If an operation A, which calls insert_transaction/3, sometimes is nested within an outer operation B, which also calls insert_transaction/3.
  2. In tests using Ecto's SQL sandbox, subsequent calls to transactional operations (even to the same operation twice) are wrapped inside the overarching test transaction, and hence also effectively call insert_transaction/3 within the same transaction.

While the first scenario can be resolved using appropriate control flow (e.g. by conditionally disabling the inner insert_transaction/3 call), the second scenario is quite common and often unavoidable.

Therefore, insert_transaction/3 ignores subsequent calls within the same database transaction (equivalent to ON CONFLICT DO NOTHING), discarding metadata passed to all calls but the first.

Link to this function

override_mode(repo, opts \\ [])

View Source (since 0.4.0)
@spec override_mode(repo(), [{:to, Carbonite.Trigger.mode()} | prefix_option()]) ::
  :ok

Sets the current transaction to "override mode" for all tables in the audit log.

Parameters

  • repo - the Ecto repository
  • opts - optional keyword list

Options

  • to - allows to specify the target mode, useful to reset the mode after use
  • carbonite_prefix - defines the audit trail's schema, defaults to "carbonite_default"
Link to this function

process(repo, outbox_name, opts \\ [], process_func)

View Source (since 0.4.0)
@spec process(repo(), Carbonite.Outbox.name(), [process_option()], process_func()) ::
  {:ok | :halt, Carbonite.Outbox.t()}

Processes an outbox queue.

This function sends chunks of persisted transactions to a user-supplied processing function. It looks up the current "reading position" from a given Carbonite.Outbox and yields transactions matching the given filter criteria (min_age, etc.) until either the input source is exhausted or a processing function application returns :halt.

Returns either {:ok, outbox} or {:halt, outbox} depending on whether processing was halted explicitly or due the exhausted input source.

See Carbonite.Query.outbox_queue/2 for query options.

Examples

Carbonite.process(MyApp.Repo, "rabbit_holes", fn [transaction], _memo ->
  # The transaction has its changes preloaded.
  transaction
  |> MyApp.Foo.serialize()
  |> MyApp.Foo.send_to_external_database()

  :cont
end)

Memo passing

The memo is useful to carry data between each processor application. Let's say you wanted to generate a hashsum chain on your processed data:

Carbonite.process(MyApp.Repo, "rabbit_holes", fn [transaction], %{"checksum" => checksum} ->
  {payload, checksum} = MyApp.Foo.serialize_and_hash(transaction, checksum)

  MyApp.Foo.send_to_external_database(payload)

  {:cont, memo: %{"checksum" => checksum}}
end)

Chunking / Limiting

The examples above received a single-element list as their first parameter: This is because the transactions are actually processed in "chunks" and the default chunk size is 1. If you would like to process more transactions in one chunk, set the chunk option:

Carbonite.process(MyApp.Repo, "rabbit_holes", [chunk: 50], fn transactions, _memo ->
  for transaction <- transactions do
    transaction
    |> MyApp.Foo.serialize()
    |> MyApp.Foo.send_to_external_database()
  end

  :cont
end)

The query that is executed to fetch the data from the database is controlled with the limit option and is independent of the chunk size.

Error handling

In case you run into an error midway into processing a batch, you may choose to halt processing while remembering about the last processed transaction. This is equivalent to raising an exception from the processing function.

Carbonite.process(MyApp.Repo, "rabbit_holes", fn [transaction], _memo ->
  case send_to_external_database(transaction) do
    :ok ->
      :cont

    {:error, _term} ->
      :halt
  end
end

You can, however, if you know the first half of a batch has been processed, still update the memo and last_transaction_id.

Carbonite.process(MyApp.Repo, "rabbit_holes", fn transactions, _memo ->
  case process_transactions(transactions) do
    {:error, last_successful_transaction} ->
      {:halt, last_transaction_id: last_successful_transaction.id}

    :ok ->
      :cont
  end
end

Parameters

  • repo - the Ecto repository
  • outbox_name - name of the outbox to process
  • opts - optional keyword list
  • process_func - see process_func/0 for details

Options

  • min_age - the minimum age of a record, defaults to 300 seconds (set nil to disable)
  • limit - limits the query in size, defaults to 100 (set nil to disable)
  • filter - function for refining the batch query, defaults to nil
  • chunk - defines the size of the chunk passed to the process function, defaults to 1
  • carbonite_prefix - defines the audit trail's schema, defaults to "carbonite_default"
Link to this function

purge(repo, opts \\ [])

View Source (since 0.4.0)
@spec purge(repo(), [purge_option()]) :: {:ok, non_neg_integer()}

Deletes transactions that have been fully processed.

See Carbonite.Query.outbox_done/1 for query options.

Returns the number of deleted transactions.

Parameters

  • repo - the Ecto repository
  • opts - optional keyword list

Options

  • min_age - the minimum age of a record, defaults to 300 seconds (set nil to disable)
  • carbonite_prefix - defines the audit trail's schema, defaults to "carbonite_default"