Module ra_machine

The ra_machine behaviour.

Description

The ra_machine behaviour.

Used to implement the logic for the state machine running inside Ra.

Callbacks

-callback init(Conf :: machine_init_args()) -> state().

Initialize a new machine state.


-callback apply(Meta :: command_meta_data(), command(), State) -> {State, reply(), effects()} | {State, reply()}.

Applies each entry to the state machine.


-callback state_enter(ra_server:ra_state() | eol, state()) -> effects().

Optional. Called when the ra server enters a new state. Called for all states in the ra_server_proc gen_statem implementation, not just for the standard Raft states (follower, candidate, leader). If implemented, it is sensible to include a catch-all clause, as new states may be implemented in the future.


-callback tick(TimeMs :: milliseconds(), state()) -> effects().

Optional. Called periodically. Suitable for issuing periodic side effects such as updating metrics systems.


-callback overview(state()) -> map().

Optional. A map of overview information. Needs to be efficient.


-callback version() -> version().

Optional: Returns the latest machine version. If not implemented this is defaulted to 0.

-callback which_module(version()) -> module().

Optional: implements a lookup from version to the module implementing the machine logic for that version.

Data Types

builtin_command()

builtin_command() = {down, pid(), term()} | {nodeup | nodedown, node()} | {timeout, term()}

These commands may be passed to the apply/3 callback in reaction to monitor effects.

command()

command() = user_command() | builtin_command()

command_meta_data()

command_meta_data() = #{system_time := integer(), index := ra_index(), term := ra_term(), machine_version => version(), from => from(), reply_mode => ra_server:command_reply_mode()}

extensible command meta data map

effect()

effect() = {send_msg, To::locator(), Msg::term()} | {send_msg, To::locator(), Msg::term(), Options::send_msg_opts()} | {mod_call, module(), Function::atom(), [term()]} | {append, term()} | {append, term(), ra_server:command_reply_mode()} | {monitor, process, pid()} | {monitor, node, node()} | {demonitor, process, pid()} | {demonitor, node, node()} | {timer, term(), non_neg_integer() | infinity} | {log, [ra_index()], fun(([user_command()]) -> effects())} | {log, [ra_index()], fun(([user_command()]) -> effects()), {local, node()}} | {release_cursor, ra_index(), state()} | {release_cursor, ra_index()} | {checkpoint, ra_index(), state()} | {aux, term()} | garbage_collection

effects()

effects() = [effect()]

See: effect

locator()

locator() = pid() | atom() | {atom(), node()}

machine()

machine() = {machine, module(), AddInitArgs::#{term() => term()}}

Machine configuration. The module() should implement the ra_machine behaviour.

milliseconds()

milliseconds() = non_neg_integer()

reply()

reply() = term()

an arbitrary term that can be returned to the caller, _if_ the caller used ra:process_command/2 or ra:process_command/3

send_msg_opt()

send_msg_opt() = ra_event | cast | local

ra_event: the message will be wrapped up and sent as a ra event e.g: {ra_event, ra_server_id(), Msg}

cast: the message will be wrapped as a gen cast: {'$cast', Msg}

local: the message will be sent by the local member if there is one configured

send_msg_opts()

send_msg_opts() = send_msg_opt() | [send_msg_opt()]

state()

state() = term()

The state for a given machine implementation.

user_command()

user_command() = term()

the command type for a given machine implementation

version()

version() = non_neg_integer()

Function Index

apply/4
handle_aux/6
handle_aux/7
init/2 - Initialise a new machine. This is only called on startup if there isn't yet a snapshot to recover from.
init_aux/2
is_versioned/1
module/1
overview/2
query/3
snapshot_installed/5
snapshot_module/1
state_enter/3 - Called when the ra_server_proc enters a new state.
tick/3
version/1 - Used to discover the latest machine version supported by the current code.
which_aux_fun/1
which_module/2

Function Details

apply/4

apply(Mod::module(), Metadata::command_meta_data(), Cmd::command(), State) -> {State, reply(), effects()} | {State, reply()}

handle_aux/6

handle_aux(Mod::module(), RaftState::ra_server:ra_state(), Type::{call, From::from()} | cast, Command::term(), AuxState, State) -> {reply, Reply::term(), AuxState, State} | {reply, Reply::term(), AuxState, State, [{monitor, process, aux, pid()}]} | {no_reply, AuxState, State} | {no_reply, AuxState, State, [{monitor, process, aux, pid()}]}

handle_aux/7

handle_aux(Mod::module(), RaftState::ra_server:ra_state(), Type::{call, From::from()} | cast, Command::term(), AuxState, LogState, MacState::state()) -> undefined | {reply, Reply::term(), AuxState, LogState} | {reply, Reply::term(), AuxState, LogState, [{monitor, process, aux, pid()}]} | {no_reply, AuxState, LogState} | {no_reply, AuxState, LogState, [{monitor, process, aux, pid()}]}

init/2

init(Machine::machine(), Name::atom()) -> state()

Initialise a new machine. This is only called on startup if there isn't yet a snapshot to recover from. Once a snapshot has been taken this is never called again.

init_aux/2

init_aux(Mod::module(), Name::atom()) -> term()

is_versioned/1

is_versioned(X1::machine()) -> boolean()

module/1

module(X1::machine()) -> module()

overview/2

overview(Mod::module(), State::state()) -> map()

query/3

query(Mod::module(), Fun::fun((state()) -> Result), State::state()) -> Result

snapshot_installed/5

snapshot_installed(Module, Meta, State, OldMeta, OldState) -> effects()

snapshot_module/1

snapshot_module(X1::machine()) -> module()

state_enter/3

state_enter(Mod::module(), RaftState::ra_server:ra_state() | eol, State::state()) -> effects()

called when the ra_server_proc enters a new state

tick/3

tick(Mod::module(), TimeMs::milliseconds(), State::state()) -> effects()

version/1

version(X1::machine()) -> version()

used to discover the latest machine version supported by the current code

which_aux_fun/1

which_aux_fun(Mod::module()) -> undefined | {atom(), arity()}

which_module/2

which_module(X1::machine(), Version::version()) -> module()


Generated by EDoc