Nebulex.Adapters.Partitioned (Nebulex.Distributed v3.0.0-rc.2)
Adapter module for the partitioned cache topology.
Features
- Partitioned cache topology (Sharding Distribution Model).
- Consistent hashing via ExHashRing for distributing keys across cluster nodes.
- Automatic cluster membership management using Erlang's :pg (process groups).
- Support for transactions via Erlang's global name registration facility.
- Configurable primary storage adapter.
Partitioned Cache Topology
There are several key points to consider about a partitioned cache:
Partitioned: The data in a distributed cache is spread out over all the servers in such a way that no two servers are responsible for the same piece of cached data. This means that the size of the cache and the processing power associated with the management of the cache can grow linearly with the size of the cluster. Also, it means that operations against data in the cache can be accomplished with a "single hop," in other words, involving at most one other server.
Load-Balanced: Since the data is spread out evenly over the servers, the responsibility for managing the data is automatically load-balanced across the cluster.
Ownership: Exactly one node in the cluster is responsible for each piece of data in the cache.
Point-To-Point: The communication for the partitioned cache is all point-to-point, enabling linear scalability.
Location Transparency: Although the data is spread out across cluster nodes, the exact same API is used to access the data, and the same behavior is provided by each of the API methods. This is called location transparency, which means that the developer does not have to code based on the topology of the cache, since the API and its behavior will be the same with a local cache, a replicated cache, or a distributed cache.
Failover: Failover of a distributed cache involves promoting backup data to be primary storage. When a cluster node fails, the remaining cluster nodes determine which data they hold in backup that the failed node had primary responsibility for, and that data becomes the responsibility of whichever node was its backup. However, this adapter does not provide a fault-tolerance implementation: each piece of data is kept on a single node (via sharding), so if a node fails, the data held by that node becomes unavailable to the rest of the cluster members.
Based on "Distributed Caching Essential Lessons" by Cameron Purdy and Coherence Partitioned Cache Service.
Consistent Hashing and Key Distribution
The adapter uses ExHashRing to implement consistent hashing, which maps keys
to nodes in a way that minimizes data redistribution when the cluster topology
changes.
How key distribution works
The process is as follows:
Virtual Nodes (Vnodes): Each physical node in the cluster is assigned a set of virtual nodes (vnodes) in the hash ring. This enables even distribution of keys across the cluster.
Key Hashing: When a key is accessed, its hash value (computed using erlang:phash2/1) is used to find the corresponding vnode in the ring.
Node Lookup: ExHashRing.Ring finds the node responsible for that vnode, which becomes the target for the operation.
RPC Routing: The request is sent to the target node via RPC (remote procedure call) to read or write the cached value. See the sketch below.
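To make this routing concrete, here is a small illustrative session using the adapter's extended API (see "Extended API" below). It assumes a cache named MyApp.PartitionedCache and that, per Nebulex 3 conventions, the non-bang functions return ok/error tuples.

# The key's owner is determined by consistent hashing, so the same key always
# resolves to the same node while the topology is unchanged.
{:ok, owner} = MyApp.PartitionedCache.find_node("mykey")

# Writes and reads for "mykey" are forwarded (via RPC) to that owner node.
:ok = MyApp.PartitionedCache.put("mykey", "value")
{:ok, "value"} = MyApp.PartitionedCache.fetch("mykey")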
Benefits of consistent hashing
- Minimal Key Redistribution: When nodes join or leave, only a fraction of keys are redistributed to other nodes (proportional to the change in cluster size).
- Even Distribution: Keys are evenly spread across all nodes in the cluster.
- Predictable Mapping: The same key always maps to the same node, ensuring cache hits across the cluster.
- Efficient Lookup: Hash ring lookups are O(log n) in terms of vnodes.
Cluster Membership Management
The adapter maintains a distributed view of the hash ring across all cluster nodes using two key components:
Process Groups (:pg)
The adapter uses Erlang's built-in :pg (process groups) module to track
cluster membership. When a partitioned cache is started:
- The cache supervisor PID is registered in a :pg group named after the cache (e.g., the :name option or the cache module name).
- All nodes running the same cache join the same group.
- When a node joins or leaves the cluster, :pg automatically notifies all members subscribed to that group.
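For reference, the underlying :pg primitives work roughly as follows. This is a standalone sketch of the mechanism, not the adapter's code; the scope and group names are made up for illustration.

# Start a :pg scope (the adapter manages its own scope internally).
{:ok, _pid} = :pg.start_link(:demo_scope)

# Each node registers its cache supervisor PID under a group named after the cache.
:ok = :pg.join(:demo_scope, MyApp.PartitionedCache, self())

# Any member can list the registered PIDs and derive the current cluster nodes.
members = :pg.get_members(:demo_scope, MyApp.PartitionedCache)
nodes = members |> Enum.map(&node/1) |> Enum.uniq()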
Ring Monitor
The Nebulex.Adapters.Partitioned.RingMonitor is a GenServer that:
Subscribes to Cluster Changes: Uses Nebulex.Distributed.Cluster.monitor_scope/0 to subscribe to all :pg group changes via :pg.monitor_scope/1.
Handles Join/Leave Events: When nodes join or leave a group, RingMonitor receives {:join, group, pids} and {:leave, group, pids} messages and updates the ExHashRing.Ring state accordingly.
Maintains Ring Consistency: Keeps the hash ring in sync with the current cluster topology by adding/removing nodes from the ring.
Handling Race Conditions During Startup
During initial cluster formation, multiple nodes may start simultaneously, leading to race conditions where some nodes miss join events from others. To solve this, the RingMonitor uses a periodic rejoin mechanism:
Rejoin Interval: The :rejoin_interval option (default: 30 seconds) specifies an interval at which the RingMonitor periodically rejoins the :pg group.
Idempotent Joins: Since :pg treats duplicate joins as idempotent, a node can safely rejoin without negative side effects.
Forced Ring Updates: Each periodic rejoin triggers :join events that force all nodes to update their ring view, ensuring eventual consistency even if some initial join events were missed.
This mechanism ensures that all nodes have a consistent view of the ring, even in the face of concurrent startups or transient network issues.
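The pattern can be sketched as a small GenServer that combines both ideas: subscribe with :pg.monitor_scope/1, track nodes on join/leave, and rejoin periodically. This is an illustrative approximation, not the RingMonitor source; note that raw :pg.monitor_scope/1 messages carry the monitor reference, so the sketch matches on {ref, :join, group, pids} tuples.

defmodule MyApp.NodeWatcher do
  @moduledoc "Illustrative sketch of the monitor/rejoin pattern (not RingMonitor itself)."
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  @impl true
  def init(opts) do
    scope = Keyword.fetch!(opts, :scope)
    group = Keyword.fetch!(opts, :group)
    interval = Keyword.get(opts, :rejoin_interval, :timer.seconds(30))

    # Subscribe to membership changes for every group in the scope; the call
    # also returns the current groups so we can build the initial node set.
    {ref, groups} = :pg.monitor_scope(scope)

    # Join the group and schedule the periodic rejoin.
    :ok = :pg.join(scope, group, self())
    Process.send_after(self(), :rejoin, interval)

    nodes = groups |> Map.get(group, []) |> Enum.map(&node/1) |> Enum.uniq()
    {:ok, %{ref: ref, scope: scope, group: group, interval: interval, nodes: nodes}}
  end

  @impl true
  def handle_info({ref, :join, group, pids}, %{ref: ref, group: group} = state) do
    # New members: the adapter would add these nodes to the hash ring.
    new_nodes = Enum.map(pids, &node/1)
    {:noreply, %{state | nodes: Enum.uniq(state.nodes ++ new_nodes)}}
  end

  def handle_info({ref, :leave, group, pids}, %{ref: ref, group: group} = state) do
    # Departed members: the adapter would remove these nodes from the hash ring.
    gone = Enum.map(pids, &node/1)
    {:noreply, %{state | nodes: state.nodes -- gone}}
  end

  def handle_info(:rejoin, %{scope: scope, group: group, interval: interval} = state) do
    # Periodic rejoin: forces fresh :join notifications so every node converges
    # on the same view, even if earlier events were missed during startup.
    :ok = :pg.join(scope, group, self())
    Process.send_after(self(), :rejoin, interval)
    {:noreply, state}
  end

  def handle_info(_other, state), do: {:noreply, state}
end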
Primary Storage Adapter
This adapter depends on a local cache adapter (primary storage), adding a distributed layer on top of it. You don't need to manually define the primary storage cache; the adapter initializes it automatically as part of the supervision tree.
The :primary_storage_adapter option (defaults to Nebulex.Adapters.Local)
configures which adapter to use for the local storage on each node. Options
for the primary adapter can be specified via the :primary configuration option.
Usage
The cache expects the :otp_app and :adapter as options when used.
The :otp_app should point to an OTP application with the cache
configuration. Optionally, you can configure the desired primary
storage adapter with the option :primary_storage_adapter
(defaults to Nebulex.Adapters.Local). See the compile time options
for more information:
:primary_storage_adapter (atom/0) - The adapter module used for the primary (local) storage on each cluster node. The partitioned adapter is a distributed wrapper that routes requests to the appropriate node based on consistent hashing. The actual data storage is handled by the primary storage adapter on each node. This option allows you to choose which adapter to use for this local storage. The configuration for the primary adapter is specified via the :primary start option. The default value is Nebulex.Adapters.Local.
For example:
defmodule MyApp.PartitionedCache do
use Nebulex.Cache,
otp_app: :my_app,
adapter: Nebulex.Adapters.Partitioned
end

Providing the :primary_storage_adapter:
defmodule MyApp.PartitionedCache do
use Nebulex.Cache,
otp_app: :my_app,
adapter: Nebulex.Adapters.Partitioned,
primary_storage_adapter: Nebulex.Adapters.Local
end

Where the configuration for the cache must be in your application environment,
usually defined in your config/config.exs:
config :my_app, MyApp.PartitionedCache,
primary: [
gc_interval: :timer.hours(12),
gc_memory_check_interval: :timer.seconds(10),
max_size: 1_000_000,
allocated_memory: 2_000_000_000
]

If your application was generated with a supervisor (by passing --sup
to mix new) you will have a lib/my_app/application.ex file containing
the application start callback that defines and starts your supervisor.
You just need to edit the start/2 function to start the cache as a
supervisor on your application's supervisor:
def start(_type, _args) do
children = [
{MyApp.PartitionedCache, []},
...
  ]

  Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
end

See Nebulex.Cache for more information.
Configuration options
This adapter supports the following configuration options:
:primary (keyword/0) - Configuration options passed to the primary storage adapter specified via :primary_storage_adapter. The available options depend on which adapter you choose. Refer to the documentation of your chosen primary storage adapter for the complete list of supported options. The default value is [].

:hash_ring (keyword/0) - Configuration options for the consistent hash ring used to distribute keys across cluster nodes. The hash ring maps each key to a node using virtual nodes (vnodes), which enables:
- Minimal key redistribution when nodes join or leave.
- Even distribution of keys across nodes.
- Efficient node lookup for cache operations.
See ExHashRing.Ring.start_link/2 for the complete list of supported options. The default value is [].

:rejoin_interval (timeout/0) - The interval in milliseconds at which the RingMonitor periodically rejoins the :pg group to force ring synchronization across all cluster nodes. Purpose: This mechanism helps handle race conditions during concurrent node startups by ensuring all nodes eventually have a consistent view of the hash ring. Even if some join events are missed during initial cluster formation, each rejoin triggers new notifications that force ring updates.
Trade-offs:
Shorter intervals (e.g., 10 seconds):
- Faster consistency convergence.
- More overhead from frequent rejoin events and notifications.
- Better for highly dynamic clusters with frequent node changes.
Longer intervals (e.g., 60 seconds):
- Lower overhead and reduced network traffic.
- Slower eventual consistency.
- Fine for stable clusters that don't change frequently.
Default (30 seconds): Works well for most use cases, balancing consistency and overhead.
The default value is 30000.
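For example, the options above could be combined as follows (the values are illustrative, not recommendations):

config :my_app, MyApp.PartitionedCache,
  # Options forwarded to the primary storage adapter (Nebulex.Adapters.Local here).
  primary: [
    gc_interval: :timer.hours(12),
    max_size: 1_000_000
  ],
  # Options forwarded to ExHashRing.Ring.start_link/2.
  hash_ring: [],
  # Rejoin the :pg group every 10 seconds instead of the default 30_000 ms.
  rejoin_interval: :timer.seconds(10)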
Shared runtime options
When using the partitioned adapter, all of the cache functions outlined in
Nebulex.Cache accept the following options:
:timeout (timeout/0) - The time in milliseconds to wait for a cache command to finish. This timeout applies to RPC calls made to remote nodes during partitioned cache operations. Since the partitioned adapter routes requests across cluster nodes, network latency and node load affect execution time.
Set to :infinity to wait indefinitely. If a timeout occurs, the operation fails with an error. Note that the underlying cache operation may still complete on the remote node asynchronously. The default value is 5000.
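For example (illustrative calls; any Nebulex.Cache command accepts the option):

# Fail fast if the owner node does not reply within 2 seconds.
MyApp.PartitionedCache.put("mykey", "value", timeout: 2_000)

# Wait as long as needed for a cross-node read.
MyApp.PartitionedCache.fetch("mykey", timeout: :infinity)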
Stream options
The stream command supports the following options:
:on_error (:raise | :nothing) - Controls error handling during stream evaluation across cluster nodes. When streaming entries from a partitioned cache, the adapter evaluates the stream on each cluster node. Since this involves RPC calls to remote nodes, failures can occur due to:
- Network issues or RPC timeouts.
- Errors on the remote node.
- Temporary node unavailability.
Options:
:raise (default) - Raise an exception immediately when an error occurs on any node. The stream evaluation stops, and no further nodes are queried.
:nothing - Skip errors silently and continue. Returns only successful results from nodes that responded without errors. Useful for resilience in environments where temporary node failures are acceptable.
The default value is :raise.
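A rough sketch of how this might be used; the query-spec argument shape follows Nebulex.Cache.stream/2 (check its docs for your version), and the empty spec here is assumed to select all entries:

# Tolerate transient node failures instead of raising mid-stream.
MyApp.PartitionedCache.stream!([], on_error: :nothing)
|> Enum.to_list()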
Telemetry events
Since the partitioned adapter depends on the configured primary storage
cache (which uses a local cache adapter), that cache will also emit Telemetry
events. Therefore, there will be events emitted by both the partitioned adapter
and the primary storage cache. For example, the cache defined earlier,
MyApp.PartitionedCache, will emit the following events:
- [:my_app, :partitioned_cache, :command, :start]
- [:my_app, :partitioned_cache, :primary, :command, :start]
- [:my_app, :partitioned_cache, :command, :stop]
- [:my_app, :partitioned_cache, :primary, :command, :stop]
- [:my_app, :partitioned_cache, :command, :exception]
- [:my_app, :partitioned_cache, :primary, :command, :exception]
As you may notice, the telemetry prefix by default for the cache is
[:my_app, :partitioned_cache]. However, you could specify the
:telemetry_prefix for the primary storage within the :primary options
(if you want to override the default). See the
Telemetry guide
for more information and examples.
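For instance, a handler for the command events listed above could be attached like this (the handler id is arbitrary; the duration measurement is assumed from the standard :telemetry.span/3 conventions):

:telemetry.attach_many(
  "partitioned-cache-logger",
  [
    [:my_app, :partitioned_cache, :command, :stop],
    [:my_app, :partitioned_cache, :primary, :command, :stop]
  ],
  fn event, measurements, _metadata, _config ->
    # Log which event fired and how long the command took (native time units).
    IO.inspect({event, measurements[:duration]}, label: "cache command")
  end,
  nil
)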
Adapter-specific telemetry events
The RingMonitor process emits the following Telemetry events during the lifetime of the partitioned cache:
telemetry_prefix ++ [:ring_monitor, :started] - Dispatched when the RingMonitor process starts.
  Measurements: %{system_time: non_neg_integer}
  Metadata: %{adapter_meta: %{optional(atom) => term}, node: atom}

telemetry_prefix ++ [:ring_monitor, :joined] - Dispatched when the RingMonitor has successfully joined the :pg group to enter the cluster.
  Measurements: %{system_time: non_neg_integer}
  Metadata: %{adapter_meta: %{optional(atom) => term}, node: atom}

telemetry_prefix ++ [:ring_monitor, :nodes_added] - Dispatched when nodes are added to the hash ring.
  Measurements: %{system_time: non_neg_integer}
  Metadata: %{adapter_meta: %{optional(atom) => term}, node: atom, nodes: [atom]}

telemetry_prefix ++ [:ring_monitor, :nodes_removed] - Dispatched when nodes are removed from the hash ring.
  Measurements: %{system_time: non_neg_integer}
  Metadata: %{adapter_meta: %{optional(atom) => term}, node: atom, nodes: [atom]}

telemetry_prefix ++ [:ring_monitor, :exit] - Dispatched when the RingMonitor receives an EXIT signal.
  Measurements: %{system_time: non_neg_integer}
  Metadata: %{adapter_meta: %{optional(atom) => term}, node: atom, reason: term}

telemetry_prefix ++ [:ring_monitor, :stopped] - Dispatched when the RingMonitor process terminates.
  Measurements: %{system_time: non_neg_integer}
  Metadata: %{adapter_meta: %{optional(atom) => term}, node: atom, reason: term}
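As an illustration, the :nodes_added and :nodes_removed events can be used to log ring topology changes (the handler id is arbitrary; the :nodes metadata key is taken from the event descriptions above):

:telemetry.attach_many(
  "ring-topology-logger",
  [
    [:my_app, :partitioned_cache, :ring_monitor, :nodes_added],
    [:my_app, :partitioned_cache, :ring_monitor, :nodes_removed]
  ],
  fn [_, _, _, action], _measurements, %{nodes: nodes}, _config ->
    # Report which nodes were added to or removed from the hash ring.
    IO.puts("ring #{action}: #{inspect(nodes)}")
  end,
  nil
)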
Info API
As explained above, the partitioned adapter depends on the configured primary
storage adapter. Therefore, the information the info command provides will
depend on the primary storage adapter. The Nebulex built-in adapters support
the recommended keys :server, :memory, and :stats. Additionally, the
partitioned adapter supports:
:nodes_info - A map with the info for each node.
:nodes - A list with the cluster nodes.
For example, the info for MyApp.PartitionedCache may look like this:
iex> MyApp.PartitionedCache.info!()
%{
memory: %{total: nil, used: 344600},
server: %{
cache_module: MyApp.PartitionedCache,
cache_name: :partitioned_cache,
cache_adapter: Nebulex.Adapters.Partitioned,
cache_pid: #PID<0.1053.0>,
nbx_version: "3.0.0"
},
stats: %{
hits: 0,
misses: 0,
writes: 0,
evictions: 0,
expirations: 0,
deletions: 0,
updates: 0
},
nodes: [:"node1@127.0.0.1", ...],
nodes_info: %{
"node1@127.0.0.1": %{
memory: %{total: nil, used: 68920},
server: %{
cache_module: MyApp.PartitionedCache.Primary,
cache_name: MyApp.PartitionedCache.Primary,
cache_adapter: Nebulex.Adapters.Local,
cache_pid: #PID<23981.823.0>,
nbx_version: "3.0.0"
},
stats: %{
hits: 0,
misses: 0,
writes: 0,
evictions: 0,
expirations: 0,
deletions: 0,
updates: 0
}
},
...
}
}

Extended API
This adapter provides some additional convenience functions to the
Nebulex.Cache API.
Retrieving the primary storage or local cache module:
MyCache.__primary__()

Retrieving the cluster nodes associated with the given cache name:

MyCache.nodes()

Getting a cluster node based on the given key:

MyCache.find_node("mykey")
MyCache.find_node!("mykey")

Joining the cache to the cluster:

MyCache.join_cluster()

Leaving the cluster (removes the cache from the cluster):

MyCache.leave_cluster()

Caveats of partitioned adapter
For operations that receive anonymous functions as arguments, such as
Nebulex.Cache.get_and_update/3, Nebulex.Cache.update/4,
Nebulex.Cache.fetch_or_store/3, and Nebulex.Cache.get_or_store/3,
etc., there's an important consideration: these anonymous functions are
compiled into the module where they are created. Since the distributed adapter
executes operations on remote nodes, these functions may not exist on the
target nodes.
To ensure these operations work correctly in a distributed environment, you must provide functions from modules that exist on all nodes in the cluster. This can be achieved by:
- Using named functions from modules that are available across all nodes.
- Defining the functions in a shared module that's loaded on every node.
- Using function references that can be serialized and transmitted.
Example of the recommended approach:
# Instead of anonymous functions, use named functions from shared modules
defmodule MyApp.CacheHelpers do
def increment_value(current_value) do
(current_value || 0) + 1
end
end
# Use the named function in cache operations
MyCache.get_and_update("counter", &MyApp.CacheHelpers.increment_value/1)
Summary
Functions
Helper function to use dynamic cache for internal primary cache storage when needed.