RaftedValue (rafted_value v0.11.2)

Public interface functions of RaftedValue package.

Summary

Functions

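change_config(leader, new_config)

Replaces the current configuration.

command(leader, command_arg, timeout \\ 5000, id \\ make_ref(), call_module \\ :gen_statem)

Executes a command on the stored value of the leader.

force_remove_member(member, member_to_remove)

Tells a member to forget about another member.

make_config(data_module, opts \\ [])

Makes a new instance of the RaftedValue.Config struct.

query(leader, query_arg, timeout \\ 5000, call_module \\ :gen_statem)

Executes a read-only query on the stored value of the leader.

query_non_leader(member, query_arg, timeout \\ 5000)

Executes a read-only query on the stored value of the specified member.

read_last_log_index(dir)

Obtains the last log index from the log files stored in dir.

remove_follower(leader, follower_pid)

Removes a follower from a consensus group.

replace_leader(current_leader, new_leader)

Changes the leader of a consensus group from current_leader to new_leader.

start_link(info, options \\ [])

Starts a new member of a consensus group.

status(server, timeout \\ 5000)

Retrieves the status of a member in a consensus group.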

Types

command_identifier()

@type command_identifier() :: reference() | any()

consensus_group_info()

@type consensus_group_info() ::
  {:create_new_consensus_group, RaftedValue.Config.t()}
  | {:join_existing_consensus_group, [GenServer.server()]}

not_leader()

@type not_leader() :: {:not_leader, nil | pid()}

option()

@type option() ::
  {:name, atom()}
  | {:persistence_dir, Path.t()}
  | {:log_file_expansion_factor, number()}
  | {:spawn_opt, [:proc_lib.spawn_option()]}

remove_follower_error_reason()

@type remove_follower_error_reason() ::
  not_leader()
  | :uncommitted_membership_change
  | :not_member
  | :pending_leader_change
  | :cannot_remove_leader
  | :will_break_quorum

replace_leader_error_reason()

@type replace_leader_error_reason() ::
  not_leader()
  | :uncommitted_membership_change
  | :not_member
  | :new_leader_unresponsive

status_result()

@type status_result() :: %{
  from: pid(),
  members: [pid()],
  leader: nil | pid(),
  unresponsive_followers: [pid()],
  current_term: RaftedValue.TermNumber.t(),
  state_name: :leader | :candidate | :follower,
  config: RaftedValue.Config.t()
}

Functions

change_config(leader, new_config)

@spec change_config(GenServer.server(), RaftedValue.Config.t()) ::
  :ok | {:error, not_leader()}

Replaces the current configuration.

The new configuration is replicated (as a Raft log entry) to all members.

command(leader, command_arg, timeout \\ 5000, id \\ make_ref(), call_module \\ :gen_statem)

@spec command(
  GenServer.server(),
  RaftedValue.Data.command_arg(),
  timeout(),
  command_identifier(),
  module()
) ::
  {:ok, RaftedValue.Data.command_ret()}
  | {:error, :noproc | :timeout | not_leader()}

Executes a command on the stored value of the leader.

id identifies the command and can be used to filter out duplicate requests.

If the leader does not respond within timeout, the function returns {:error, :timeout}; that is, it catches the exit internally. The caller process should be prepared to handle a delayed reply (e.g. by dropping it in handle_info(_msg, state)).
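
The advice about delayed replies can be illustrated with a minimal GenServer sketch. CommandCaller and its run_command argument are hypothetical names introduced here; run_command stands in for something like fn -> RaftedValue.command(leader, arg) end, so the sketch itself does not depend on a running consensus group.

```elixir
defmodule CommandCaller do
  use GenServer

  # `run_command` is a hypothetical 0-arity function standing in for
  # something like fn -> RaftedValue.command(leader, arg) end.
  def start_link(run_command), do: GenServer.start_link(__MODULE__, run_command)

  def run(pid), do: GenServer.call(pid, :run)

  @impl true
  def init(run_command), do: {:ok, run_command}

  @impl true
  def handle_call(:run, _from, run_command) do
    # The result may be {:ok, ret} or {:error, :noproc | :timeout | not_leader}.
    {:reply, run_command.(), run_command}
  end

  # A reply that arrives after {:error, :timeout} was already returned
  # shows up as an ordinary message; this catch-all clause drops it.
  @impl true
  def handle_info(_msg, state), do: {:noreply, state}
end
```

The catch-all handle_info/2 clause is the part that matters: it keeps late replies from accumulating in the mailbox or triggering unexpected-message logging.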

force_remove_member(member, member_to_remove)

@spec force_remove_member(GenServer.server(), pid()) :: :ok

Tells a member to forget about another member.

The sole purpose of this function is to recover a consensus group from majority failure. This operation is unsafe: it may violate invariants of the Raft algorithm (for example, some committed log entries may be lost). Use it only as a last resort.

The membership change introduced by this function is not propagated to other members. It is the caller's responsibility

  • to stop member_to_remove if it is still alive before calling this function, and
  • to notify all existing members of the removal (i.e. call this function for all existing members).
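
The second responsibility amounts to a loop over the surviving members. The sketch below is only an illustration: MajorityFailureRecovery and forget_everywhere/3 are hypothetical names, and the removal function is passed in as an argument so that the loop can be tried without a live consensus group.

```elixir
defmodule MajorityFailureRecovery do
  # `forget` is a 2-arity function; in real use it would be
  # &RaftedValue.force_remove_member/2. The caller is assumed to have
  # already stopped `dead_member` if it was still alive.
  def forget_everywhere(survivors, dead_member, forget) do
    Enum.each(survivors, fn member ->
      # force_remove_member/2 is specified to return :ok.
      :ok = forget.(member, dead_member)
    end)
  end
end
```

A call would look like MajorityFailureRecovery.forget_everywhere(survivors, dead_pid, &RaftedValue.force_remove_member/2), where survivors is the list of members that are still running.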

make_config(data_module, opts \\ [])

@spec make_config(atom(), Keyword.t(any())) :: RaftedValue.Config.t()

Makes a new instance of the RaftedValue.Config struct.

data_module must be an implementation of the RaftedValue.Data behaviour. Available options:

  • leader_hook_module: An implementation of RaftedValue.LeaderHook. Defaults to RaftedValue.LeaderHook.NoOp.
  • communication_module: A module that defines member-to-member async communication (cast/2 and reply/2). Defaults to RaftedValue.RemoteMessageGateway. This is useful for performance optimizations (and also internal testing).
  • heartbeat_timeout: Raft's heartbeat timeout in milliseconds. Defaults to 200.
  • election_timeout: Raft's leader election timeout in milliseconds. The actual timeout value in each member is randomly chosen from election_timeout .. 2 * election_timeout. Defaults to 1000.
  • election_timeout_clock_drift_margin: A time margin in milliseconds used to judge whether the leader lease has expired. When a leader gets responses from a majority, it obtains a lease for election_timeout - margin. During the lease, the leader can assume that no other member is elected leader, which lets it skip message round trips when processing read-only queries. Defaults to the value of election_timeout (i.e. no lease time, which disables this clock-based optimization).
  • max_retained_command_results: Number of command results to cache so that the same command is not applied twice due to client retries. Defaults to 100.
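
The arithmetic behind election_timeout and election_timeout_clock_drift_margin can be sketched in a few lines. TimingSketch is a hypothetical module written for this illustration, not part of RaftedValue, and uniform sampling is an assumption (the text above only says the value is randomly chosen).

```elixir
defmodule TimingSketch do
  # Picks one member's timeout from election_timeout..2 * election_timeout.
  # Uniform sampling is an assumption of this sketch.
  def random_election_timeout(election_timeout) do
    Enum.random(election_timeout..(2 * election_timeout))
  end

  # Lease length granted to a leader after hearing from a majority.
  def lease_ms(election_timeout, margin), do: election_timeout - margin
end

# With the default margin (equal to election_timeout) there is no lease.
IO.inspect(TimingSketch.lease_ms(1000, 1000), label: "default lease (ms)")
IO.inspect(TimingSketch.lease_ms(1000, 100), label: "lease with 100 ms margin")
```

A smaller margin gives a longer lease, and so more queries answered without a round trip, at the cost of less tolerance for clock drift between members.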

query(leader, query_arg, timeout \\ 5000, call_module \\ :gen_statem)

@spec query(
  GenServer.server(),
  RaftedValue.Data.query_arg(),
  timeout(),
  module()
) ::
  {:ok, RaftedValue.Data.query_ret()}
  | {:error, :noproc | :timeout | not_leader()}

Executes a read-only query on the stored value of the leader.

If the leader does not respond within timeout, the function returns {:error, :timeout}; that is, it catches the exit internally. The caller process should be prepared to handle a delayed reply (e.g. by dropping it in handle_info(_msg, state)).
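
Besides :timeout, the spec allows {:error, not_leader()}, which carries nil | pid(). One way a caller might react is sketched below, under the assumption that the pid is the contacted member's view of the current leader (the text here does not state this). LeaderRedirect, its retry limit, and the :too_many_redirects reason are all hypothetical; call stands in for something like &RaftedValue.query(&1, query_arg).

```elixir
defmodule LeaderRedirect do
  # `call` is a 1-arity function that takes a server and returns a
  # result shaped like that of RaftedValue.query/2 or command/2.
  def run(server, call, retries \\ 3)

  # :too_many_redirects is a reason invented for this sketch.
  def run(_server, _call, 0), do: {:error, :too_many_redirects}

  def run(server, call, retries) do
    case call.(server) do
      # Assumption: the pid is a hint pointing at the current leader.
      {:error, {:not_leader, leader}} when is_pid(leader) ->
        run(leader, call, retries - 1)

      # {:ok, ret}, {:error, {:not_leader, nil}} and other errors pass through.
      other ->
        other
    end
  end
end
```

Bounding the number of redirects keeps a caller from looping forever while leadership is changing hands.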

query_non_leader(member, query_arg, timeout \\ 5000)

@spec query_non_leader(
  GenServer.server(),
  RaftedValue.Data.query_arg(),
  timeout()
) :: {:ok, RaftedValue.Data.query_ret()} | {:error, :noproc | :timeout}

Executes a read-only query on the stored value of the specified member.

Unlike query/3, this variant allows non-leader members to reply to the query. The return value can be stale; it may not reflect recent updates.

read_last_log_index(dir)

@spec read_last_log_index(Path.t()) :: nil | RaftedValue.LogIndex.t()

Obtains the last log index from the log files stored in dir.

remove_follower(leader, follower_pid)

@spec remove_follower(GenServer.server(), pid()) ::
  :ok | {:error, remove_follower_error_reason()}

Removes a follower from a consensus group.

replace_leader(current_leader, new_leader)

@spec replace_leader(GenServer.server(), nil | pid()) ::
  :ok | {:error, replace_leader_error_reason()}

Changes the leader of a consensus group from current_leader to new_leader.

By specifying nil as new_leader, you can cancel a previously issued leader replacement that is stuck.

start_link(info, options \\ [])

@spec start_link(consensus_group_info(), [option()]) :: GenServer.on_start()

Starts a new member of a consensus group.

The 1st argument specifies the consensus group to belong to:

  • {:create_new_consensus_group, config}: Creates a new consensus group using the given RaftedValue.Config.t. The group's sole member is the newly spawned process, which immediately becomes the leader.
  • {:join_existing_consensus_group, members}: Joins an already running consensus group as a follower.

The 2nd argument is a keyword list of member-specific options:

  • :name: An atom for local name registration.
  • :persistence_dir: Directory path in which both Raft logs and periodic snapshots are stored. If not given, the newly spawned process runs in in-memory mode and does not persist its state. The specified directory is created if it does not exist. Note that it is the caller's responsibility to ensure that the specified directory is not used by another RaftedValue process. See below for details of restoring state from snapshot and logs.
  • :log_file_expansion_factor: A number that adjusts when to make a snapshot file. This has no effect if :persistence_dir is not given. RaftedValue keeps track of the file sizes of the "currently growing log file" and the "previous snapshot file". A new snapshot is created when log_file_size > previous_snapshot_size * expansion_factor. A small value means frequent snapshotting, which can result in high overhead. On the other hand, a larger expansion factor may lead to longer recovery time. Defaults to 10.
  • :spawn_opt: A keyword list of options to be passed to :erlang.spawn_opt/4.
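
The snapshot condition described for :log_file_expansion_factor is a single comparison. The sketch below restates it as a predicate; SnapshotTrigger and snapshot_due?/3 are hypothetical names, and the code illustrates the stated inequality rather than RaftedValue's internal implementation.

```elixir
defmodule SnapshotTrigger do
  # True when the growing log file has outgrown the previous snapshot
  # by more than the expansion factor (10 is the documented default).
  def snapshot_due?(log_file_size, previous_snapshot_size, expansion_factor \\ 10) do
    log_file_size > previous_snapshot_size * expansion_factor
  end
end

# A 100-byte previous snapshot with the default factor: a new snapshot
# becomes due once the log file exceeds 1000 bytes.
IO.inspect(SnapshotTrigger.snapshot_due?(1000, 100), label: "log at 1000 bytes")
IO.inspect(SnapshotTrigger.snapshot_due?(1001, 100), label: "log at 1001 bytes")
```

Because the comparison is strict, a log file exactly at the threshold does not yet trigger a snapshot under this reading of the formula.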

Restoring state from snapshot and log files

RaftedValue implements state recovery using log and snapshot files. The recovery procedure is triggered when both of the following hold:

  • A new process is started via start_link/2 with {:create_new_consensus_group, config} and :persistence_dir.
  • Valid log and snapshot files exist in the given :persistence_dir.

The newly spawned process then loads the latest snapshot and log files and forms a new 1-member consensus group with the restored Raft state (except for membership information). When restoring from snapshot and logs, the config passed in the 1st argument is ignored in favor of the configuration stored in the snapshot and logs.

If you don't want to restore from snapshot and log files (i.e. want to start a consensus group from scratch), then you must clean up the directory before calling start_link/2.
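
Cleaning up the directory can be done with the standard File module. The helper below is a minimal sketch; FreshStart and reset_dir/1 are hypothetical names, and the function deletes everything under the given path, so it should only be pointed at a directory dedicated to one RaftedValue member.

```elixir
defmodule FreshStart do
  # Removes `dir` with all its contents and recreates it empty, so that
  # no old snapshot or log file is left to trigger recovery.
  def reset_dir(dir) do
    {:ok, _removed_paths} = File.rm_rf(dir)
    :ok = File.mkdir_p(dir)
    dir
  end
end
```

The returned path can then be passed as :persistence_dir in the options given to start_link/2 together with {:create_new_consensus_group, config}.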

status(server, timeout \\ 5000)

@spec status(GenServer.server(), timeout()) :: status_result()

Retrieves the status of a member in a consensus group.