Nebulex.Adapters.Replicated (Nebulex.Distributed v3.2.1)


Adapter module for the replicated cache topology using push-based replication.

Features

  • Replicated cache topology with eager push-based replication.
  • Zero-latency local reads — all data is replicated on every node.
  • Writes are applied locally and replicated to all peers via buffered RPC.
  • Double-buffered outbox and inbox for high-throughput batched replication.
  • "Newer version wins" conflict resolution via wall-clock versioning.
  • Optional anti-entropy reconciliation to detect and repair data drift.
  • Configurable primary storage adapter.

Replicated Cache Topology

The replicated adapter provides an "eager push replication" pattern. Each node maintains its own local cache. Writes are applied locally first, then batched and pushed to all peer nodes via RPC. On the receiving side, an inbox buffer applies remote commands to the local primary cache using "newer version wins" semantics.

Key characteristics:

  • Local Storage: Each node has a local cache. All read operations are served directly from the local cache with no network overhead.

  • Push-Based Replication: When a cache entry is modified, the change is buffered in an outbox and periodically pushed to all peer nodes in a single batched RPC call.

  • Conflict Resolution: Uses wall-clock versioning (System.system_time()) with "newer version wins" semantics. Timestamps are comparable across nodes (assuming clocks are synchronized, e.g., via NTP), so concurrent writes to the same key resolve consistently regardless of which node originated the write.

  • Double-Buffered I/O: Both outbox (sending) and inbox (receiving) are backed by PartitionedBuffer.Map, which provides double-buffered ETS tables for zero-downtime processing — writes continue while the previous batch is being processed. See PartitionedBuffer.Map for more details on the buffering mechanism.
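The "newer version wins" rule above can be sketched in a few lines. This is an illustrative model, not the adapter's actual code; `NewerWins`, `stamp/1`, and `resolve/2` are hypothetical names:

```elixir
defmodule NewerWins do
  @moduledoc """
  Illustrative sketch of "newer version wins" conflict resolution.
  Each replicated entry carries a wall-clock version taken at write time.
  """

  # An entry is {value, version}; the version is System.system_time()
  # captured on the originating node.
  def stamp(value), do: {value, System.system_time()}

  # Keep the entry with the higher version; ties keep the local entry.
  def resolve({_, local_v} = local, {_, remote_v}) when local_v >= remote_v, do: local
  def resolve(_local, remote), do: remote
end
```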

How It Works

        Node A                      Node B                        Node C
  ┌───────────────┐           ┌───────────────┐             ┌───────────────┐
  │  Local Cache  │           │  Local Cache  │             │  Local Cache  │
  │   (primary)   │           │   (primary)   │             │   (primary)   │
  └──┬─────────┬──┘           └──┬─────────┬──┘             └──┬─────────┬──┘
     │         │                 │         │                   │         │
     ▼         ▼                 ▼         ▼                   ▼         ▼
┌────────┐ ┌────────┐       ┌────────┐ ┌────────┐        ┌────────┐ ┌────────┐
│ Inbox  │ │ Outbox │       │ Inbox  │ │ Outbox │        │ Inbox  │ │ Outbox │
└────────┘ └───┬────┘       └───▲────┘ └────────┘        └───▲────┘ └────────┘
               │                │                            │
               │ replicate ->   │                            │
               └────────────────┴────────────────────────────┘
                Batched RPC from Node A Outbox to peer Inboxes

Example: put on Node A

  Client ── put("k", "v") ──▶ Node A Local Cache (write locally)
                                   │
                                   ├──▶ Inbox (tagged :local, skip on process)
                                   └──▶ Outbox (buffered)
                                             │
                                        flush cycle
                                             │
                            ┌────────────────┼────────────────┐
                            ▼                                 ▼
                       Node B Inbox                      Node C Inbox
                      (tagged :remote)                  (tagged :remote)
                            │                                 │
                       process cycle                     process cycle
                            │                                 │
                            ▼                                 ▼
                      Node B Cache                       Node C Cache
                     put("k", "v")                      put("k", "v")

Write flow

  1. Node A modifies a cache entry (e.g., Cache.put("key", value)).
  2. The value is written to the local primary cache immediately.
  3. The command is written to the inbox (tagged :local, for conflict resolution) and to the outbox.
  4. On the next outbox flush cycle, all buffered commands are sent to peer inbox buffers via a single RPC.multicall with put_all_newer.
  5. On each peer, the inbox applies remote commands to the local primary cache (skipping :local entries).
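Steps 4–5 can be sketched as follows. This is a hedged illustration, not the adapter's internals: `OutboxFlushSketch`, `flush/4`, `split_failures/2`, and the peer-side `put_all_newer/1` function are assumed names; only `:erpc.multicall/5` is a real OTP call.

```elixir
defmodule OutboxFlushSketch do
  # Send the whole buffered batch to every peer in one RPC, then collect
  # the peers that failed so they can be retried.
  def flush(peers, cache_mod, batch, timeout) do
    peers
    |> :erpc.multicall(cache_mod, :put_all_newer, [batch], timeout)
    |> split_failures(peers)
  end

  # Results arrive in peer order: {:ok, result} on success, or a
  # {class, reason} tuple on failure.
  def split_failures(results, peers) do
    results
    |> Enum.zip(peers)
    |> Enum.flat_map(fn
      {{:ok, _result}, _peer} -> []
      {error, peer} -> [{error, peer}]
    end)
  end
end
```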

Read flow

  1. Node B reads "key" from its local cache.
  2. If hit → return immediately (zero latency).
  3. If miss → return cache miss (data hasn't been replicated yet or was evicted locally).

Node join (push-based bootstrap)

When a new node joins the cluster, existing nodes push data to it rather than the new node pulling from a peer. This avoids the overwrite problem inherent in pull-based bootstrap, where the bootstrapping node's timestamp would be newer than any prior write version, potentially overwriting more recent data.

  1. New node joins the :pg group.
  2. All existing ClusterMonitor processes receive the :join event via :pg.monitor_scope/1.
  3. A simple leader election (smallest node name by Erlang term ordering) ensures exactly one existing node pushes data, avoiding duplicate work.
  4. The leader streams its local cache entries to the new node via RPC, using :put_new commands — entries are written only if the key does not already exist on the new node, preserving any data it received via normal replication in the meantime.
  5. If the primary storage adapter is Nebulex.Adapters.Local, the new node resets the GC interval on all cluster nodes to synchronize generation rotation and prevent premature eviction.
  6. After bootstrap, new writes propagate automatically via the normal replication flow. Anti-entropy reconciliation (if enabled) repairs any entries missed during bootstrap.
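Steps 3–4 can be illustrated with a small sketch (`BootstrapSketch`, `leader?/2`, and `apply_put_new/2` are hypothetical names for illustration):

```elixir
defmodule BootstrapSketch do
  # Step 3: the existing node with the smallest name (Erlang term
  # ordering on atoms) wins the election, so exactly one node pushes data.
  def leader?(existing_nodes, self_node), do: self_node == Enum.min(existing_nodes)

  # Step 4: :put_new semantics on the joining node — a key it already
  # received via normal replication is never overwritten by bootstrap.
  def apply_put_new(local_entries, pushed_entries) do
    Enum.reduce(pushed_entries, local_entries, fn {k, v}, acc ->
      Map.put_new(acc, k, v)
    end)
  end
end
```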

When to Use

The replicated adapter is ideal for:

  • Read-Heavy Workloads: Maximum read performance since all reads are served locally.
  • Small to Medium Datasets: Data that fits in memory on every node.
  • Low-Latency Write Propagation: Writes are batched and pushed eagerly, minimizing the consistency window.
  • When Eventual Consistency Is Acceptable: There is a small window between a write and its replication to peers.

Primary Storage Adapter

This adapter depends on a local cache adapter (primary storage), adding a push-based replication layer on top of it. You don't need to manually define the primary storage cache; the adapter initializes it automatically as part of the supervision tree.

The :primary_storage_adapter option (defaults to Nebulex.Adapters.Local) configures which adapter to use for the local storage. Options for the primary adapter can be specified via the :primary configuration option.

Usage

The cache expects the :otp_app and :adapter as options when used. The :otp_app should point to an OTP application with the cache configuration. Optionally, you can configure the desired primary storage adapter with the :primary_storage_adapter option (defaults to Nebulex.Adapters.Local). See the compile-time options for more information:

  • :primary_storage_adapter (atom/0) - The adapter module used for the primary (local) storage on each cluster node. The replicated adapter wraps this local adapter and adds push-based replication on top of it. This option allows you to choose which adapter to use for the local storage. The configuration for the primary adapter is specified via the :primary start option. The default value is Nebulex.Adapters.Local.

For example:

defmodule MyApp.ReplicatedCache do
  use Nebulex.Cache,
    otp_app: :my_app,
    adapter: Nebulex.Adapters.Replicated
end

Providing a custom :primary_storage_adapter:

defmodule MyApp.ReplicatedCache do
  use Nebulex.Cache,
    otp_app: :my_app,
    adapter: Nebulex.Adapters.Replicated,
    adapter_opts: [primary_storage_adapter: Nebulex.Adapters.Local]
end

Configuration in config/config.exs:

config :my_app, MyApp.ReplicatedCache,
  primary: [
    gc_interval: :timer.hours(12),
    max_size: 1_000_000
  ],
  replication: [
    interval: :timer.seconds(1),
    batch_size: 1_000
  ]

Add the cache to your supervision tree:

def start(_type, _args) do
  children = [
    {MyApp.ReplicatedCache, []},
    ...
  ]

  opts = [strategy: :one_for_one, name: MyApp.Supervisor]
  Supervisor.start_link(children, opts)
end

See Nebulex.Cache for more information.

Configuration Options

This adapter supports the following configuration options:

  • :primary (keyword/0) - Configuration options passed to the primary storage adapter specified via :primary_storage_adapter. The available options depend on which adapter you choose. Refer to the documentation of your chosen primary storage adapter for the complete list of supported options. The default value is [].

  • :replication (keyword/0) - Configuration options for the push-based replication layer. Controls how often buffered commands are flushed, batch sizes, RPC timeouts, and retry behavior. The default value is [].

    • :interval (pos_integer/0) - How often (in milliseconds) the outbox and inbox buffers swap tables and run the processing cycle. Lower values mean faster replication but more frequent task spawning. Maps to the :processing_interval_ms option of PartitionedBuffer.Map. The default value is 1000.

    • :batch_size (pos_integer/0) - Number of entries to read from ETS per batch when flushing the outbox and inbox buffers. The processor is called once per batch. Maps to the :processing_batch_size option of PartitionedBuffer.Map. The default value is 1000.

    • :timeout (pos_integer/0) - Timeout in milliseconds for the RPC multicall when replicating buffered commands to peer nodes. The default value is 60000.

    • :retries (non_neg_integer/0) - Number of times to retry replicating to a failed peer node before giving up. The default value is 3.

    • :retry_delay (pos_integer/0) - Delay in milliseconds between replication retry attempts. The default value is 100.

    • :partitions (pos_integer/0) - Number of partitions for the inbox and outbox buffers. More partitions reduce contention under high write concurrency. Maps to the :partitions option of PartitionedBuffer.Map. Defaults to System.schedulers_online().

    • :anti_entropy_interval (pos_integer/0) - Interval in milliseconds between anti-entropy reconciliation cycles. When set, a background process periodically picks a random peer, compares bucket-hashed Merkle digests of the local and remote caches, and repairs only the divergent keys by writing them through the inbox (preserving "newer version wins" semantics).

      Disabled by default (not present). Set to a positive integer to enable, e.g., :timer.minutes(1).
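Putting the options above together, an illustrative configuration with every :replication option set explicitly might look like this (the values are examples, not recommendations):

```elixir
config :my_app, MyApp.ReplicatedCache,
  replication: [
    interval: :timer.seconds(1),                # buffer swap/processing cycle
    batch_size: 1_000,                          # entries per ETS batch
    timeout: :timer.seconds(60),                # RPC multicall timeout
    retries: 3,                                 # retries per failed peer
    retry_delay: 100,                           # ms between retries
    partitions: System.schedulers_online(),     # inbox/outbox partitions
    anti_entropy_interval: :timer.minutes(1)    # omit to disable anti-entropy
  ]
```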

Extended API

This adapter provides some additional convenience functions to the Nebulex.Cache API.

Retrieving the primary storage or local cache module:

MyCache.__primary__()

Retrieving the cluster nodes associated with the given cache name:

MyCache.nodes()

Joining the cache to the cluster:

MyCache.join_cluster()

Leaving the cluster (removes the cache from the cluster):

MyCache.leave_cluster()

Telemetry events

Since the replicated adapter wraps a primary storage cache (backed by a local cache adapter), that cache emits its own Telemetry events too. Therefore, events are emitted by both the replicated adapter and the primary storage cache. For example, the cache defined earlier, MyApp.ReplicatedCache, will emit the following events:

  • [:my_app, :replicated_cache, :command, :start]
  • [:my_app, :replicated_cache, :primary, :command, :start]
  • [:my_app, :replicated_cache, :command, :stop]
  • [:my_app, :replicated_cache, :primary, :command, :stop]
  • [:my_app, :replicated_cache, :command, :exception]
  • [:my_app, :replicated_cache, :primary, :command, :exception]

As you may notice, the default telemetry prefix for the cache is [:my_app, :replicated_cache]. However, you can override the default for the primary storage by specifying :telemetry_prefix within the :primary options. See the Telemetry guide for more information and examples.

Adapter-specific telemetry events

The replication process emits the following Telemetry span events when flushing buffered commands to peer nodes:

  • telemetry_prefix ++ [:replication, :start] - Dispatched when a replication batch starts being sent to peer nodes.

    • Measurements: %{system_time: non_neg_integer}

    • Metadata:

      %{
        adapter_meta: %{optional(atom) => term},
        node: atom,
        peers: [atom]
      }
  • telemetry_prefix ++ [:replication, :stop] - Dispatched when a replication batch completes (successfully or with errors).

    • Measurements: %{duration: non_neg_integer}

    • Metadata:

      %{
        adapter_meta: %{optional(atom) => term},
        node: atom,
        peers: [atom],
        errors: [{term, atom}]
      }
  • telemetry_prefix ++ [:replication, :exception] - Dispatched when a replication batch raises an exception.

    • Measurements: %{duration: non_neg_integer}

    • Metadata:

      %{
        adapter_meta: %{optional(atom) => term},
        node: atom,
        peers: [atom],
        kind: :error | :exit | :throw,
        reason: term(),
        stacktrace: [term()]
      }

The :errors field in the :stop metadata is a list of {error, node} tuples for each peer node that failed to receive the replication batch. An empty list indicates all peers were updated successfully. When errors occur, the replicator retries failed nodes up to :retries times with a :retry_delay between attempts (see :replication options).
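As a hedged example of consuming these events, the handler below logs partial replication failures. `ReplicationObserver` is a hypothetical module name, and the prefix assumes the default [:my_app, :replicated_cache] from the example above:

```elixir
defmodule ReplicationObserver do
  require Logger

  # Attach once at application start; the handler id is arbitrary.
  def attach(prefix \\ [:my_app, :replicated_cache]) do
    :telemetry.attach(
      "replication-errors",
      prefix ++ [:replication, :stop],
      &__MODULE__.handle_event/4,
      nil
    )
  end

  # :errors is a list of {error, node} tuples; [] means all peers succeeded.
  def handle_event(_event, _measurements, %{errors: []}, _config), do: :ok

  def handle_event(_event, _measurements, %{errors: errors, peers: peers}, _config) do
    Logger.warning("replication failed on #{length(errors)}/#{length(peers)} peers")
    :partial_failure
  end
end
```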

Bootstrap events

When a new node joins the cluster and an existing node pushes data to it (push-based bootstrap), the following Telemetry span events are emitted on the pushing node:

  • telemetry_prefix ++ [:bootstrap, :start] - Dispatched when an existing node starts pushing entries to a newly joined node.

    • Measurements: %{system_time: non_neg_integer}

    • Metadata:

      %{
        adapter_meta: %{optional(atom) => term},
        node: atom,
        peer: atom
      }
  • telemetry_prefix ++ [:bootstrap, :stop] - Dispatched when the bootstrap push completes successfully.

    • Measurements: %{duration: non_neg_integer}

    • Metadata:

      %{
        adapter_meta: %{optional(atom) => term},
        node: atom,
        peer: atom,
        total: non_neg_integer
      }
  • telemetry_prefix ++ [:bootstrap, :exception] - Dispatched when the bootstrap push raises an exception.

    • Measurements: %{duration: non_neg_integer}

    • Metadata:

      %{
        adapter_meta: %{optional(atom) => term},
        node: atom,
        peer: atom,
        kind: :error | :exit | :throw,
        reason: term(),
        stacktrace: [term()]
      }

Anti-entropy events

When anti-entropy reconciliation is enabled (:anti_entropy_interval), the following Telemetry span events are emitted each cycle:

  • telemetry_prefix ++ [:anti_entropy, :start] - Dispatched when an anti-entropy cycle starts.

    • Measurements: %{system_time: non_neg_integer}

    • Metadata:

      %{
        adapter_meta: %{optional(atom) => term},
        node: atom,
        peer: atom
      }
  • telemetry_prefix ++ [:anti_entropy, :stop] - Dispatched when an anti-entropy cycle completes.

    • Measurements: %{duration: non_neg_integer}

    • Metadata:

      %{
        adapter_meta: %{optional(atom) => term},
        node: atom,
        peer: atom,
        repaired: non_neg_integer,
        divergent_buckets: non_neg_integer
      }
  • telemetry_prefix ++ [:anti_entropy, :exception] - Dispatched when an anti-entropy cycle raises an exception (e.g., RPC failure to the selected peer).

    • Measurements: %{duration: non_neg_integer}

    • Metadata:

      %{
        adapter_meta: %{optional(atom) => term},
        node: atom,
        peer: atom,
        kind: :error | :exit | :throw,
        reason: term(),
        stacktrace: [term()]
      }

Anti-Entropy Reconciliation

The replicated adapter supports optional anti-entropy reconciliation to detect and repair data drift between nodes. This can happen after missed replication batches (e.g., brief network partitions or node outages).

When enabled via :anti_entropy_interval, a background process runs periodically on each node:

  1. Picks a random peer.
  2. Builds a bucket-hashed digest (1024 fixed buckets, XOR of key/value hashes) of the local cache.
  3. Fetches the peer's digest via RPC.
  4. Compares digests to find divergent buckets.
  5. For divergent buckets, fetches the peer's actual entries (with TTLs).
  6. Writes them through the inbox, preserving "newer version wins" conflict resolution.

This approach is based on the anti-entropy reconciliation technique originally described in the Amazon Dynamo paper (DeCandia et al., 2007) and widely adopted by distributed databases like Apache Cassandra and Riak. The specific implementation follows Riak's Active Anti-Entropy (AAE) design most closely: instead of building a full Merkle tree over individual keys (expensive to build and compare), keys are hashed into a fixed number of buckets and each bucket stores the XOR of its key/value hashes. This bucket-based approach provides precise divergence detection with minimal overhead — only the entries in divergent buckets need to be fetched and compared.
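The digest construction and comparison described above can be sketched as follows (illustrative names; the adapter's actual internals may differ):

```elixir
defmodule DigestSketch do
  import Bitwise

  @buckets 1024

  # Hash every entry into one of @buckets fixed buckets; each bucket
  # accumulates the XOR of its entries' {key, value} hashes.
  def digest(entries) do
    Enum.reduce(entries, %{}, fn {key, value}, acc ->
      bucket = :erlang.phash2(key, @buckets)
      hash = :erlang.phash2({key, value})
      Map.update(acc, bucket, hash, &bxor(&1, hash))
    end)
  end

  # Only buckets whose accumulated hashes differ need their entries
  # fetched from the peer and written through the inbox.
  def divergent_buckets(local, remote) do
    for bucket <- Enum.uniq(Map.keys(local) ++ Map.keys(remote)),
        Map.get(local, bucket) != Map.get(remote, bucket),
        do: bucket
  end
end
```

Because XOR is commutative and associative, two caches holding the same entries produce identical digests regardless of insertion order.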

Configuration

config :my_app, MyApp.ReplicatedCache,
  replication: [
    interval: :timer.seconds(1),
    anti_entropy_interval: :timer.minutes(1)
  ]

Omit :anti_entropy_interval to disable (default).

Caveats

  • Replication Latency: There is a window (up to the :interval replication option) between when a write occurs on one node and when it is replicated to peers. During this window, peers may serve stale data.

  • Memory Usage: Every node holds a full copy of the cache. This topology is best suited for datasets that fit in memory on all nodes.

  • Queryable Operations: General queries (get_all, count_all, stream) operate on the local cache only. delete_all operates locally and replicates the deletion to all peer nodes.

Summary

Functions

Helper function to use dynamic cache for internal primary cache storage when needed.

Functions

with_dynamic_cache(adapter_meta, action, args)

Helper function to use dynamic cache for internal primary cache storage when needed.