Carbonite (Carbonite v0.1.0)

Carbonite implements the Change-Data-Capture pattern on top of PostgreSQL using database triggers. It keeps a central changes table where all mutations of participating tables are recorded. Each changes row is associated with a single row in the transactions table using PostgreSQL's internal transaction id as the foreign key. This leads to the following interesting properties:

  • All changes created within a database transaction automatically and implicitly belong to the same record in the transactions table, even if they're created separately and without knowledge of each other in the application logic. This gives the developer a "natural" way to group related changes into events (more on events later).
  • As the changes table is associated to the transactions table via a non-nullable foreign key constraint, the entry in the transactions table must be created before any changes. Attempting to modify a versioned table without prior insertion into the transactions table will result in an error. The transactions table carries transactional metadata which can be set by the developer on creation.
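To illustrate the second point, the following sketch (assuming a rabbits table with the capture trigger installed and a hypothetical MyApp.Rabbit schema) would fail, because no transactions row exists for the current database transaction:

```elixir
# Hypothetical schema; assumes the capture trigger is installed on "rabbits".
MyApp.Repo.transaction(fn ->
  # No Carbonite.Transaction has been inserted first, so the trigger's
  # write into the changes table violates the foreign key constraint
  # on the transactions table and the database raises an error.
  MyApp.Repo.insert!(%MyApp.Rabbit{name: "Jack"})
end)
```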

Consequently, much of Carbonite's logic lives in database functions and triggers. To get started, we need to create a migration using Ecto.

Migration

The following migration installs Carbonite into its "default prefix", a PostgreSQL schema aptly called carbonite_default, and installs the change capture trigger for an exemplary table called rabbits (in the public schema). In a real-world scenario, you will most likely want to install the trigger for a set of tables and optionally split the transaction log into multiple partitions.

See Carbonite.Migrations for more information on migrations.

mix ecto.gen.migration InstallCarbonite
# priv/repo/migrations/20210704201534_install_carbonite.exs

defmodule MyApp.Repo.Migrations.InstallCarbonite do
  use Ecto.Migration

  def up do
    # Creates carbonite_default schema and tables.
    Carbonite.Migrations.install_schema()

    # For each table that you want to capture changes of, you need to install the trigger.
    Carbonite.Migrations.install_trigger(:rabbits)
  end

  def down do
    # Remove all triggers before dropping the schema.
    Carbonite.Migrations.drop_trigger(:rabbits)

    # Drop the schema & tables.
    Carbonite.Migrations.drop_schema()
  end
end

Excluded Columns

In case your table contains sensitive data or data otherwise undesirable for change capturing, you can exclude columns using the excluded_columns option. Excluded columns will not appear in the captured data. If an UPDATE on a table solely touches excluded columns, the entire UPDATE will not be recorded.

Carbonite.Migrations.install_trigger(:rabbits, excluded_columns: ["age"])

If you forgot to exclude a column, you can reconfigure a trigger for a particular table using configure_trigger/2:

# in another migration
Carbonite.Migrations.configure_trigger(:rabbits, excluded_columns: ["age"])

Partitioning the Transaction Log

Carbonite can install its tables into multiple database schemas using the prefix option. You can use this feature to "partition" your captured data.

Carbonite.Migrations.install_schema(prefix: "carbonite_lagomorpha")
Carbonite.Migrations.install_trigger(:rabbits, carbonite_prefix: "carbonite_lagomorpha")

If desired, a table can participate in multiple partitions by installing multiple change capture triggers on it.
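A sketch of such a setup, assuming the carbonite_lagomorpha schema from above has been installed:

```elixir
# One trigger records changes into the default partition, ...
Carbonite.Migrations.install_trigger(:rabbits)

# ... while a second trigger on the same table records into another partition.
Carbonite.Migrations.install_trigger(:rabbits, carbonite_prefix: "carbonite_lagomorpha")
```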

Keep in mind that each partition will need to be processed and purged separately, resulting in multiple streams of change data in your external storage.

Inserting a Transaction

In your application logic, before modifying a versioned table like rabbits, you need to first create a Carbonite.Transaction record.

With Ecto.Multi

The easiest way to do so is using Carbonite.insert/2 within an Ecto.Multi operation:

Ecto.Multi.new()
|> Carbonite.insert(meta: %{type: "rabbit_inserted"})
|> Ecto.Multi.insert(:rabbit, &MyApp.Rabbit.create_changeset(&1.params))
|> MyApp.Repo.transaction()

As you can see, the Carbonite.Transaction is a great place to store metadata for the operation. A "transaction type" is an obvious choice for categorizing transactions; a user_id is another good candidate for transaction metadata.

Building a changeset for manual insertion

If you don't have the luxury of an Ecto.Multi, you can create a changeset for a Carbonite.Transaction using Carbonite.transaction_changeset/1:

MyApp.Repo.transaction(fn ->
  %{meta: %{type: "rabbit_inserted"}}
  |> Carbonite.transaction_changeset()
  |> MyApp.Repo.insert!()

  # ...
end)

Setting metadata outside of the Transaction

In case you do not have access to metadata you want to persist in the Carbonite.Transaction at the code site where you create it, you can use Carbonite.put_meta/2 to store metadata in the process dictionary. This metadata is merged into the metadata given to Carbonite.insert/2.

# e.g., in a controller or plug
Carbonite.put_meta(:user_id, ...)
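Metadata recorded at the boundary this way is merged with the metadata passed explicitly when the transaction is eventually inserted. A sketch (the user_id value and conn are illustrative):

```elixir
# e.g., in a plug, at the system boundary
Carbonite.put_meta(:user_id, conn.assigns.current_user.id)

# later, in the business logic; the stored :user_id is merged into this meta map
Ecto.Multi.new()
|> Carbonite.insert(meta: %{type: "rabbit_inserted"})
|> MyApp.Repo.transaction()
```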

Summary

Functions

current_meta/0 - Returns the currently stored metadata.

insert/2 - Adds an insert operation for a Carbonite.Transaction to an Ecto.Multi.

put_meta/2 - Stores a piece of metadata in the process dictionary.

transaction_changeset/1 - Builds a changeset for a new Carbonite.Transaction.

Types

build_option() :: {:meta, meta()}

insert_option() :: {:prefix, binary()} | build_option()

meta() :: map()

Functions

current_meta()

current_meta() :: meta()

Returns the currently stored metadata.
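A sketch, assuming no other metadata has been stored in the current process:

```elixir
Carbonite.put_meta(:user_id, 42)

# current_meta/0 reads the stored metadata back from the process
# dictionary, e.g. %{user_id: 42} here.
Carbonite.current_meta()
```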

insert(multi, opts \\ [])

insert(Ecto.Multi.t(), [insert_option()]) :: Ecto.Multi.t()

Adds an insert operation for a Carbonite.Transaction to an Ecto.Multi.

put_meta(key, value)

put_meta(key :: any(), value :: any()) :: meta()

Stores a piece of metadata in the process dictionary.

This can be useful in situations where you want to record a value at a system boundary (say, the user's account_id) without having to pass it through to the database transaction.

Returns the currently stored metadata.

transaction_changeset(opts \\ [])

transaction_changeset([build_option()]) :: Ecto.Changeset.t()

Builds a changeset for a new Carbonite.Transaction.