VsrServer behaviour (vsr v0.1.0)
GenServer wrapper that implements the VSR (Viewstamped Replication) protocol.
This module provides a framework for building VSR-enabled services by handling all VSR protocol messages while delegating application-specific logic to the implementing module.
Overview
VsrServer implements the core VSR consensus protocol including:
- Primary-backup replication with automatic failover
- View changes for fault tolerance
- Quorum-based consensus decisions
- Linearizable operation ordering
- Comprehensive telemetry instrumentation
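The quorum-based decisions listed above rest on simple arithmetic: in Viewstamped Replication, a cluster of 2f + 1 replicas tolerates f failures, and a quorum is f + 1 replicas. The sketch below only illustrates that rule. `QuorumSketch`, `quorum_size/1`, and `max_failures/1` are hypothetical names, not part of the VsrServer API, and the library's internal computation may differ in detail.

```elixir
defmodule QuorumSketch do
  @moduledoc "Illustrative quorum arithmetic; not part of the vsr library."

  # Smallest group size such that any two groups of this size share a replica.
  def quorum_size(cluster_size) when is_integer(cluster_size) and cluster_size > 0 do
    div(cluster_size, 2) + 1
  end

  # Number of replicas that may fail while a quorum can still be formed.
  def max_failures(cluster_size) when is_integer(cluster_size) and cluster_size > 0 do
    cluster_size - quorum_size(cluster_size)
  end
end
```

With `cluster_size: 3`, two replicas form a quorum and one replica may fail; an even cluster size such as 4 tolerates no more failures than 3 does.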
Basic Usage
To create a VSR-enabled service, you need to implement the VsrServer behaviour and provide three key components:
- State Machine: Handles application operations
- Communication Layer: Routes VSR protocol messages between nodes
- Optional Configuration: Cluster size, timeouts, etc.
Minimal Example
    defmodule MyKvStore do
      use VsrServer

      def start_link(opts) do
        node_id = Keyword.fetch!(opts, :node_id)
        cluster_size = Keyword.fetch!(opts, :cluster_size)

        VsrServer.start_link(__MODULE__,
          node_id: node_id,
          cluster_size: cluster_size
        )
      end

      @impl VsrServer
      def init(vsr_state) do
        # Initialize your application state
        my_state = %{vsr: vsr_state, data: %{}}
        {:ok, my_state}
      end

      @impl VsrServer
      def handle_commit(operation, state) do
        # Apply committed operation to your state machine
        case operation do
          {:write, key, value} ->
            new_data = Map.put(state.data, key, value)
            new_state = %{state | data: new_data}
            {new_state, :ok}

          {:read, key} ->
            result = Map.get(state.data, key, {:error, :not_found})
            {state, result}
        end
      end

      @impl VsrServer
      def send_reply(from, reply, _state) do
        # Send reply back to client
        GenServer.reply(from, reply)
      end
    end
State Machine Implementation
The state machine handles application-specific operations. You must implement:
Required Callback: handle_commit/2
    @callback handle_commit(operation :: term, state :: term) ::
      {new_state :: term, result :: term}
This callback is invoked when an operation has been committed by the VSR cluster. It should:
- Apply the operation to your application state
- Return the updated state and operation result
- Be deterministic (same operation → same result)
Example:
    def handle_commit({:increment, key}, state) do
      current = Map.get(state.counters, key, 0)
      new_counters = Map.put(state.counters, key, current + 1)
      new_state = %{state | counters: new_counters}
      {new_state, {:ok, current + 1}}
    end
Communication Layer Implementation
The communication layer is implemented via the send_vsr/3 callback on your
module. This callback is invoked whenever VSR needs to send a protocol message
to another node.
Default Implementation (Erlang Distribution)
The use VsrServer macro provides a default implementation that works with
Erlang distribution using send/2:
    defmodule MyKvStore do
      use VsrServer # Provides default send_vsr implementation
      # ... other callbacks
    end
The default implementation is:
    def send_vsr(destination, message, _inner_state) do
      send(destination, {:"$vsr", message})
    end
This works when node_id values are:
- PIDs (e.g., self())
- Registered names (e.g., :my_server or {:global, :my_server})
- Node names (e.g., :"node1@localhost" with registered name)
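To make the envelope format concrete, the following sketch sends `{:"$vsr", message}` to a PID and to a locally registered name with plain `send/2`, then reads both messages back. `SendEnvelopeSketch` and the name `:vsr_sketch_node` are hypothetical, and the payloads are placeholder atoms rather than real VSR messages.

```elixir
defmodule SendEnvelopeSketch do
  # Runs in a throwaway Task so the registered name disappears when it exits.
  def run do
    Task.async(fn ->
      Process.register(self(), :vsr_sketch_node)

      # Destination given as a PID.
      send(self(), {:"$vsr", :to_pid})
      # Destination given as a locally registered name.
      send(:vsr_sketch_node, {:"$vsr", :to_name})

      # Collect the two envelopes in arrival order.
      for _ <- 1..2 do
        receive do
          {:"$vsr", message} -> message
        after
          1_000 -> :timeout
        end
      end
    end)
    |> Task.await()
  end
end
```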
Custom Communication Protocol
To use a different transport (HTTP, gRPC, JSON-RPC, etc.), override the
send_vsr/3 callback:
    defmodule MyHttpKvStore do
      use VsrServer

      # Override the default send_vsr to use HTTP
      @impl VsrServer
      def send_vsr(dest_node_id, vsr_message, _inner_state) do
        # Serialize VSR message to JSON
        json_body = Jason.encode!(%{
          "type" => message_type(vsr_message),
          "view" => vsr_message.view,
          "data" => serialize_message(vsr_message)
        })

        # Send via HTTP POST to the destination node
        url = "http://#{dest_node_id}:8080/vsr"
        HTTPoison.post(url, json_body, [{"Content-Type", "application/json"}])
      end

      defp message_type(%Vsr.Message.Prepare{}), do: "prepare"
      defp message_type(%Vsr.Message.PrepareOk{}), do: "prepare_ok"
      defp message_type(%Vsr.Message.Commit{}), do: "commit"
      # ... other message types

      defp serialize_message(%Vsr.Message.Prepare{} = msg) do
        %{
          "op_number" => msg.op_number,
          "operation" => msg.operation,
          "commit_number" => msg.commit_number,
          "leader_id" => msg.leader_id
        }
      end
      # ... other serializers
      # ... other callbacks
    end
When receiving messages via your custom transport, deserialize and deliver to VsrServer:
    defmodule MyHttpHandler do
      import Plug.Conn

      def handle_vsr_request(conn) do
        # Assumes Plug.Parsers (with the :json parser) has already decoded
        # the request body into a map
        json = conn.body_params
        vsr_message = deserialize_vsr_message(json)

        # Deliver to local VsrServer
        VsrServer.vsr_send(conn.assigns.vsr_server, vsr_message)

        # Send HTTP response
        send_resp(conn, 200, "ok")
      end

      defp deserialize_vsr_message(%{"type" => "prepare", "data" => data} = json) do
        %Vsr.Message.Prepare{
          view: json["view"],
          op_number: data["op_number"],
          operation: data["operation"],
          commit_number: data["commit_number"],
          leader_id: data["leader_id"]
        }
      end
      # ... deserialize other message types
    end
The key insight is that send_vsr/3 only handles outgoing messages.
Incoming messages are delivered via VsrServer.vsr_send/2 after your
custom transport receives and deserializes them.
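For transports that carry raw bytes rather than JSON (a TCP socket, for example), one option is Erlang's external term format, which preserves tuples and atoms that JSON cannot represent. The sketch below is an illustration under assumptions: `WireSketch` and its stand-in `Prepare` struct are hypothetical, and the real `Vsr.Message` structs may have different fields.

```elixir
defmodule WireSketch do
  defmodule Prepare do
    # Stand-in for a VSR message struct; fields mirror the JSON example above.
    defstruct [:view, :op_number, :operation, :commit_number, :leader_id]
  end

  # Outgoing side: bytes that a custom send_vsr/3 could write to its transport.
  def encode(message), do: :erlang.term_to_binary(message)

  # Incoming side: decode before handing the message to VsrServer.vsr_send/2.
  # The :safe option refuses to create new atoms from untrusted input.
  def decode(binary) when is_binary(binary) do
    {:ok, :erlang.binary_to_term(binary, [:safe])}
  rescue
    ArgumentError -> {:error, :invalid_payload}
  end
end
```

Decoding returns a tagged tuple so that a malformed payload can be rejected at the transport boundary instead of reaching the VSR server.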
Client Request Handling
For client requests, use the send_reply/3 callback to reply back to clients:
    @impl VsrServer
    def send_reply(from, reply, state) do
      case from do
        # Standard GenServer from tuple (Erlang distribution)
        {pid, ref} when is_pid(pid) and is_reference(ref) ->
          GenServer.reply(from, reply)

        # Custom from (e.g., HTTP request context)
        %{conn: conn, request_id: req_id} ->
          send_http_response(conn, req_id, reply)

        # JSON-RPC from
        %{"node" => node_id, "from" => client_ref} ->
          send_json_rpc_reply(node_id, client_ref, reply)
      end
    end
Client Request Deduplication
VsrServer provides automatic deduplication for client requests using
client_id and request_id:
    # In your client API
    def write(vsr_pid, key, value) do
      client_id = :my_client_1
      request_id = System.unique_integer([:positive])

      VsrServer.client_request(vsr_pid,
        {:write, key, value},
        client_id: client_id,
        request_id: request_id
      )
    end
Multiple identical requests (same client_id and request_id) will:
- Be processed once
- All waiting callers receive the same result
- Prevent duplicate operations
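The bookkeeping behind this kind of deduplication can be pictured as a client table that remembers the latest request and its result for each client. The following is a minimal sketch of that idea, not VsrServer's actual implementation; `DedupSketch` and its functions are hypothetical names.

```elixir
defmodule DedupSketch do
  # Client table: client_id => {latest request_id, cached result}.
  def new, do: %{}

  # Runs `fun` only for a request that has not been seen before. A repeated
  # {client_id, request_id} pair returns the cached result without running it.
  def execute(table, client_id, request_id, fun) when is_function(fun, 0) do
    case table do
      %{^client_id => {^request_id, cached}} ->
        {cached, table}

      _ ->
        result = fun.()
        {result, Map.put(table, client_id, {request_id, result})}
    end
  end
end
```

Keeping only the latest request per client keeps the table small, at the cost of assuming that each client issues one request at a time.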
Configuration Options
When starting VsrServer, you can configure:
    VsrServer.start_link(MyModule,
      # Required
      node_id: :node1,                  # Unique identifier for this node
      cluster_size: 3,                  # Total number of replicas
      # Optional
      replicas: [:node2, :node3],       # Other replica node IDs (auto-computed if not provided)
      heartbeat_interval: 100,          # Primary heartbeat interval (ms)
      heartbeat_timeout: 500,           # Backup timeout for primary failure (ms)
      name: {:global, :my_vsr_server}   # GenServer registration
    )
Complete Example: Maelstrom Integration
See maelstrom-adapter/ directory for a complete example of implementing
VSR with a custom JSON-based protocol for Jepsen Maelstrom testing.
Key components:
- Maelstrom.Kv - State machine implementation with custom send_vsr/3
- Maelstrom.Message - JSON message serialization
- Maelstrom.Stdio - STDIN/STDOUT message handling
This demonstrates a complete custom transport implementation without using Erlang distribution.
Telemetry Events
VsrServer emits comprehensive telemetry events for monitoring and debugging.
See Vsr.Telemetry module documentation for complete event reference.
Example telemetry handler:
    :telemetry.attach_many(
      "vsr-logger",
      [
        [:vsr, :state, :commit_advance],
        [:vsr, :view_change, :complete],
        [:vsr, :leadership, :start]
      ],
      &handle_vsr_event/4,
      nil
    )

    def handle_vsr_event(event_name, _measurements, _metadata, _config) do
      Logger.info("VSR Event: #{inspect(event_name)}")
    end
Error Handling
VsrServer follows the "let it crash" philosophy:
- Invalid messages crash the GenServer (supervised recovery)
- Network failures are handled by retries/timeouts
- State machine errors crash and require supervisor restart
For graceful error handling in your state machine:
    def handle_commit(operation, state) do
      case validate_operation(operation) do
        :ok ->
          # Process operation
          {new_state, result}

        {:error, reason} ->
          # Return error result, state unchanged
          {state, {:error, reason}}
      end
    end
Testing
For testing, you can use synchronous operations and inspect state:
    test "writes are replicated" do
      {:ok, replica1} = start_replica(:r1, 3)
      {:ok, replica2} = start_replica(:r2, 3)
      {:ok, replica3} = start_replica(:r3, 3)

      # Perform operation
      result = VsrServer.client_request(replica1, {:write, "key", "value"})
      assert result == :ok

      # Check state is replicated
      state1 = VsrServer.dump(replica1)
      state2 = VsrServer.dump(replica2)
      assert state1.inner.data["key"] == "value"
      assert state2.inner.data["key"] == "value"
    end
Types
@type client_request() :: client_request_noreply() | {:client_request, from :: term(), term()}
A client request that expects a reply. The from parameter is typically a GenServer.from() term,
if the cluster communicates using Erlang distribution. If it does not, you may encode the from
parameter in a way that is appropriate for the communication protocol used by the cluster.
@type client_request_noreply() :: {:client_request, term()}
A client request that does not expect a reply. This should be used sparingly; it is typically
better to produce a synchronous reply, even if the response is just an :ok acknowledgement.
@type node_id() :: term()
@opaque state()
Callbacks
@callback code_change(old_vsn, state :: term(), extra :: term()) :: {:ok, new_state :: term()} | {:error, reason :: term()} when old_vsn: term() | {:down, term()}
Invoked to change the state of the VsrServer when a different version of a
module is loaded (hot code swapping) and the state's term structure should be
changed.
See GenServer.code_change/3 for general information about this callback.
This callback is optional.
@callback handle_call(request :: term(), GenServer.from(), state :: term()) :: {:reply, reply, new_state} | {:reply, reply, new_state, timeout() | :hibernate | {:continue, continue_arg :: term()} | client_request_noreply()} | {:noreply, new_state} | {:noreply, new_state, timeout() | :hibernate | {:continue, continue_arg :: term()} | client_request()} | {:stop, reason, reply, new_state} | {:stop, reason, new_state} when reply: term(), new_state: term(), reason: term()
Invoked to handle synchronous call/3 messages. Generally, initiating operations
on the state machine that VSR wraps should be done through this callback.
See GenServer.handle_call/3 for general information about this callback.
To initiate a VSR request, use the :client_request tuple.
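Based on the return types in the spec above, a handle_call/3 clause that initiates a VSR request might look like the sketch below. `KvCallSketch` is a hypothetical standalone module so that the snippet compiles on its own; in a real service the clause would live in a module that has `use VsrServer`, marked `@impl VsrServer`. The assumption that the reply is delivered later via send_reply/3 is inferred from the callback descriptions, not confirmed by this page.

```elixir
defmodule KvCallSketch do
  # Hands the write to VSR instead of replying immediately.
  def handle_call({:write, key, value}, from, state) do
    # The third element is a client_request() tuple: it carries `from` so the
    # caller can be answered once the operation has been committed and applied.
    {:noreply, state, {:client_request, from, {:write, key, value}}}
  end
end
```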
@callback handle_cast(request :: term(), state :: term()) :: {:noreply, new_state} | {:noreply, new_state, timeout() | :hibernate | {:continue, continue_arg :: term()} | client_request()} | {:stop, reason :: term(), new_state} when new_state: term()
Invoked to handle asynchronous cast/2 messages.
See GenServer.handle_cast/2 for general information about this callback.
To initiate a VSR request, pass the :client_request tuple. You may use a from argument
in the tuple that has been created by a previous call and stored in the server state.
This callback is optional. If one is not implemented, the server will fail if a cast is performed against it.
@callback handle_commit(operation :: term(), inner_state :: term()) :: {new_inner_state :: term(), result :: term()}
Invoked when a VSR operation is committed and needs to be applied to the state machine.
This callback receives the committed operation and the current inner state. It should apply the operation and return the new inner state and any result value to be sent back to the client.
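Because every replica applies the same committed operations, handle_commit/2 should depend only on its arguments. One common way to keep it deterministic is to place values such as timestamps inside the operation before it is submitted. The sketch below illustrates this; `DeterministicCommitSketch`, the `{:touch, key, timestamp}` operation, and the `:touched` field are all hypothetical.

```elixir
defmodule DeterministicCommitSketch do
  # The timestamp travels inside the operation, so every replica that applies
  # this log entry stores exactly the same value.
  def handle_commit({:touch, key, timestamp}, state) do
    new_state = %{state | touched: Map.put(state.touched, key, timestamp)}
    {new_state, :ok}
  end

  # Anti-pattern, shown for contrast only (kept as a comment):
  #
  #   def handle_commit({:touch, key}, state) do
  #     now = System.system_time(:millisecond)  # differs between replicas
  #     ...
  #   end
end
```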
@callback handle_continue(continue_arg, state :: term()) :: {:noreply, new_state} | {:noreply, new_state, timeout() | :hibernate | {:continue, continue_arg} | client_request()} | {:stop, reason :: term(), new_state} when new_state: term(), continue_arg: term()
Invoked to handle continue instructions.
See GenServer.handle_continue/2 for general information about this callback.
Return values are the same as handle_cast/2.
This callback is optional. If one is not implemented, the server will fail if a continue instruction is used.
@callback handle_info(msg :: :timeout | term(), state :: term()) :: {:noreply, new_state} | {:noreply, new_state, timeout() | :hibernate | {:continue, continue_arg :: term()} | client_request()} | {:stop, reason :: term(), new_state} when new_state: term()
Invoked to handle all other messages.
See GenServer.handle_info/2 for general information about this callback.
Return values are the same as handle_cast/2.
This callback is optional. If one is not implemented, the received message will be logged.
@callback init(init_arg :: term()) :: {:ok, state} | {:ok, log, state} | {:ok, log, state, timeout() | :hibernate | {:continue, continue_arg :: term()}} | :ignore | {:stop, reason :: term()} when state: term(), log: term()
Invoked when the server is started. start_link/3 or start/3 will
block until it returns.
See GenServer.init/1 for general information about the init/1 callback.
The VsrServer init callback adds an additional log term in the success tuple,
which initializes the VSR log. This should be a durable, local store for logging
VSR operations.
If the log must be initialized at a later stage (for example, via an out-of-band initialization), you may return the normal {:ok, state} term.
@callback log_append(log :: term(), entry :: Vsr.LogEntry.t()) :: new_log :: term()
Appends an entry to the VSR log.
Required callback - must be implemented by all VSR modules.
Clears all entries from the VSR log.
Required callback - must be implemented by all VSR modules.
@callback log_fetch(log :: term(), op_number :: non_neg_integer()) :: {:ok, Vsr.LogEntry.t()} | {:error, :not_found}
Fetches an entry from the VSR log by operation number.
Required callback - must be implemented by all VSR modules.
@callback log_get_all(log :: term()) :: [Vsr.LogEntry.t()]
Gets all entries from the VSR log.
Required callback - must be implemented by all VSR modules.
@callback log_get_from(log :: term(), op_number :: non_neg_integer()) :: [ Vsr.LogEntry.t() ]
Gets entries from the specified operation number onwards.
Required callback - must be implemented by all VSR modules.
@callback log_length(log :: term()) :: non_neg_integer()
Gets the current length (number of entries) in the VSR log.
Required callback - must be implemented by all VSR modules.
@callback log_replace(log :: term(), entries :: [Vsr.LogEntry.t()]) :: new_log :: term()
Replaces the entire VSR log with new entries.
Used during state transfer and view changes. Required callback - must be implemented by all VSR modules.
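As a rough picture of what the log callbacks involve, here is a minimal in-memory log backed by a plain list. It is a sketch for experimentation only: it is not durable, `ListLogSketch` is a hypothetical name, entries are assumed to expose an `op_number` field, and `log_clear/1` uses an assumed arity-1 shape because its signature is not shown on this page.

```elixir
defmodule ListLogSketch do
  # Entries are kept oldest-first in a plain list.
  def new, do: []

  def log_append(log, entry), do: log ++ [entry]

  # Looks an entry up by its op_number field.
  def log_fetch(log, op_number) do
    case Enum.find(log, &(&1.op_number == op_number)) do
      nil -> {:error, :not_found}
      entry -> {:ok, entry}
    end
  end

  def log_get_all(log), do: log

  # "Onwards" is treated as inclusive of op_number here.
  def log_get_from(log, op_number) do
    Enum.filter(log, &(&1.op_number >= op_number))
  end

  def log_length(log), do: length(log)

  def log_replace(_log, entries), do: entries

  # Assumed shape: takes the log and returns an empty one.
  def log_clear(_log), do: []
end
```

A production implementation would back these functions with durable local storage, as the init/1 documentation recommends.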
Monitors another node in the cluster for failure detection.
The default implementation uses Process.monitor/1 for PID-based node identifiers.
Override this for custom monitoring mechanisms.
Sends a reply to a client.
The default implementation uses GenServer.reply/2 for standard Erlang distribution.
Override this for custom communication protocols where the from parameter
needs to be handled differently (e.g., encoded node references, message passing).
Sends a VSR message to another node in the cluster.
The default implementation uses PIDs as node identifiers and sends via raw send/2.
Override this for custom communication protocols (e.g., Maelstrom, network protocols).
@callback terminate(reason, state :: term()) :: term() when reason: :normal | :shutdown | {:shutdown, term()} | term()
Invoked when the server is about to exit. It should do any cleanup required.
See GenServer.terminate/2 for general information about this callback.
This callback is optional.
Functions
Returns a specification to start this module under a supervisor.
See Supervisor.
Dumps the internal state of a VSR server for testing and debugging.
Gets the node_id of a VSR server.
@spec set_cluster( server(), node_id, replicas :: Enumerable.t(node_id), cluster_size :: non_neg_integer() | nil ) :: :ok when node_id: term()
In the case that the cluster cannot be known at boot time, this function may be used to set cluster details.
In the case that the log cannot be known at boot time (for example, some parameter in the log setup depends on the cluster configuration), this function may be used to set the log.
In certain situations, you may need to send VSR messages out-of-band from the normal Erlang distribution mechanism that VsrServer relies on by default. In this case, you may use this function to send VSR messages to VSR servers.