Nebulex.Adapters.Partitioned (Nebulex.Distributed v3.0.0-rc.2)

Adapter module for the partitioned cache topology.

Features

  • Partitioned cache topology (Sharding Distribution Model).
  • Consistent hashing via ExHashRing for distributing keys across cluster nodes.
  • Automatic cluster membership management using Erlang's :pg (process groups).
  • Support for transactions via Erlang global name registration facility.
  • Configurable primary storage adapter.

Partitioned Cache Topology

There are several key points to consider about a partitioned cache:

  • Partitioned: The data in a distributed cache is spread out over all the servers in such a way that no two servers are responsible for the same piece of cached data. This means that the size of the cache and the processing power associated with the management of the cache can grow linearly with the size of the cluster. Also, it means that operations against data in the cache can be accomplished with a "single hop," in other words, involving at most one other server.

  • Load-Balanced: Since the data is spread out evenly over the servers, the responsibility for managing the data is automatically load-balanced across the cluster.

  • Ownership: Exactly one node in the cluster is responsible for each piece of data in the cache.

  • Point-To-Point: The communication for the partitioned cache is all point-to-point, enabling linear scalability.

  • Location Transparency: Although the data is spread out across cluster nodes, the exact same API is used to access the data, and the same behavior is provided by each of the API methods. This is called location transparency, which means that the developer does not have to code based on the topology of the cache, since the API and its behavior will be the same with a local cache, a replicated cache, or a distributed cache.

  • Failover: Failover of a distributed cache involves promoting backup data to primary storage. When a cluster node fails, the remaining cluster nodes determine which data each of them holds in backup that the failed node had primary responsibility for, and that data becomes the responsibility of whichever node was its backup. However, this adapter does not provide a fault-tolerance implementation: each piece of data is kept on a single node/machine (via sharding), so if a node fails, the data it held becomes unavailable to the rest of the cluster members.

Based on "Distributed Caching Essential Lessons" by Cameron Purdy and Coherence Partitioned Cache Service.

Consistent Hashing and Key Distribution

The adapter uses ExHashRing to implement consistent hashing, which maps keys to nodes in a way that minimizes data redistribution when the cluster topology changes.

How key distribution works

The process is as follows:

  1. Virtual Nodes (Vnodes): Each physical node in the cluster is assigned a set of virtual nodes (vnodes) in the hash ring. This enables even distribution of keys across the cluster.

  2. Key Hashing: When a key is accessed, its hash value (computed using erlang:phash2/1) is used to find the corresponding vnode in the ring.

  3. Node Lookup: ExHashRing.Ring finds the node responsible for that vnode, which becomes the target for the operation.

  4. RPC Routing: The request is sent to the target node via RPC (remote procedure call) to read or write the cached value.
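
For a quick intuition, the adapter's extended API (see "Extended API" below) lets you ask which node a given key maps to. This is a hedged example: it assumes MyCache is a cache defined with this adapter (as in the "Usage" section below) and that find_node/1 follows Nebulex's usual {:ok, result} convention; the node name shown is hypothetical.

# The hash of "mykey" selects a vnode, which resolves to a physical node.
{:ok, node} = MyCache.find_node("mykey")

# The same key always resolves to the same node while the topology is stable.
MyCache.find_node!("mykey")
#=> :"node2@127.0.0.1"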

Benefits of consistent hashing

  • Minimal Key Redistribution: When nodes join or leave, only a fraction of keys are redistributed to other nodes (proportional to the change in cluster size).
  • Even Distribution: Keys are evenly spread across all nodes in the cluster.
  • Predictable Mapping: The same key always maps to the same node, ensuring cache hits across the cluster.
  • Efficient Lookup: Hash ring lookups are O(log n) in terms of vnodes.

Cluster Membership Management

The adapter maintains a distributed view of the hash ring across all cluster nodes using two key components:

Process Groups (:pg)

The adapter uses Erlang's built-in :pg (process groups) module to track cluster membership. When a partitioned cache is started:

  1. The cache supervisor PID is registered in a :pg group named after the cache (e.g., the :name option or the cache module name).
  2. All nodes with the same cache running join the same group.
  3. When a node joins or leaves the cluster, :pg automatically notifies all members subscribed to that group.
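
As a minimal illustration of :pg group membership (independent of Nebulex, which manages its own scope and groups internally), the following sketch uses the default :pg scope and a hypothetical group name:

# Start the default :pg scope (in a real app this lives in a supervision tree).
{:ok, _pid} = :pg.start_link()

# Register the current process in a group named after the cache.
:ok = :pg.join(MyCache, self())

# Any node in the cluster can list the members of that group.
:pg.members(MyCache)
#=> [#PID<0.245.0>]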

Ring Monitor

The Nebulex.Adapters.Partitioned.RingMonitor is a GenServer that:

  1. Subscribes to Cluster Changes: Uses Nebulex.Distributed.Cluster.monitor_scope/0 to subscribe to all :pg group changes via :pg.monitor_scope/1.

  2. Handles Join/Leave Events: When nodes join or leave a group, RingMonitor receives {:join, group, pids} and {:leave, group, pids} messages and updates the ExHashRing.Ring state accordingly.

  3. Maintains Ring Consistency: Keeps the hash ring in sync with the current cluster topology by adding/removing nodes from the ring.
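
The following is a conceptual sketch of that flow, not the actual RingMonitor implementation. It assumes the default :pg scope is already started, that the ring is an ExHashRing ring registered under a name, and that ExHashRing.Ring.add_node/2 and ExHashRing.Ring.remove_node/2 are the functions for mutating it (check the ExHashRing docs for the exact API). Note that :pg.monitor_scope/1 messages also carry the monitor reference:

defmodule MyApp.RingMonitorSketch do
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  @impl true
  def init(opts) do
    ring = Keyword.fetch!(opts, :ring)
    group = Keyword.fetch!(opts, :group)

    # Subscribe to membership changes for all groups in the scope.
    {_ref, _current_groups} = :pg.monitor_scope(:pg)

    {:ok, %{ring: ring, group: group}}
  end

  @impl true
  def handle_info({_ref, :join, group, pids}, %{group: group, ring: ring} = state) do
    # New members joined the cache group: add their nodes to the hash ring.
    Enum.each(pids, &ExHashRing.Ring.add_node(ring, node(&1)))
    {:noreply, state}
  end

  def handle_info({_ref, :leave, group, pids}, %{group: group, ring: ring} = state) do
    # Members left the cache group: remove their nodes from the hash ring.
    Enum.each(pids, &ExHashRing.Ring.remove_node(ring, node(&1)))
    {:noreply, state}
  end

  def handle_info(_other, state), do: {:noreply, state}
end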

Handling Race Conditions During Startup

During initial cluster formation, multiple nodes may start simultaneously, leading to race conditions where some nodes miss join events from others. To solve this, the RingMonitor uses a periodic rejoin mechanism:

  • Rejoin Interval: The :rejoin_interval option (default: 30 seconds) specifies an interval at which the RingMonitor periodically rejoins the :pg group.

  • Idempotent Joins: Since :pg treats duplicate joins as idempotent, a node can safely rejoin without negative side effects.

  • Forced Ring Updates: Each periodic rejoin triggers :join events that force all nodes to update their ring view, ensuring eventual consistency even if some initial join events were missed.

This mechanism ensures that all nodes have a consistent view of the ring, even in the face of concurrent startups or transient network issues.
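
A stripped-down sketch of such a periodic rejoin loop is shown below. It is illustrative only: it assumes the default :pg scope is running, uses a hypothetical group name, and omits the ring updates and Telemetry events the real RingMonitor performs.

defmodule MyApp.PeriodicRejoinSketch do
  use GenServer

  @rejoin_interval :timer.seconds(30)

  def start_link(group), do: GenServer.start_link(__MODULE__, group)

  @impl true
  def init(group) do
    # Assumes the default :pg scope has already been started.
    :ok = :pg.join(group, self())
    schedule_rejoin()
    {:ok, group}
  end

  @impl true
  def handle_info(:rejoin, group) do
    # Rejoining triggers fresh :join notifications on every node monitoring
    # the scope, so any node that missed the original event catches up.
    :ok = :pg.join(group, self())
    schedule_rejoin()
    {:noreply, group}
  end

  defp schedule_rejoin, do: Process.send_after(self(), :rejoin, @rejoin_interval)
end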

Primary Storage Adapter

This adapter depends on a local cache adapter (primary storage), adding a distributed layer on top of it. You don't need to manually define the primary storage cache; the adapter initializes it automatically as part of the supervision tree.

The :primary_storage_adapter option (defaults to Nebulex.Adapters.Local) configures which adapter to use for the local storage on each node. Options for the primary adapter can be specified via the :primary configuration option.

Usage

The cache expects the :otp_app and :adapter options when used. The :otp_app should point to an OTP application with the cache configuration. Optionally, you can configure the desired primary storage adapter with the :primary_storage_adapter option (defaults to Nebulex.Adapters.Local). See the compile-time options for more information:

  • :primary_storage_adapter (atom/0) - The adapter module used for the primary (local) storage on each cluster node. The partitioned adapter is a distributed wrapper that routes requests to the appropriate node based on consistent hashing. The actual data storage is handled by the primary storage adapter on each node. This option allows you to choose which adapter to use for this local storage. The configuration for the primary adapter is specified via the :primary start option. The default value is Nebulex.Adapters.Local.

For example:

defmodule MyApp.PartitionedCache do
  use Nebulex.Cache,
    otp_app: :my_app,
    adapter: Nebulex.Adapters.Partitioned
end

Providing the :primary_storage_adapter:

defmodule MyApp.PartitionedCache do
  use Nebulex.Cache,
    otp_app: :my_app,
    adapter: Nebulex.Adapters.Partitioned,
    primary_storage_adapter: Nebulex.Adapters.Local
end

Where the configuration for the cache must be in your application environment, usually defined in your config/config.exs:

config :my_app, MyApp.PartitionedCache,
  primary: [
    gc_interval: :timer.hours(12),
    gc_memory_check_interval: :timer.seconds(10),
    max_size: 1_000_000,
    allocated_memory: 2_000_000_000
  ]

If your application was generated with a supervisor (by passing --sup to mix new), you will have a lib/my_app/application.ex file containing the application start callback that defines and starts your supervisor. You just need to edit the start/2 function to start the cache under your application's supervision tree:

def start(_type, _args) do
  children = [
    {MyApp.PartitionedCache, []},
    ...
  ]

  Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
end

See Nebulex.Cache for more information.

Configuration options

This adapter supports the following configuration options:

  • :primary (keyword/0) - Configuration options passed to the primary storage adapter specified via :primary_storage_adapter. The available options depend on which adapter you choose. Refer to the documentation of your chosen primary storage adapter for the complete list of supported options. The default value is [].

  • :hash_ring (keyword/0) - Configuration options for the consistent hash ring used to distribute keys across cluster nodes. The hash ring maps each key to a node using virtual nodes (vnodes), which enables:

    • Minimal key redistribution when nodes join or leave.
    • Even distribution of keys across nodes.
    • Efficient node lookup for cache operations.

    See ExHashRing.Ring.start_link/2 for the complete list of supported options.

    The default value is [].

  • :rejoin_interval (timeout/0) - The interval in milliseconds at which the RingMonitor periodically rejoins the :pg group to force ring synchronization across all cluster nodes.

    Purpose: This mechanism helps handle race conditions during concurrent node startups by ensuring all nodes eventually have a consistent view of the hash ring. Even if some join events are missed during initial cluster formation, each rejoin triggers new notifications that force ring updates.

    Trade-offs:

    • Shorter intervals (e.g., 10 seconds):

      • Faster consistency convergence.
      • More overhead from frequent rejoin events and notifications.
      • Better for highly dynamic clusters with frequent node changes.
    • Longer intervals (e.g., 60 seconds):

      • Lower overhead and reduced network traffic.
      • Slower eventual consistency.
      • Fine for stable clusters that don't change frequently.

    Default (30 seconds): Works well for most use cases, balancing consistency and overhead.

    The default value is 30000.
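
For example, assuming these options are provided alongside :primary in the application environment (as in the Usage section above), and that :replicas is a valid ExHashRing.Ring.start_link/2 option (check the ExHashRing docs for the exact option names), the configuration might look like this:

config :my_app, MyApp.PartitionedCache,
  primary: [
    gc_interval: :timer.hours(12)
  ],
  # Options forwarded to ExHashRing.Ring.start_link/2.
  hash_ring: [replicas: 512],
  # Rejoin the :pg group more aggressively than the 30-second default.
  rejoin_interval: :timer.seconds(10)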

Shared runtime options

When using the partitioned adapter, all of the cache functions outlined in Nebulex.Cache accept the following options:

  • :timeout (timeout/0) - The time in milliseconds to wait for a cache command to finish.

    This timeout applies to RPC calls made to remote nodes during partitioned cache operations. Since the partitioned adapter routes requests across cluster nodes, network latency and node load affect execution time.

    Set to :infinity to wait indefinitely. If a timeout occurs, the operation fails with an error. Note that the underlying cache operation may still complete on the remote node asynchronously.

    The default value is 5000.
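
For example, the timeout can be set per command call:

# Allow up to 10 seconds for the RPC round trip to the owning node.
MyCache.put("key", "value", timeout: :timer.seconds(10))

# Wait indefinitely for the remote node to reply.
MyCache.fetch("key", timeout: :infinity)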

Stream options

The stream command supports the following options:

  • :on_error (:raise | :nothing) - Controls error handling during stream evaluation across cluster nodes.

    When streaming entries from a partitioned cache, the adapter evaluates the stream on each cluster node. Since this involves RPC calls to remote nodes, failures can occur due to:

    • Network issues or RPC timeouts.
    • Errors on the remote node.
    • Temporary node unavailability.

    Options:

    • :raise (default) - Raise an exception immediately when an error occurs on any node. The stream evaluation stops, and no further nodes are queried.

    • :nothing - Skip errors silently and continue. Returns only successful results from nodes that responded without errors. Useful for resilience in environments where temporary node failures are acceptable.

    The default value is :raise.
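
For example, assuming an empty query spec selects all entries and stream!/2 is the raising variant (see Nebulex.Cache.stream/2 for the query-spec details):

# Skip nodes that fail or time out; only results from healthy nodes are streamed.
stream = MyCache.stream!([], on_error: :nothing)
Enum.to_list(stream)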

Telemetry events

Since the partitioned adapter depends on the configured primary storage cache (which uses a local cache adapter), the primary storage cache emits Telemetry events as well. Therefore, events are emitted by both the partitioned adapter and the primary storage cache. For example, the MyApp.PartitionedCache cache defined above will emit the following events:

  • [:my_app, :partitioned_cache, :command, :start]
  • [:my_app, :partitioned_cache, :primary, :command, :start]
  • [:my_app, :partitioned_cache, :command, :stop]
  • [:my_app, :partitioned_cache, :primary, :command, :stop]
  • [:my_app, :partitioned_cache, :command, :exception]
  • [:my_app, :partitioned_cache, :primary, :command, :exception]

As you may notice, the default telemetry prefix for the cache is [:my_app, :partitioned_cache]. However, you can override the default for the primary storage by specifying :telemetry_prefix within the :primary options. See the Telemetry guide for more information and examples.
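
One way to consume these events is with :telemetry.attach_many/4; here is a sketch with a hypothetical handler id:

:telemetry.attach_many(
  "my-app-partitioned-cache-handler",
  [
    [:my_app, :partitioned_cache, :command, :stop],
    [:my_app, :partitioned_cache, :primary, :command, :stop]
  ],
  fn event, measurements, metadata, _config ->
    # Forward the measurements (e.g. the command duration) to your metrics system.
    IO.inspect({event, measurements, metadata})
  end,
  nil
)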

Adapter-specific telemetry events

The RingMonitor process emits the following Telemetry events during the lifetime of the partitioned cache:

  • telemetry_prefix ++ [:ring_monitor, :started] - Dispatched when the RingMonitor process starts.

    • Measurements: %{system_time: non_neg_integer}

    • Metadata:

      %{
        adapter_meta: %{optional(atom) => term},
        node: atom
      }
  • telemetry_prefix ++ [:ring_monitor, :joined] - Dispatched when the RingMonitor has successfully joined the :pg group to enter the cluster.

    • Measurements: %{system_time: non_neg_integer}

    • Metadata:

      %{
        adapter_meta: %{optional(atom) => term},
        node: atom
      }
  • telemetry_prefix ++ [:ring_monitor, :nodes_added] - Dispatched when nodes are added to the hash ring.

    • Measurements: %{system_time: non_neg_integer}

    • Metadata:

      %{
        adapter_meta: %{optional(atom) => term},
        node: atom,
        nodes: [atom]
      }
  • telemetry_prefix ++ [:ring_monitor, :nodes_removed] - Dispatched when nodes are removed from the hash ring.

    • Measurements: %{system_time: non_neg_integer}

    • Metadata:

      %{
        adapter_meta: %{optional(atom) => term},
        node: atom,
        nodes: [atom]
      }
  • telemetry_prefix ++ [:ring_monitor, :exit] - Dispatched when the RingMonitor receives an EXIT signal.

    • Measurements: %{system_time: non_neg_integer}

    • Metadata:

      %{
        adapter_meta: %{optional(atom) => term},
        node: atom,
        reason: term
      }
  • telemetry_prefix ++ [:ring_monitor, :stopped] - Dispatched when the RingMonitor process terminates.

    • Measurements: %{system_time: non_neg_integer}

    • Metadata:

      %{
        adapter_meta: %{optional(atom) => term},
        node: atom,
        reason: term
      }
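
For example, you could log ring membership changes by attaching a handler to the :nodes_added and :nodes_removed events (the handler id is hypothetical):

:telemetry.attach_many(
  "ring-monitor-logger",
  [
    [:my_app, :partitioned_cache, :ring_monitor, :nodes_added],
    [:my_app, :partitioned_cache, :ring_monitor, :nodes_removed]
  ],
  fn event, _measurements, %{node: node, nodes: nodes}, _config ->
    IO.puts("#{inspect(event)} on #{node}: #{inspect(nodes)}")
  end,
  nil
)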

Info API

As explained above, the partitioned adapter depends on the configured primary storage adapter. Therefore, the information the info command provides will depend on the primary storage adapter. The Nebulex built-in adapters support the recommended keys :server, :memory, and :stats. Additionally, the partitioned adapter supports:

  • :nodes_info - A map with the info for each node.
  • :nodes - A list with the cluster nodes.

For example, the info for MyApp.PartitionedCache may look like this:

iex> MyApp.PartitionedCache.info!()
%{
  memory: %{total: nil, used: 344600},
  server: %{
    cache_module: MyApp.PartitionedCache,
    cache_name: :partitioned_cache,
    cache_adapter: Nebulex.Adapters.Partitioned,
    cache_pid: #PID<0.1053.0>,
    nbx_version: "3.0.0"
  },
  stats: %{
    hits: 0,
    misses: 0,
    writes: 0,
    evictions: 0,
    expirations: 0,
    deletions: 0,
    updates: 0
  },
  nodes: [:"node1@127.0.0.1", ...],
  nodes_info: %{
    "node1@127.0.0.1": %{
      memory: %{total: nil, used: 68920},
      server: %{
        cache_module: MyApp.PartitionedCache.Primary,
        cache_name: MyApp.PartitionedCache.Primary,
        cache_adapter: Nebulex.Adapters.Local,
        cache_pid: #PID<23981.823.0>,
        nbx_version: "3.0.0"
      },
      stats: %{
        hits: 0,
        misses: 0,
        writes: 0,
        evictions: 0,
        expirations: 0,
        deletions: 0,
        updates: 0
      }
    },
    ...
  }
}

Extended API

This adapter provides some additional convenience functions to the Nebulex.Cache API.

Retrieving the primary storage or local cache module:

MyCache.__primary__()

Retrieving the cluster nodes associated with the given cache name:

MyCache.nodes()

Finding the cluster node responsible for the given key:

MyCache.find_node("mykey")

MyCache.find_node!("mykey")

Joining the cache to the cluster:

MyCache.join_cluster()

Leaving the cluster (removes the cache from the cluster):

MyCache.leave_cluster()

Caveats of partitioned adapter

For operations that receive anonymous functions as arguments, such as Nebulex.Cache.get_and_update/3, Nebulex.Cache.update/4, Nebulex.Cache.fetch_or_store/3, and Nebulex.Cache.get_or_store/3, there's an important consideration: anonymous functions are compiled into the module where they are created. Since the partitioned adapter executes operations on remote nodes, these functions may not exist on the target nodes.

To ensure these operations work correctly in a distributed environment, you must provide functions from modules that exist on all nodes in the cluster. This can be achieved by:

  • Using named functions from modules that are available across all nodes.
  • Defining the functions in a shared module that's loaded on every node.
  • Using function references that can be serialized and transmitted.

Example of the recommended approach:

# Instead of anonymous functions, use named functions from shared modules
defmodule MyApp.CacheHelpers do
  def increment_value(current_value) do
    (current_value || 0) + 1
  end
end

# Use the named function in cache operations
MyCache.get_and_update("counter", &MyApp.CacheHelpers.increment_value/1)

Summary

Functions

Helper function to use dynamic cache for internal primary cache storage when needed.

Functions

with_dynamic_cache(adapter_meta, action, args)

Helper function to use dynamic cache for internal primary cache storage when needed.