macula_gossip (macula v0.20.5)

View Source

Gossip protocol for CRDT state replication.

Implements a gossip-based protocol for eventually-consistent state synchronization across nodes. Uses push-pull-push anti-entropy:

- Push: Periodically sends local state changes to random peers - Pull: Requests state from peers when needed - Anti-entropy: Full state synchronization to repair divergence

Configuration Parameters: - push_interval: 1000ms (how often to push to peers) - anti_entropy_interval: 30000ms (how often to run anti-entropy) - fanout: 3 (number of peers to contact per round)

Summary

Functions

Add a peer to the gossip list.

Trigger anti-entropy synchronization.

Delete a CRDT value.

Force synchronization with all peers.

Get a CRDT value.

Get all stored CRDT states.

Get the list of known peers.

Get gossip statistics.

Handle incoming gossip_pull message.

Handle incoming gossip_pull_reply message.

Handle incoming gossip_push message.

Handle incoming gossip_sync message.

Handle incoming gossip_sync_reply message.

Pull state from a specific peer.

Push local state to a specific peer.

Store a CRDT value. Type is the CRDT type: lww_register, or_set, gcounter, pncounter

Remove a peer from the gossip list.

Start the gossip server.

Stop the gossip server.

Types

gossip_state/0

-type gossip_state() ::
          #state{node_id :: binary(),
                 realm :: binary(),
                 states :: #{binary() => {atom(), term(), map()}},
                 pending_pulls :: #{binary() => {pid(), reference()}},
                 peers :: [binary()],
                 send_fn :: fun((binary(), map()) -> ok | {error, term()}),
                 push_interval :: pos_integer(),
                 anti_entropy_interval :: pos_integer(),
                 fanout :: pos_integer(),
                 push_timer :: reference() | undefined,
                 anti_entropy_timer :: reference() | undefined,
                 push_count :: non_neg_integer(),
                 pull_count :: non_neg_integer(),
                 merge_count :: non_neg_integer(),
                 conflict_count :: non_neg_integer()}.

Functions

add_peer(Pid, PeerNodeId)

-spec add_peer(pid(), binary()) -> ok.

Add a peer to the gossip list.

anti_entropy(Pid)

-spec anti_entropy(pid()) -> ok.

Trigger anti-entropy synchronization.

delete(Pid, Key)

-spec delete(pid(), binary()) -> ok.

Delete a CRDT value.

force_sync(Pid)

-spec force_sync(pid()) -> ok.

Force synchronization with all peers.

get(Pid, Key)

-spec get(pid(), binary()) -> {ok, {atom(), term()}} | {error, not_found}.

Get a CRDT value.

get_all(Pid)

-spec get_all(pid()) -> #{binary() => {atom(), term()}}.

Get all stored CRDT states.

get_peers(Pid)

-spec get_peers(pid()) -> [binary()].

Get the list of known peers.

get_stats(Pid)

-spec get_stats(pid()) -> map().

Get gossip statistics.

handle_call(_, From, State)

handle_cast(_, State)

handle_gossip_pull(Pid, Msg)

-spec handle_gossip_pull(pid(), map()) -> ok.

Handle incoming gossip_pull message.

handle_gossip_pull_reply(Pid, Msg)

-spec handle_gossip_pull_reply(pid(), map()) -> ok.

Handle incoming gossip_pull_reply message.

handle_gossip_push(Pid, Msg)

-spec handle_gossip_push(pid(), map()) -> ok.

Handle incoming gossip_push message.

handle_gossip_sync(Pid, Msg)

-spec handle_gossip_sync(pid(), map()) -> ok.

Handle incoming gossip_sync message.

handle_gossip_sync_reply(Pid, Msg)

-spec handle_gossip_sync_reply(pid(), map()) -> ok.

Handle incoming gossip_sync_reply message.

handle_info(Info, State)

init(Config)

pull_state(Pid, PeerNodeId)

-spec pull_state(pid(), binary()) -> ok | {error, term()}.

Pull state from a specific peer.

push_state(Pid, PeerNodeId)

-spec push_state(pid(), binary()) -> ok | {error, term()}.

Push local state to a specific peer.

put(Pid, Key, Type, Value)

-spec put(pid(), binary(), atom(), term()) -> ok.

Store a CRDT value. Type is the CRDT type: lww_register, or_set, gcounter, pncounter

remove_peer(Pid, PeerNodeId)

-spec remove_peer(pid(), binary()) -> ok.

Remove a peer from the gossip list.

start_link(Config)

-spec start_link(map()) -> {ok, pid()} | {error, term()}.

Start the gossip server.

stop(Pid)

-spec stop(pid()) -> ok.

Stop the gossip server.

terminate(Reason, State)