Module ra_server

Data Types

command()

command() = {command_type(), command_meta(), UserCommand::term(), command_reply_mode()} | {noop, command_meta(), CurrentMachineVersion::ra_machine:version()}

command_correlation()

command_correlation() = integer() | reference()

command_meta()

command_meta() = #{from => from(), ts := integer()}

command_priority()

command_priority() = normal | low

command_reply_mode()

command_reply_mode() = after_log_append | await_consensus | {notify, command_correlation(), pid()} | noreply

command_type()

command_type() = '$usr' | '$ra_join' | '$ra_leave' | '$ra_cluster_change' | '$ra_cluster'

config()

config() = ra_server_config()

effect()

effect() = ra_machine:effect() | ra_log:effect() | {reply, ra_reply_body()} | {reply, term(), ra_reply_body()} | {cast, ra_server_id(), term()} | {send_vote_requests, [{ra_server_id(), #request_vote_rpc{} | #pre_vote_rpc{}}]} | {send_rpc, ra_server_id(), #append_entries_rpc{}} | {send_snapshot, To::ra_server_id(), {Module::module(), Ref::term(), LeaderId::ra_server_id(), Term::ra_term()}} | {next_event, ra_msg()} | {next_event, cast, ra_msg()} | {notify, #{pid() => [term()]}} | {record_leader_msg, ra_server_id()} | start_election_timeout

effects()

effects() = [effect()]

machine_conf()

machine_conf() = {module, module(), InitConfig::map()} | {simple, simple_apply_fun(term()), InitialState::term()}

The machine configuration. This is how ra knows which module to use to invoke the ra_machine callbacks and the config to pass to the ra_machine:init/1 implementation. The simple machine config is a version that can only be used for simple state machines that cannot access any of the advanced features.

mutable_config()

mutable_config() = #{cluster_name => ra_cluster_name(), metrics_key => term(), broadcast_time => non_neg_integer(), tick_timeout => non_neg_integer(), install_snap_rpc_timeout => non_neg_integer(), await_condition_timeout => non_neg_integer(), max_pipeline_count => non_neg_integer(), ra_event_formatter => {module(), atom(), [term()]}}

ra_await_condition_fun()

ra_await_condition_fun() = fun((ra_msg(), ra_server_state()) -> {boolean(), ra_server_state()})

ra_event_formatter_fun()

ra_event_formatter_fun() = fun((ra_server_id(), Evt::term()) -> term())

ra_msg()

ra_msg() = #append_entries_rpc{} | {ra_server_id(), #append_entries_reply{}} | {ra_server_id(), #install_snapshot_result{}} | #request_vote_rpc{} | #request_vote_result{} | #pre_vote_rpc{} | #pre_vote_result{} | #install_snapshot_rpc{} | election_timeout | await_condition_timeout | {command, command()} | {commands, [command()]} | ra_log:event() | {consistent_query, term(), ra:query_fun()} | #heartbeat_rpc{} | {ra_server_id(), #heartbeat_reply{}} | pipeline_rpcs

ra_reply_body()

ra_reply_body() = #append_entries_reply{} | #request_vote_result{} | #install_snapshot_result{} | #pre_vote_result{}

ra_server_config()

ra_server_config() = #{id := ra_server_id(), uid := ra_uid(), cluster_name := ra_cluster_name(), log_init_args := ra_log:ra_log_init_args(), initial_members := [ra_server_id()], machine := machine_conf(), friendly_name => unicode:chardata(), metrics_key => term(), broadcast_time => non_neg_integer(), tick_timeout => non_neg_integer(), install_snap_rpc_timeout => non_neg_integer(), await_condition_timeout => non_neg_integer(), max_pipeline_count => non_neg_integer(), ra_event_formatter => {module(), atom(), [term()]}, counter => counters:counters_ref(), system_config => ra_system:config()}

ra_server_state()

ra_server_state() = #{cfg := #cfg{}, leader_id => maybe(ra_server_id()), cluster := ra_cluster(), cluster_change_permitted := boolean(), cluster_index_term := ra_idxterm(), previous_cluster => {ra_index(), ra_term(), ra_cluster()}, current_term := ra_term(), log := term(), voted_for => maybe(ra_server_id()), votes => non_neg_integer(), commit_index := ra_index(), last_applied := ra_index(), persisted_last_applied => ra_index(), stop_after => ra_index(), machine_state := term(), aux_state => term(), condition => ra_await_condition_fun(), condition_timeout_changes => #{transition_to := ra_state(), effects := [effect()]}, pre_vote_token => reference(), query_index := non_neg_integer(), queries_waiting_heartbeats := queue:queue({non_neg_integer(), consistent_query_ref()}), pending_consistent_queries := [consistent_query_ref()], commit_latency => maybe(non_neg_integer())}

ra_state()

ra_state() = leader | follower | candidate | pre_vote | await_condition | delete_and_terminate | terminating_leader | terminating_follower | recover | recovered | stop | receive_snapshot

simple_apply_fun()

simple_apply_fun(State) = fun((term(), State) -> State)

Function Index

clear_leader_id/1
current_term/1
handle_aux/4
handle_await_condition/2
handle_candidate/2
handle_down/5
handle_follower/2
handle_leader/2
handle_node_status/6
handle_pre_vote/2
handle_receive_snapshot/2
handle_state_enter/2
id/1
init/1
is_fully_persisted/1
is_fully_replicated/1
is_new/1
leader_id/1
log_fold/3
log_id/1
log_read/2
machine/1
machine_query/2
machine_version/1
make_rpcs/1
metrics/1
name/2
overview/1
persist_last_applied/1
process_new_leader_queries/1
recover/1
register_external_log_reader/2
system_config/1
terminate/2
tick/1
uid/1
update_peer/3
update_release_cursor/3

Function Details

clear_leader_id/1

clear_leader_id(State::ra_server_state()) -> ra_server_state()

current_term/1

current_term(State::ra_server_state()) -> maybe(ra_term())

handle_aux/4

handle_aux(RaftState, Type, Cmd, State0) -> any()

handle_await_condition/2

handle_await_condition(Request_vote_rpc::ra_msg(), State::ra_server_state()) -> {ra_state(), ra_server_state(), effects()}

handle_candidate/2

handle_candidate(Request_vote_result::ra_msg() | election_timeout, State0::ra_server_state()) -> {ra_state(), ra_server_state(), effects()}

handle_down/5

handle_down(RaftState::ra_state(), Type::machine | snapshot_sender | snapshot_writer | aux, Pid::pid(), Info::term(), State::ra_server_state()) -> {ra_state(), ra_server_state(), effects()}

handle_follower/2

handle_follower(Append_entries_rpc::ra_msg(), State00::ra_server_state()) -> {ra_state(), ra_server_state(), effects()}

handle_leader/2

handle_leader(Install_snapshot_rpc::ra_msg(), State0::ra_server_state()) -> {ra_state(), ra_server_state(), effects()}

handle_node_status/6

handle_node_status(RaftState::ra_state(), Type::machine | aux, Node::node(), Status::nodeup | nodedown, Infos::term(), State::ra_server_state()) -> {ra_state(), ra_server_state(), effects()}

handle_pre_vote/2

handle_pre_vote(Append_entries_rpc::ra_msg(), State0::ra_server_state()) -> {ra_state(), ra_server_state(), effects()}

handle_receive_snapshot/2

handle_receive_snapshot(Install_snapshot_rpc, State0) -> any()

handle_state_enter/2

handle_state_enter(RaftState::ra_state() | eol, State::ra_server_state()) -> {ra_server_state() | eol, effects()}

id/1

id(X1::ra_server_state()) -> ra_server_id()

init/1

init(Config::ra_server_config()) -> ra_server_state()

is_fully_persisted/1

is_fully_persisted(X1::ra_server_state()) -> boolean()

is_fully_replicated/1

is_fully_replicated(State::ra_server_state()) -> boolean()

is_new/1

is_new(X1::ra_server_state()) -> boolean()

leader_id/1

leader_id(State::ra_server_state()) -> maybe(ra_server_id())

log_fold/3

log_fold(RaState::ra_server_state(), Fun::fun((term(), State) -> State), State) -> {ok, State, ra_server_state()} | {error, term(), ra_server_state()}

log_id/1

log_id(X1::ra_server_state()) -> unicode:chardata()

log_read/2

log_read(Indexes::[ra_index()], State::ra_server_state()) -> {ok, [term()], ra_server_state()} | {error, ra_server_state()}

machine/1

machine(X1::ra_server_state()) -> ra_machine:machine()

machine_query/2

machine_query(QueryFun::fun((term()) -> term()), X2::ra_server_state()) -> {ra_idxterm(), term()}

machine_version/1

machine_version(X1::ra_server_state()) -> non_neg_integer()

make_rpcs/1

make_rpcs(State) -> any()

metrics/1

metrics(State::ra_server_state()) -> {atom(), ra_term(), ra_index(), ra_index(), ra_index(), ra_index(), non_neg_integer()}

name/2

name(ClusterName::ra_cluster_name(), UniqueSuffix::string()) -> atom()

overview/1

overview(State::ra_server_state()) -> map()

persist_last_applied/1

persist_last_applied(State::ra_server_state()) -> ra_server_state()

process_new_leader_queries/1

process_new_leader_queries(State0::ra_server_state()) -> {ra_server_state(), [from()]}

recover/1

recover(State0) -> any()

register_external_log_reader/2

register_external_log_reader(Pid::pid(), State::ra_server_state()) -> {ra_server_state(), effects()}

system_config/1

system_config(X1::ra_server_state()) -> ra_system:config()

terminate/2

terminate(State::ra_server_state(), Reason::{shutdown, delete} | term()) -> ok

tick/1

tick(X1::ra_server_state()) -> effects()

uid/1

uid(X1::ra_server_state()) -> ra_uid()

update_peer/3

update_peer(PeerId::ra_server_id(), Update::#{next_index => non_neg_integer(), query_index => non_neg_integer(), commit_index_sent => non_neg_integer(), status => ra_peer_status()}, State::ra_server_state()) -> ra_server_state()

update_release_cursor/3

update_release_cursor(Index::ra_index(), MacState::term(), State::ra_server_state()) -> {ra_server_state(), effects()}


Generated by EDoc