Adapter module for the replicated cache topology using push-based replication.
Features
- Replicated cache topology with eager push-based replication.
- Zero-latency local reads — all data is replicated on every node.
- Writes are applied locally and replicated to all peers via buffered RPC.
- Double-buffered outbox and inbox for high-throughput batched replication.
- "Newer version wins" conflict resolution via wall-clock versioning.
- Optional anti-entropy reconciliation to detect and repair data drift.
- Configurable primary storage adapter.
Replicated Cache Topology
The replicated adapter provides an "eager push replication" pattern. Each node maintains its own local cache. Writes are applied locally first, then batched and pushed to all peer nodes via RPC. On the receiving side, an inbox buffer applies remote commands to the local primary cache using "newer version wins" semantics.
Key characteristics:
Local Storage: Each node has a local cache. All read operations are served directly from the local cache with no network overhead.
Push-Based Replication: When a cache entry is modified, the change is buffered in an outbox and periodically pushed to all peer nodes in a single batched RPC call.
Conflict Resolution: Uses wall-clock versioning (`System.system_time()`) with "newer version wins" semantics. Timestamps are comparable across nodes (assuming NTP sync), so concurrent writes to the same key are resolved deterministically regardless of which node originated the write.
Double-Buffered I/O: Both the outbox (sending) and inbox (receiving) are backed by `PartitionedBuffer.Map`, which provides double-buffered ETS tables for zero-downtime processing: writes continue while the previous batch is being processed. See `PartitionedBuffer.Map` for more details on the buffering mechanism.
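The double-buffering idea can be illustrated with a minimal sketch. This is not the actual `PartitionedBuffer.Map` API; the module and function names below are hypothetical, showing only the two-table swap technique with plain ETS.

```elixir
# Hypothetical sketch of double-buffered ETS tables (not the real
# PartitionedBuffer.Map API): writers always append to the "active"
# table, while each flush cycle swaps the tables and drains the one
# that was active, so writes never block on batch processing.
defmodule DoubleBufferSketch do
  # Create two ETS tables; the first starts as the active buffer.
  def new do
    a = :ets.new(:buf_a, [:set, :public])
    b = :ets.new(:buf_b, [:set, :public])
    %{active: a, standby: b}
  end

  # Writes go to the active table only.
  def put(%{active: active}, key, value) do
    :ets.insert(active, {key, value})
    :ok
  end

  # On each flush cycle the tables swap roles; the previously active
  # table is drained into a batch while new writes land in the other.
  def swap_and_drain(%{active: active, standby: standby}) do
    batch = :ets.tab2list(active)
    :ets.delete_all_objects(active)
    {batch, %{active: standby, standby: active}}
  end
end
```
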
How It Works
Node A Node B Node C
┌───────────────┐ ┌───────────────┐ ┌───────────────┐
│ Local Cache │ │ Local Cache │ │ Local Cache │
│ (primary) │ │ (primary) │ │ (primary) │
└──┬─────────┬──┘ └──┬─────────┬──┘ └──┬─────────┬──┘
│ │ │ │ │ │
▼ ▼ ▼ ▼ ▼ ▼
┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐ ┌────────┐
│ Inbox │ │ Outbox │ │ Inbox │ │ Outbox │ │ Inbox │ │ Outbox │
└────────┘ └───┬────┘ └───▲────┘ └────────┘ └───▲────┘ └────────┘
│ │ │
│ replicate -> │ │
└────────────────┘────────────────────────────┘
Batched RPC from Node A Outbox to peer Inboxes
Example: put on Node A
Client ── put("k", "v") ──▶ Node A Local Cache (write locally)
│
├──▶ Inbox (tagged :local, skip on process)
└──▶ Outbox (buffered)
│
flush cycle
│
┌────────────────┼────────────────┐
▼ ▼
Node B Inbox Node C Inbox
(tagged :remote) (tagged :remote)
│ │
process cycle process cycle
│ │
▼ ▼
Node B Cache Node C Cache
put("k", "v")         put("k", "v")

Write flow
- Node A modifies a cache entry (e.g., `Cache.put("key", value)`).
- The value is written to the local primary cache immediately.
- The command is written to the inbox (tagged `:local`, for conflict resolution) and to the outbox.
- On the next outbox flush cycle, all buffered commands are sent to peer inbox buffers via a single `RPC.multicall` with `put_all_newer`.
- On each peer, the inbox applies remote commands to the local primary cache (skipping `:local` entries).
Read flow
- Node B reads "key" from its local cache.
- If hit → return immediately (zero latency).
- If miss → return cache miss (data hasn't been replicated yet or was evicted locally).
Node join (push-based bootstrap)
When a new node joins the cluster, existing nodes push data to it rather than the new node pulling from a peer. This avoids the overwrite problem inherent in pull-based bootstrap, where the bootstrapping node's timestamp would be newer than any prior write version, potentially overwriting more recent data.
- The new node joins the `:pg` group.
- All existing `ClusterMonitor` processes receive the `:join` event via `:pg.monitor_scope/1`.
- A simple leader election (smallest node name by Erlang term ordering) ensures exactly one existing node pushes data, avoiding duplicate work.
- The leader streams its local cache entries to the new node via RPC, using `:put_new` commands; entries are written only if the key does not already exist on the new node, preserving any data it received via normal replication in the meantime.
- If the primary storage adapter is `Nebulex.Adapters.Local`, the new node resets the GC interval on all cluster nodes to synchronize generation rotation and prevent premature eviction.
- After bootstrap, new writes propagate automatically via the normal replication flow. Anti-entropy reconciliation (if enabled) repairs any entries missed during bootstrap.
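The leader election step above relies only on Erlang term ordering of node names (atoms compare by their string value), so every node independently reaches the same answer without coordination. A minimal sketch, with a hypothetical module name:

```elixir
# Sketch of the bootstrap leader election: among the nodes that were
# already in the cluster (excluding the node that just joined), the one
# with the smallest name by Erlang term ordering pushes the data.
# Every node evaluates this locally and gets the same result.
defmodule BootstrapLeaderSketch do
  def leader?(existing_nodes, me) do
    Enum.min(existing_nodes) == me
  end
end
```
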
When to Use
The replicated adapter is ideal for:
- Read-Heavy Workloads: Maximum read performance since all reads are served locally.
- Small to Medium Datasets: Data that fits in memory on every node.
- Low-Latency Write Propagation: Writes are batched and pushed eagerly, minimizing the consistency window.
- When Eventual Consistency Is Acceptable: There is a small window between a write and its replication to peers.
Primary Storage Adapter
This adapter depends on a local cache adapter (primary storage), adding a push-based replication layer on top of it. You don't need to manually define the primary storage cache; the adapter initializes it automatically as part of the supervision tree.
The :primary_storage_adapter option (defaults to Nebulex.Adapters.Local)
configures which adapter to use for the local storage. Options for the
primary adapter can be specified via the :primary configuration option.
Usage
The cache expects the :otp_app and :adapter as options when used.
The :otp_app should point to an OTP application with the cache
configuration. Optionally, you can configure the desired primary storage
adapter with the option :primary_storage_adapter (defaults to
Nebulex.Adapters.Local). See the compile time options for more information:
`:primary_storage_adapter` (`atom/0`) - The adapter module used for the primary (local) storage on each cluster node. The replicated adapter wraps this local adapter and adds push-based replication on top of it. This option allows you to choose which adapter to use for the local storage. The configuration for the primary adapter is specified via the `:primary` start option. The default value is `Nebulex.Adapters.Local`.
For example:
defmodule MyApp.ReplicatedCache do
use Nebulex.Cache,
otp_app: :my_app,
adapter: Nebulex.Adapters.Replicated
end

Providing a custom `:primary_storage_adapter`:
defmodule MyApp.ReplicatedCache do
use Nebulex.Cache,
otp_app: :my_app,
adapter: Nebulex.Adapters.Replicated,
adapter_opts: [primary_storage_adapter: Nebulex.Adapters.Local]
end

Configuration in `config/config.exs`:
config :my_app, MyApp.ReplicatedCache,
primary: [
gc_interval: :timer.hours(12),
max_size: 1_000_000
],
replication: [
interval: :timer.seconds(1),
batch_size: 1_000
]

Add the cache to your supervision tree:
def start(_type, _args) do
children = [
{MyApp.ReplicatedCache, []},
...
]
opts = [strategy: :one_for_one, name: MyApp.Supervisor]
Supervisor.start_link(children, opts)
end

See Nebulex.Cache for more information.
Configuration Options
This adapter supports the following configuration options:
`:primary` (`keyword/0`) - Configuration options passed to the primary storage adapter specified via `:primary_storage_adapter`. The available options depend on which adapter you choose. Refer to the documentation of your chosen primary storage adapter for the complete list of supported options. The default value is `[]`.

`:replication` (`keyword/0`) - Configuration options for the push-based replication layer. Controls how often buffered commands are flushed, batch sizes, RPC timeouts, and retry behavior. The default value is `[]`. The following nested options are supported:

- `:interval` (`pos_integer/0`) - How often (in milliseconds) the outbox and inbox buffers swap tables and run the processing cycle. Lower values mean faster replication but more frequent task spawning. Maps to the `:processing_interval_ms` option of `PartitionedBuffer.Map`. The default value is `1000`.
- `:batch_size` (`pos_integer/0`) - Number of entries to read from ETS per batch when flushing the outbox and inbox buffers. The processor is called once per batch. Maps to the `:processing_batch_size` option of `PartitionedBuffer.Map`. The default value is `1000`.
- `:timeout` (`pos_integer/0`) - Timeout in milliseconds for the RPC multicall when replicating buffered commands to peer nodes. The default value is `60000`.
- `:retries` (`non_neg_integer/0`) - Number of times to retry replicating to a failed peer node before giving up. The default value is `3`.
- `:retry_delay` (`pos_integer/0`) - Delay in milliseconds between replication retry attempts. The default value is `100`.
- `:partitions` (`pos_integer/0`) - Number of partitions for the inbox and outbox buffers. More partitions reduce contention under high write concurrency. Maps to the `:partitions` option of `PartitionedBuffer.Map`. Defaults to `System.schedulers_online()`.
- `:anti_entropy_interval` (`pos_integer/0`) - Interval in milliseconds between anti-entropy reconciliation cycles. When set, a background process periodically picks a random peer, compares bucket-hashed Merkle digests of the local and remote caches, and repairs only the divergent keys by writing them through the inbox (preserving "newer version wins" semantics). Disabled by default (not present). Set to a positive integer to enable, e.g., `:timer.minutes(1)`.
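Putting the options together, a configuration that sets every replication option explicitly might look like the following. The values are illustrative, not recommendations:

```elixir
# Illustrative configuration showing all documented replication options
# alongside the :primary options for the default local adapter.
config :my_app, MyApp.ReplicatedCache,
  primary: [
    gc_interval: :timer.hours(12),
    max_size: 1_000_000
  ],
  replication: [
    interval: :timer.seconds(1),
    batch_size: 1_000,
    timeout: :timer.seconds(60),
    retries: 3,
    retry_delay: 100,
    partitions: System.schedulers_online(),
    anti_entropy_interval: :timer.minutes(1)
  ]
```
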
Extended API
This adapter provides some additional convenience functions to the
Nebulex.Cache API.
Retrieving the primary storage or local cache module:
MyCache.__primary__()

Retrieving the cluster nodes associated with the given cache name:

MyCache.nodes()

Joining the cache to the cluster:

MyCache.join_cluster()

Leaving the cluster (removes the cache from the cluster):

MyCache.leave_cluster()

Telemetry events
Since the replicated adapter depends on the configured primary storage
cache (which uses a local cache adapter), this one will also emit Telemetry
events. Therefore, there will be events emitted by the replicated adapter
as well as the primary storage cache. For example, the cache defined before
MyApp.ReplicatedCache will emit the following events:
- `[:my_app, :replicated_cache, :command, :start]`
- `[:my_app, :replicated_cache, :primary, :command, :start]`
- `[:my_app, :replicated_cache, :command, :stop]`
- `[:my_app, :replicated_cache, :primary, :command, :stop]`
- `[:my_app, :replicated_cache, :command, :exception]`
- `[:my_app, :replicated_cache, :primary, :command, :exception]`
As you may notice, the telemetry prefix by default for the cache is
[:my_app, :replicated_cache]. However, you could specify the
:telemetry_prefix for the primary storage within the :primary options
(if you want to override the default). See the
Telemetry guide
for more information and examples.
Adapter-specific telemetry events
The replication process emits the following Telemetry span events when flushing buffered commands to peer nodes:
`telemetry_prefix ++ [:replication, :start]` - Dispatched when a replication batch starts being sent to peer nodes.
  Measurements: `%{system_time: non_neg_integer}`
  Metadata: `%{adapter_meta: %{optional(atom) => term}, node: atom, peers: [atom]}`

`telemetry_prefix ++ [:replication, :stop]` - Dispatched when a replication batch completes (successfully or with errors).
  Measurements: `%{duration: non_neg_integer}`
  Metadata: `%{adapter_meta: %{optional(atom) => term}, node: atom, peers: [atom], errors: [{term, atom}]}`

`telemetry_prefix ++ [:replication, :exception]` - Dispatched when a replication batch raises an exception.
  Measurements: `%{duration: non_neg_integer}`
  Metadata: `%{adapter_meta: %{optional(atom) => term}, node: atom, peers: [atom], kind: :error | :exit | :throw, reason: term(), stacktrace: [term()]}`
The :errors field in the :stop metadata is a list of {error, node}
tuples for each peer node that failed to receive the replication batch.
An empty list indicates all peers were updated successfully. When errors
occur, the replicator retries failed nodes up to :retries times with
a :retry_delay between attempts (see :replication options).
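A handler for the `:stop` event can surface failed peers, for example to feed metrics or alerts. The sketch below assumes the default telemetry prefix `[:my_app, :replicated_cache]`; the module and handler id are hypothetical:

```elixir
# Sketch of a Telemetry handler for the replication :stop event.
# The :errors metadata lists {error, node} tuples for peers that
# failed to receive the batch; an empty list means full success.
defmodule ReplicationTelemetrySketch do
  require Logger

  def attach do
    :telemetry.attach(
      "replicated-cache-replication-stop",
      [:my_app, :replicated_cache, :replication, :stop],
      &__MODULE__.handle_event/4,
      nil
    )
  end

  def handle_event(_event, %{duration: duration}, %{errors: errors, peers: peers}, _config) do
    case errors do
      [] ->
        # Duration is reported in native time units.
        Logger.debug("replicated to #{length(peers)} peers in #{duration} native time units")
        :ok

      _ ->
        for {error, node} <- errors do
          Logger.warning("replication to #{node} failed: #{inspect(error)}")
        end

        :ok
    end
  end
end
```
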
Bootstrap events
When a new node joins the cluster and an existing node pushes data to it (push-based bootstrap), the following Telemetry span events are emitted on the pushing node:
`telemetry_prefix ++ [:bootstrap, :start]` - Dispatched when an existing node starts pushing entries to a newly joined node.
  Measurements: `%{system_time: non_neg_integer}`
  Metadata: `%{adapter_meta: %{optional(atom) => term}, node: atom, peer: atom}`

`telemetry_prefix ++ [:bootstrap, :stop]` - Dispatched when the bootstrap push completes successfully.
  Measurements: `%{duration: non_neg_integer}`
  Metadata: `%{adapter_meta: %{optional(atom) => term}, node: atom, peer: atom, total: non_neg_integer}`

`telemetry_prefix ++ [:bootstrap, :exception]` - Dispatched when the bootstrap push raises an exception.
  Measurements: `%{duration: non_neg_integer}`
  Metadata: `%{adapter_meta: %{optional(atom) => term}, node: atom, peer: atom, kind: :error | :exit | :throw, reason: term(), stacktrace: [term()]}`
Anti-entropy events
When anti-entropy reconciliation is enabled (:anti_entropy_interval),
the following Telemetry span events are emitted each cycle:
`telemetry_prefix ++ [:anti_entropy, :start]` - Dispatched when an anti-entropy cycle starts.
  Measurements: `%{system_time: non_neg_integer}`
  Metadata: `%{adapter_meta: %{optional(atom) => term}, node: atom, peer: atom}`

`telemetry_prefix ++ [:anti_entropy, :stop]` - Dispatched when an anti-entropy cycle completes.
  Measurements: `%{duration: non_neg_integer}`
  Metadata: `%{adapter_meta: %{optional(atom) => term}, node: atom, peer: atom, repaired: non_neg_integer, divergent_buckets: non_neg_integer}`

`telemetry_prefix ++ [:anti_entropy, :exception]` - Dispatched when an anti-entropy cycle raises an exception (e.g., RPC failure to the selected peer).
  Measurements: `%{duration: non_neg_integer}`
  Metadata: `%{adapter_meta: %{optional(atom) => term}, node: atom, peer: atom, kind: :error | :exit | :throw, reason: term(), stacktrace: [term()]}`
Anti-Entropy Reconciliation
The replicated adapter supports optional anti-entropy reconciliation to detect and repair data drift between nodes. This can happen after missed replication batches (e.g., brief network partitions or node outages).
When enabled via :anti_entropy_interval, a background process runs
periodically on each node:
- Picks a random peer.
- Builds a bucket-hashed digest (1024 fixed buckets, XOR of key/value hashes) of the local cache.
- Fetches the peer's digest via RPC.
- Compares digests to find divergent buckets.
- For divergent buckets, fetches the peer's actual entries (with TTLs).
- Writes them through the inbox, preserving "newer version wins" conflict resolution.
This approach is based on the anti-entropy reconciliation technique originally described in the Amazon Dynamo paper (DeCandia et al., 2007) and widely adopted by distributed databases like Apache Cassandra and Riak. The specific implementation follows Riak's Active Anti-Entropy (AAE) design most closely: instead of building a full Merkle tree over individual keys (expensive to build and compare), keys are hashed into a fixed number of buckets and each bucket stores the XOR of its key/value hashes. This bucket-based approach provides precise divergence detection with minimal overhead — only the entries in divergent buckets need to be fetched and compared.
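The bucket-hashed digest can be sketched in a few lines. The hash function, bucket count, and module name below are assumptions for illustration; the key property is that XOR is order-independent, so two nodes holding the same entries produce identical digests regardless of insertion order:

```elixir
# Sketch of a bucket-hashed digest: keys hash into a fixed number of
# buckets, and each bucket stores the XOR of its entries' key/value
# hashes. Comparing two digests bucket-by-bucket pinpoints divergence.
defmodule DigestSketch do
  @buckets 1024

  # entries: list of {key, value} pairs from the local cache.
  def digest(entries) do
    Enum.reduce(entries, %{}, fn {key, value}, acc ->
      bucket = :erlang.phash2(key, @buckets)
      hash = :erlang.phash2({key, value})
      # XOR is commutative and associative, so entry order is irrelevant.
      Map.update(acc, bucket, hash, &Bitwise.bxor(&1, hash))
    end)
  end

  # Buckets whose digests differ (or exist on only one side) are the
  # only ones whose entries need to be fetched and compared.
  def divergent_buckets(local, remote) do
    MapSet.new(Map.keys(local))
    |> MapSet.union(MapSet.new(Map.keys(remote)))
    |> Enum.filter(fn b -> Map.get(local, b) != Map.get(remote, b) end)
  end
end
```
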
Configuration
config :my_app, MyApp.ReplicatedCache,
replication: [
interval: :timer.seconds(1),
anti_entropy_interval: :timer.minutes(1)
]

Omit `:anti_entropy_interval` to disable (default).
Caveats
Replication Latency: There is a window (up to the `:interval` replication option) between when a write occurs on one node and when it is replicated to peers. During this window, peers may serve stale data.

Memory Usage: Every node holds a full copy of the cache. This topology is best suited for datasets that fit in memory on all nodes.

Queryable Operations: General queries (`get_all`, `count_all`, `stream`) operate on the local cache only. `delete_all` operates locally and replicates the deletion to all peer nodes.