partisan_hyparview_peer_service_manager (partisan v5.0.0-rc.8)

This module realises the partisan_peer_service_manager behaviour, implementing a peer-to-peer partial mesh topology using the protocol described in the paper "HyParView: a membership protocol for reliable gossip-based broadcast" by João Leitão, José Pereira and Luís Rodrigues.

The following content contains excerpts from the paper.

Characteristics

  • Uses TCP/IP as an unreliable failure detector (unreliable because it can generate false positives, e.g. when the network suddenly becomes congested).
  • It can sustain high levels of node failure while ensuring connectivity of the overlay. Nodes are considered "failed" when the TCP/IP connection is dropped.
  • Nodes maintain partial views of the network. Every node maintains an active view that forms a connected graph, and a passive view of backup links that are used to repair graph connectivity under failure. Some links to passive nodes are kept open for fast replacement of failed nodes in the active view. So the view is probabilistic, meaning that the protocol neither prevents nor detects the cluster splitting into several subclusters with no connections to each other.
  • HyParView sacrifices strong membership for high availability and connectivity: the algorithm constantly works towards, and eventually ensures, a cluster membership that is a fully-connected component. However, at any point in time different nodes may have different, inconsistent views of the cluster membership. As a consequence, HyParView is not designed to work with systems that require strong membership properties, e.g. consensus protocols like Paxos or Raft.
  • Point-to-point messaging for connected nodes, with a minimum of 1 hop via transitive message delivery (as not all nodes are directly connected). Delivery is probabilistic.
  • No explicit leave operation, because the overlay is able to react fast enough to node failures. Hence, when a node wishes to leave the system it is simply treated as if it had failed.
  • Scalability up to 2,000 nodes.

HyParView Membership Protocol

Partial View

A partial view is a small subset of the entire system (cluster) membership, a set of node specifications maintained locally at each node.

A node specification, i.e. partisan:node_spec/0, allows a node to be reached by other nodes.

A membership protocol is in charge of initializing and maintaining the partial views at each node in face of dynamic changes in the system. For instance, when a new node joins the system, its identifier should be added to the partial view of (some) other nodes and it has to create its own partial view, including identifiers of nodes already in the system. Also, if a node fails or leaves the system, its identifier should be removed from all partial views as soon as possible.

Partial views establish neighboring associations among nodes. Therefore, partial views define an overlay network; in other words, partial views establish a directed graph that captures the neighbor relation between all nodes executing the protocol. In this graph each node is represented by a vertex, while a neighbor relation is represented by an arc from the node that contains the target node in its partial view to that target node.
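
To make the overlay-as-a-graph idea concrete, here is a small sketch that builds such a directed graph from a map of partial views using OTP's digraph module. It is purely illustrative: the module name, node names and views are invented for the example and are not produced by partisan.

%% Illustration only: build the overlay digraph implied by a set of
%% partial views. Node names and views are invented for the example.
-module(overlay_sketch).
-export([overlay/1, example/0]).

%% Views :: #{node() => [node()]} maps each node to its partial view.
overlay(Views) ->
    G = digraph:new(),
    %% One vertex per node.
    [digraph:add_vertex(G, Node) || Node <- maps:keys(Views)],
    %% One arc from each node to every member of its partial view.
    maps:foreach(
        fun(Node, View) ->
            [digraph:add_edge(G, Node, Peer) || Peer <- View]
        end,
        Views),
    G.

example() ->
    G = overlay(#{a => [b, c], b => [a], c => [a]}),
    %% c reaches b transitively through a: returns [c, a, b].
    digraph:get_path(G, c, b).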

Membership Protocol

The Hybrid Partial View (HyParView) membership protocol is in charge of maintaining two distinct views at each node: a small active view, of size log(n) + c, and a larger passive view, of size k(log(n) + c).

The protocol then selects which members of the passive view should be promoted to the active view.
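
As a rough, purely illustrative reading of those sizing formulas (the choice of natural logarithm and the constants c and k below are assumptions made for the example, not values taken from partisan):

%% Illustration of the paper's sizing formulas; not partisan code.
%% Assumes natural log and example constants C and K.
view_sizes(N, C, K) ->
    Active = math:log(N) + C,      % active view size: log(n) + c
    Passive = K * Active,          % passive view size: k(log(n) + c)
    {round(Active), round(Passive)}.

%% view_sizes(1000, 1, 6) returns approximately {8, 47}.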

Active View

Each node maintains a small symmetric active view of size fanout + 1. Being symmetric means that if node q is in the active view of node p, then node p is also in the active view of node q.

The active views of all cluster nodes create an overlay that is used for message dissemination. Each node keeps an open TCP connection to every other node in its active view.

Broadcast is performed deterministically by flooding the graph defined by the active views across the cluster. When a node receives a message for the first time, it broadcasts the message to all nodes of its active view (except, obviously, the node that sent the message). While this graph is generated at random, gossip is deterministic as long as the graph remains unchanged.
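
The flooding rule can be sketched as follows. This is an illustration of the rule as described above, not partisan's implementation; the Seen set, Active list and Send fun are assumptions made for the example.

%% Sketch of flooding over the active view (illustration only).
%% Active is the list of peers in the active view, Seen is a set of
%% message ids handled so far, Send is a fun(Peer, Msg) that delivers.
flood(From, MsgId, Msg, Active, Seen, Send) ->
    case sets:is_element(MsgId, Seen) of
        true ->
            %% Already seen: do not re-broadcast.
            Seen;
        false ->
            %% First time seen: forward to every active peer except the sender.
            [Send(Peer, Msg) || Peer <- Active, Peer =/= From],
            sets:add_element(MsgId, Seen)
    end.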

Active View Management

A reactive strategy is used to maintain the active view. Nodes can be added to the active view when they join the system. Also, nodes are removed from the active view when they fail. When a node p suspects that one of the nodes present in its active view has failed (by either disconnecting or blocking), it selects a random node q from its passive view and attempts to establish a TCP connection with q. If the connection fails to establish, node q is considered failed and removed from p’s passive view; another node q′ is selected at random and a new attempt is made.

When the connection is successfully established, p sends q a Neighbor request with its own identifier and a priority level. The priority level of the request may take two values, depending on the number of nodes present in the active view of p: if p has no elements in its active view the priority is high; the priority is low otherwise.

A node q that receives a high priority Neighbor request will always accept the request, even if it has to drop a random member from its active view (again, the member that is dropped will receive a Disconnect notification). If a node q receives a low priority Neighbor request, it will only accept the request if it has a free slot in its active view, otherwise it will refuse the request.

If the node q accepts the Neighbor request, p will remove q’s identifier from its passive view and add it to the active view. If q rejects the Neighbor request, the initiator will select another node from its passive view and repeat the whole procedure (without removing q from its passive view).

Each node tests its entire active view every time it forwards a message. Therefore, the entire broadcast overlay is implicitly tested at every broadcast, which allows very fast failure detection.
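
The priority rule for Neighbor requests can be sketched like this; it only restates the rule described above, and the function and argument names are invented for the example.

%% Sketch of the Neighbor request priority rule (illustration only).
%% ActiveView is the receiver's active view, MaxActive its maximum size.
handle_neighbor_request(high, _ActiveView, _MaxActive) ->
    %% High priority: always accept, dropping a random active member if
    %% needed (the dropped member receives a Disconnect notification).
    accept;
handle_neighbor_request(low, ActiveView, MaxActive) ->
    case length(ActiveView) < MaxActive of
        true  -> accept;    % free slot available
        false -> reject     % active view full: refuse the request
    end.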

Passive View

In addition to the active view, each node maintains a larger passive view of backup nodes that can be promoted to the active view when one of the nodes in the active view fails.

The passive view is not used for message dissemination. Instead, the goal of the passive view is to maintain a list of nodes that can be used to replace failed members of the active view. The passive view is maintained using a cyclic strategy. Periodically, each node performs a shuffle operation with one of its neighbors in order to update its passive view.

Passive View Management

The passive view is maintained using a cyclic strategy. Periodically, each node performs a shuffle operation with one of its peers at random. The purpose of the shuffle operation is to update the passive views of the nodes involved in the exchange. The node p that initiates the exchange creates an exchange list with the following contents: p’s own identifier, ka nodes from its active view and kp nodes from its passive view (where ka and kp are protocol parameters). It then sends the list in a Shuffle request to a random neighbor of its active view. Shuffle requests are propagated using a random walk and have an associated “time to live”, just like the ForwardJoin requests.

A node q that receives a Shuffle request will first decrease its time to live. If the time to live of the message is greater than zero and the number of nodes in q’s active view is greater than 1, the node will select a random node from its active view, different from the one it received this shuffle message from, and simply forwards the Shuffle request. Otherwise, node q accepts the Shuffle request and sends back, using a temporary TCP connection, a ShuffleReply message that includes a number of nodes selected at random from q’s passive view equal to the number of nodes received in the Shuffle request.

Then, both nodes integrate the elements they received in the Shuffle/ShuffleReply message into their passive views (naturally, they exclude their own identifier and nodes that are already part of their active or passive views). Because the passive view has a fixed length, it might get full; in that case, some identifiers will have to be removed in order to free space to include the new ones. A node will first attempt to remove identifiers sent to the peer. If no such identifiers remain in the passive view, it will remove identifiers at random.
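
A node's handling of an incoming Shuffle request, following the time-to-live rule above, could be sketched as follows. This is an illustration of the described protocol only; every name in it is invented for the example.

%% Sketch of Shuffle handling (illustration only).
%% ExchangeList is the list received from the initiator, TTL its time to
%% live, From the peer it arrived from, Active and Passive this node's
%% views as lists.
handle_shuffle(ExchangeList, TTL, From, Active, Passive) ->
    case TTL > 0 andalso length(Active) > 1 of
        true ->
            %% Keep the random walk going: pick a random active peer
            %% other than the one the request arrived from.
            Candidates = [P || P <- Active, P =/= From],
            Next = lists:nth(rand:uniform(length(Candidates)), Candidates),
            {forward, Next, {shuffle, ExchangeList, TTL - 1}};
        false ->
            %% Accept: reply with as many random passive nodes as were
            %% received, over a temporary connection to the initiator
            %% (whose identifier is in the exchange list).
            N = length(ExchangeList),
            Reply = lists:sublist(shuffle_list(Passive), N),
            {reply, {shuffle_reply, Reply}}
    end.

%% Random permutation helper (illustration).
shuffle_list(L) ->
    [X || {_, X} <- lists:sort([{rand:uniform(), X} || X <- L])].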

Configuration

The following are the HyParView configuration parameters managed by partisan_config. The parameters are passed as {hyparview, Config}, where Config is a property list or map with the following keys (an example configuration follows the list):

active_max_size
Maximum size of the active view. Defaults to 6.
active_min_size
Minimum size of the active view. Defaults to 3.
active_rwl
Active View Random Walk Length. Defaults to 6.
passive_max_size
Maximum size of the passive view. Defaults to 30.
passive_rwl
Passive View Random Walk Length. Defaults to 6.
random_promotion
A boolean indicating whether random promotion is enabled. Defaults to true.
random_promotion_interval
Time after which the protocol attempts to promote a node from the passive view to the active view. Defaults to 5000.
shuffle_interval
Time between shuffle operations. Defaults to 10000.
shuffle_k_active
Number of peers from the active view to include in the shuffle exchange. Defaults to 3.
shuffle_k_passive
Number of peers from the passive view to include in the shuffle exchange. Defaults to 4.
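
For example, these parameters could be set in a release's sys.config. This is a sketch that simply restates the documented defaults; the surrounding partisan application environment and the peer_service_manager key are assumptions about the usual partisan setup rather than something specified in this section.

%% sys.config sketch; values shown are the documented defaults.
[
 {partisan, [
   %% Assumed key selecting this manager; not described in this section.
   {peer_service_manager, partisan_hyparview_peer_service_manager},
   {hyparview, [
     {active_max_size, 6},
     {active_min_size, 3},
     {active_rwl, 6},
     {passive_max_size, 30},
     {passive_rwl, 6},
     {random_promotion, true},
     {random_promotion_interval, 5000},
     {shuffle_interval, 10000},
     {shuffle_k_active, 3},
     {shuffle_k_passive, 4}
   ]}
 ]}
].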

Types

-type active() :: sets:set(partisan:node_spec()).
-type call() ::
    {join, partisan:node_spec()} |
    {leave, partisan:node_spec()} |
    {update_members, [partisan:node_spec()]} |
    {resolve_partition, reference()} |
    {inject_partition, partisan:node_spec(), integer()} |
    {reserve, tag()} |
    active | passive |
    {active, tag()} |
    {send_message, node(), term()} |
    members | members_for_orchestration | get_local_state | connections | partitions.
-type cast() ::
    {join, partisan:node_spec()} |
    {receive_message, partisan:node_spec(), partisan:channel(), term()} |
    {disconnect, partisan:node_spec()}.
-type config() ::
    #{active_max_size := non_neg_integer(),
      active_min_size := non_neg_integer(),
      active_rwl := non_neg_integer(),
      passive_max_size := non_neg_integer(),
      passive_rwl := non_neg_integer(),
      random_promotion := boolean(),
      random_promotion_interval := non_neg_integer(),
      shuffle_interval := non_neg_integer(),
      shuffle_k_active := non_neg_integer(),
      shuffle_k_passive := non_neg_integer(),
      xbot_enabled := boolean(),
      xbot_interval := non_neg_integer()}.
-type epoch() :: non_neg_integer().
The epoch indicates how many times the node is restarted.
-type epoch_count() :: non_neg_integer().
The epoch_count indicates how many disconnect messages are generated.
-type message_id() :: {epoch(), epoch_count()}.
-type message_id_store() :: #{partisan:node_spec() := message_id()}.
-type passive() :: sets:set(partisan:node_spec()).
-type reserved() :: #{atom() := partisan:node_spec()}.
-type t() :: #state{}.
-type tag() :: atom().

Functions

Debugging.
Debugging.

add_to_active_view(Peer, Tag, State)

Add to the active view.

Note an interesting race condition here: if the passive random walk timer has exceeded and the node was added to the passive view, the active random walk timer might also exceed *afterwards* because of a network delay; if so, we have to remove this element from the passive view, otherwise it will exist in both places.

cast_message(Term, Message)
-spec cast_message(Term :: partisan:any_pid() | partisan:any_name(), Message :: partisan:message()) ->
                ok.

cast_message(Node, ServerRef, Message)
Cast a message to a remote gen_server.

cast_message(Node, ServerRef, Message, Options)
Cast a message to a remote gen_server.

code_change(OldVsn, State, Extra)
-spec code_change(term() | {down, term()}, t(), term()) -> {ok, t()}.
-spec decode({state, sets:set(), any()} | sets:set()) -> list().
Decode state.

forward_message(Term, Message)
Gensym support for forwarding.

forward_message(PidOrName, Message, Opts)
Gensym support for forwarding.

forward_message(Node, ServerRef, Message, Opts)
Forward message to registered process on the remote side.
-spec get_local_state() -> {state, Active :: active(), Epoch :: integer()}.
Return local node's view of cluster membership.

handle_call(Cmd, From, State)
-spec handle_call(call(), {pid(), term()}, t()) -> {reply, term(), t()}.

handle_cast(Event, State)
-spec handle_cast(cast(), t()) -> {noreply, t()}.

handle_info(Event, State0)
-spec handle_info(term(), t()) -> {noreply, t()}.
-spec init([]) -> {ok, t()} | {stop, reservation_limit_exceeded}.

inject_partition(Origin, TTL)
Inject a partition.
Attempt to join a remote node.
Leave the cluster.
Remove another node from the cluster.
-spec members() -> [node()].
Returns membership list.

members_for_orchestration()
-spec members_for_orchestration() -> [partisan:node_spec()].
Return membership list.
Register a trigger to fire when a connection drops.

on_down(Name, Function, Opts)
Register a trigger to fire when a connection drops.
Register a trigger to fire when a connection opens.

on_up(Name, Function, Opts)
Register a trigger to fire when a connection opens.
Return partitions.
Debugging.

receive_message(Peer, Channel, Cmd)
Receive message from a remote manager.
Reserve a slot for the particular tag.

resolve_partition(Reference)
Resolve a partition.

send_message(Name, Message)
Send message to a remote peer service manager.
-spec start_link() -> {ok, pid()} | ignore | {error, term()}.
Starts the peer service manager.

supports_capability(Arg)
-spec supports_capability(Arg :: atom()) -> boolean().
Attempt to join a remote node.

terminate(Reason, State)
-spec terminate(term(), t()) -> term().
Update membership.