VsrServer behaviour (vsr v0.1.0)

GenServer wrapper that implements the VSR (Viewstamped Replication) protocol.

This module provides a framework for building VSR-enabled services by handling all VSR protocol messages while delegating application-specific logic to the implementing module.

Overview

VsrServer implements the core VSR consensus protocol including:

  • Primary-backup replication with automatic failover
  • View changes for fault tolerance
  • Quorum-based consensus decisions
  • Linearizable operation ordering
  • Comprehensive telemetry instrumentation
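
The quorum and failover rules listed above can be illustrated with a little arithmetic. The following is a minimal sketch based on the published Viewstamped Replication protocol, not on VsrServer's internals: the module name `QuorumSketch` and its functions are hypothetical, and whether the library selects the primary with exactly this round-robin rule is an assumption.

```elixir
defmodule QuorumSketch do
  @moduledoc "Illustrative quorum arithmetic for a VSR-style cluster (hypothetical helper)."

  # Smallest number of replicas that forms a majority of the cluster.
  def quorum(cluster_size) when cluster_size > 0, do: div(cluster_size, 2) + 1

  # Number of replica failures the cluster can absorb while a quorum remains.
  def tolerated_failures(cluster_size) when cluster_size > 0, do: div(cluster_size - 1, 2)

  # Round-robin primary selection: the view number indexes into the sorted replica list.
  def primary_for_view(replicas, view) when replicas != [] and view >= 0 do
    sorted = Enum.sort(replicas)
    Enum.at(sorted, rem(view, length(sorted)))
  end
end

IO.inspect(QuorumSketch.quorum(3))                               # => 2
IO.inspect(QuorumSketch.tolerated_failures(5))                   # => 2
IO.inspect(QuorumSketch.primary_for_view([:n3, :n1, :n2], 4))    # => :n2
```

With three replicas, any two form a quorum, so one failure is tolerated; each view change moves the primary role to the next replica in sorted order.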

Basic Usage

To create a VSR-enabled service, you need to implement the VsrServer behaviour and provide three key components:

  1. State Machine: Handles application operations
  2. Communication Layer: Routes VSR protocol messages between nodes
  3. Optional Configuration: Cluster size, timeouts, etc.

Minimal Example

defmodule MyKvStore do
  use VsrServer

  def start_link(opts) do
    node_id = Keyword.fetch!(opts, :node_id)
    cluster_size = Keyword.fetch!(opts, :cluster_size)

    VsrServer.start_link(__MODULE__,
      node_id: node_id,
      cluster_size: cluster_size
    )
  end

  @impl VsrServer
  def init(vsr_state) do
    # Initialize your application state
    my_state = %{vsr: vsr_state, data: %{}}
    {:ok, my_state}
  end

  @impl VsrServer
  def handle_commit(operation, state) do
    # Apply committed operation to your state machine
    case operation do
      {:write, key, value} ->
        new_data = Map.put(state.data, key, value)
        new_state = %{state | data: new_data}
        {new_state, :ok}

      {:read, key} ->
        result = Map.get(state.data, key, {:error, :not_found})
        {state, result}
    end
  end

  @impl VsrServer
  def send_reply(from, reply, _state) do
    # Send reply back to client
    GenServer.reply(from, reply)
  end
end

State Machine Implementation

The state machine handles application-specific operations. You must implement:

Required Callback: handle_commit/2

@callback handle_commit(operation :: term, state :: term) ::
  {new_state :: term, result :: term}

This callback is invoked when an operation has been committed by the VSR cluster. It should:

  • Apply the operation to your application state
  • Return the updated state and operation result
  • Be deterministic (same operation → same result)

Example:

def handle_commit({:increment, key}, state) do
  current = Map.get(state.counters, key, 0)
  new_counters = Map.put(state.counters, key, current + 1)
  new_state = %{state | counters: new_counters}
  {new_state, {:ok, current + 1}}
end
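
To see why determinism matters, the sketch below replays the same committed operations against two independent copies of the state and checks that they end up identical. `CounterMachine` and `replay/2` are hypothetical names used only for illustration; the module does not `use VsrServer`, so it runs without the vsr dependency.

```elixir
defmodule CounterMachine do
  # Same return shape as the handle_commit/2 callback: {new_state, result}.
  def handle_commit({:increment, key}, state) do
    current = Map.get(state.counters, key, 0)
    new_state = %{state | counters: Map.put(state.counters, key, current + 1)}
    {new_state, {:ok, current + 1}}
  end

  # Applies a list of committed operations in order and collects their results.
  def replay(operations, state) do
    {final_state, results} =
      Enum.reduce(operations, {state, []}, fn op, {st, acc} ->
        {next, result} = handle_commit(op, st)
        {next, [result | acc]}
      end)

    {final_state, Enum.reverse(results)}
  end
end

ops = [{:increment, :a}, {:increment, :b}, {:increment, :a}]
initial = %{counters: %{}}

# Two "replicas" applying the same log must reach the same state and results.
replica1 = CounterMachine.replay(ops, initial)
replica2 = CounterMachine.replay(ops, initial)
IO.inspect(replica1 == replica2)  # => true
```

If handle_commit/2 depended on the wall clock, random numbers, or the local node name, the two replays could diverge, and replicas would no longer agree on the state.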

Communication Layer Implementation

The communication layer is implemented via the send_vsr/3 callback on your module. This callback is invoked whenever VSR needs to send a protocol message to another node.

Default Implementation (Erlang Distribution)

The use VsrServer macro provides a default implementation that works with Erlang distribution using send/2:

defmodule MyKvStore do
  use VsrServer  # Provides default send_vsr implementation

  # ... other callbacks
end

The default implementation is:

def send_vsr(destination, message, _inner_state) do
  send(destination, {:"$vsr", message})
end

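This works when node_id values are destinations that send/2 accepts:

  • PIDs (e.g., self())
  • Locally registered names (e.g., :my_server)
  • {name, node} tuples for a process registered on another node (e.g., {:my_server, :"node1@localhost"})

Note that send/2 does not resolve {:global, name} tuples. A globally registered process must first be looked up with :global.whereis_name/1, or handled by a custom send_vsr/3.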
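
As a quick check of the destination forms that send/2 itself accepts (a PID, a locally registered atom, or a {name, node} tuple), the sketch below delivers the same wrapped message three ways to the current process. The registered name `:vsr_demo_server` and the `{:prepare, 1}` payload are placeholders, not real VSR messages.

```elixir
# Register the current process under a local name (hypothetical name for this sketch).
Process.register(self(), :vsr_demo_server)

message = {:prepare, 1}

# 1. PID destination
send(self(), {:"$vsr", message})
# 2. Locally registered name
send(:vsr_demo_server, {:"$vsr", message})
# 3. {name, node} tuple; node() is the local node here
send({:vsr_demo_server, node()}, {:"$vsr", message})

# Collect the three deliveries, giving up after one second each.
received =
  for _ <- 1..3 do
    receive do
      {:"$vsr", msg} -> msg
    after
      1_000 -> :timeout
    end
  end

IO.inspect(received)
```

All three sends land in the same mailbox, wrapped in the `{:"$vsr", message}` tuple used by the default send_vsr/3 shown above.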

Custom Communication Protocol

To use a different transport (HTTP, gRPC, JSON-RPC, etc.), override the send_vsr/3 callback:

defmodule MyHttpKvStore do
  use VsrServer

  # Override the default send_vsr to use HTTP
  @impl VsrServer
  def send_vsr(dest_node_id, vsr_message, _inner_state) do
    # Serialize VSR message to JSON
    json_body = Jason.encode!(%{
      "type" => message_type(vsr_message),
      "view" => vsr_message.view,
      "data" => serialize_message(vsr_message)
    })

    # Send via HTTP POST to the destination node
    url = "http://#{dest_node_id}:8080/vsr"
    HTTPoison.post(url, json_body, [{"Content-Type", "application/json"}])
  end

  defp message_type(%Vsr.Message.Prepare{}), do: "prepare"
  defp message_type(%Vsr.Message.PrepareOk{}), do: "prepare_ok"
  defp message_type(%Vsr.Message.Commit{}), do: "commit"
  # ... other message types

  defp serialize_message(%Vsr.Message.Prepare{} = msg) do
    %{
      "op_number" => msg.op_number,
      "operation" => msg.operation,
      "commit_number" => msg.commit_number,
      "leader_id" => msg.leader_id
    }
  end
  # ... other serializers

  # ... other callbacks
end

When receiving messages via your custom transport, deserialize and deliver to VsrServer:

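defmodule MyHttpHandler do
  import Plug.Conn, only: [send_resp: 3]

  def handle_vsr_request(conn) do
    # conn.body_params is already a decoded map when Plug.Parsers
    # is configured with the :json parser, so no Jason.decode is needed
    json = conn.body_params
    vsr_message = deserialize_vsr_message(json)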

    # Deliver to local VsrServer
    VsrServer.vsr_send(conn.assigns.vsr_server, vsr_message)

    # Send HTTP response
    send_resp(conn, 200, "ok")
  end

  defp deserialize_vsr_message(%{"type" => "prepare", "data" => data} = json) do
    %Vsr.Message.Prepare{
      view: json["view"],
      op_number: data["op_number"],
      operation: data["operation"],
      commit_number: data["commit_number"],
      leader_id: data["leader_id"]
    }
  end

  # ... deserialize other message types
end

The key insight is that send_vsr/3 only handles outgoing messages. Incoming messages are delivered via VsrServer.vsr_send/2 after your custom transport receives and deserializes them.
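
One practical wrinkle with text-based transports: operations such as {:write, key, value} are tuples, and JSON has no tuple type. A possible workaround, sketched below, is to carry the operation as a Base64-encoded Erlang term inside the JSON payload. `TermCodec` is a hypothetical helper and not part of vsr; the approach assumes every node in the cluster runs compatible Erlang/Elixir code.

```elixir
defmodule TermCodec do
  # Encodes any Erlang/Elixir term as a Base64 string that can be embedded in JSON.
  def encode(term), do: term |> :erlang.term_to_binary() |> Base.encode64()

  # Decodes a string produced by encode/1.
  # Returns {:ok, term}, or :error when the input is not valid Base64.
  # The :safe option makes binary_to_term/2 raise on payloads that would
  # create new atoms or anonymous functions.
  def decode(string) when is_binary(string) do
    with {:ok, binary} <- Base.decode64(string) do
      {:ok, :erlang.binary_to_term(binary, [:safe])}
    end
  end
end

encoded = TermCodec.encode({:write, "key", "value"})
IO.inspect(TermCodec.decode(encoded))  # => {:ok, {:write, "key", "value"}}
```

In the HTTP example above, this would mean sending `TermCodec.encode(msg.operation)` in the "operation" field and decoding it again in the receiving handler before rebuilding the message struct.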

Client Request Handling

For client requests, use the send_reply/3 callback to deliver replies to clients:

@impl VsrServer
def send_reply(from, reply, _state) do
  case from do
    # Standard GenServer from tuple (Erlang distribution)
    {pid, ref} when is_pid(pid) and is_reference(ref) ->
      GenServer.reply(from, reply)

    # Custom from (e.g., HTTP request context)
    %{conn: conn, request_id: req_id} ->
      send_http_response(conn, req_id, reply)

    # JSON-RPC from
    %{"node" => node_id, "from" => client_ref} ->
      send_json_rpc_reply(node_id, client_ref, reply)
  end
end

Client Request Deduplication

VsrServer provides automatic deduplication for client requests using client_id and request_id:

# In your client API
def write(vsr_pid, key, value) do
  client_id = :my_client_1
  request_id = System.unique_integer([:positive])

  VsrServer.client_request(vsr_pid,
    {:write, key, value},
    client_id: client_id,
    request_id: request_id
  )
end

When multiple requests carry the same client_id and request_id:

  • The operation is processed only once
  • All waiting callers receive the same result
  • Duplicate operations are prevented
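
The idea behind this kind of deduplication can be sketched with a result table keyed by {client_id, request_id}. This is an illustration of the general technique only; how VsrServer stores and expires these entries internally is not described here, and `DedupSketch` is a hypothetical module.

```elixir
defmodule DedupSketch do
  # Empty table of results keyed by {client_id, request_id}.
  def new, do: %{}

  # Runs `fun` only if this {client_id, request_id} pair has not been seen;
  # otherwise returns the cached result. Returns {result, updated_table}.
  def run(table, client_id, request_id, fun) when is_function(fun, 0) do
    key = {client_id, request_id}

    case Map.fetch(table, key) do
      {:ok, cached} ->
        {cached, table}

      :error ->
        result = fun.()
        {result, Map.put(table, key, result)}
    end
  end
end

# Count how many times the operation body actually executes.
executions = :counters.new(1, [])

operation = fn ->
  :counters.add(executions, 1, 1)
  :ok
end

table = DedupSketch.new()
{first, table} = DedupSketch.run(table, :my_client_1, 1, operation)
{retry, table} = DedupSketch.run(table, :my_client_1, 1, operation)
{other, _table} = DedupSketch.run(table, :my_client_1, 2, operation)

IO.inspect({first, retry, other, :counters.get(executions, 1)})  # => {:ok, :ok, :ok, 2}
```

The retry with request_id 1 returns the cached result without running the operation again, while the request with request_id 2 is treated as new work.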

Configuration Options

When starting VsrServer, you can configure:

VsrServer.start_link(MyModule,
  # Required
  node_id: :node1,                    # Unique identifier for this node
  cluster_size: 3,                     # Total number of replicas

  # Optional
  replicas: [:node2, :node3],          # Other replica node IDs (auto-computed if not provided)
  heartbeat_interval: 100,             # Primary heartbeat interval (ms)
  heartbeat_timeout: 500,              # Backup timeout for primary failure (ms)
  name: {:global, :my_vsr_server}      # GenServer registration
)

Complete Example: Maelstrom Integration

See the maelstrom-adapter/ directory for a complete example of implementing VSR with a custom JSON-based protocol for Jepsen Maelstrom testing.

Key components:

  • Maelstrom.Kv - State machine implementation with custom send_vsr/3
  • Maelstrom.Message - JSON message serialization
  • Maelstrom.Stdio - STDIN/STDOUT message handling

This demonstrates a complete custom transport implementation without using Erlang distribution.

Telemetry Events

VsrServer emits telemetry events for monitoring and debugging. See the Vsr.Telemetry module documentation for the complete event reference.

Example telemetry handler:

:telemetry.attach_many(
  "vsr-logger",
  [
    [:vsr, :state, :commit_advance],
    [:vsr, :view_change, :complete],
    [:vsr, :leadership, :start]
  ],
  &handle_vsr_event/4,
  nil
)

def handle_vsr_event(event_name, _measurements, _metadata, _config) do
  Logger.info("VSR Event: #{inspect(event_name)}")
end

Error Handling

VsrServer follows the "let it crash" philosophy:

  • Invalid messages crash the GenServer (supervised recovery)
  • Network failures are handled by retries/timeouts
  • State machine errors crash and require supervisor restart

For graceful error handling in your state machine:

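def handle_commit(operation, state) do
  # validate_operation/1 and apply_operation/2 are helpers you define yourself
  case validate_operation(operation) do
    :ok ->
      # Process the operation; apply_operation/2 returns {new_state, result}
      apply_operation(operation, state)

    {:error, reason} ->
      # Return error result, state unchanged
      {state, {:error, reason}}
  end
end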

Testing

For testing, you can use synchronous operations and inspect state:

test "writes are replicated" do
  {:ok, replica1} = start_replica(:r1, 3)
  {:ok, replica2} = start_replica(:r2, 3)
  {:ok, replica3} = start_replica(:r3, 3)

  # Perform operation
  result = VsrServer.client_request(replica1, {:write, "key", "value"})
  assert result == :ok

  # Check state is replicated
  state1 = VsrServer.dump(replica1)
  state2 = VsrServer.dump(replica2)

  assert state1.inner.data["key"] == "value"
  assert state2.inner.data["key"] == "value"
end

Types

client_request()

@type client_request() ::
  client_request_noreply() | {:client_request, from :: term(), term()}

A client request that expects a reply. When the cluster communicates using Erlang distribution, the from parameter is typically a GenServer.from() term. Otherwise, you may encode the from parameter in whatever way suits the communication protocol used by the cluster.

client_request_noreply()

@type client_request_noreply() :: {:client_request, term()}

A client request that does not expect a reply. This should be used sparingly; it is typically better to produce a synchronous reply, even if the response is just an :ok acknowledgement.

node_id()

@type node_id() :: term()

server()

@type server() :: pid() | atom()

state()

@opaque state()

Callbacks

code_change(old_vsn, state, extra)

@callback code_change(old_vsn, state :: term(), extra :: term()) ::
  {:ok, new_state :: term()} | {:error, reason :: term()}
when old_vsn: term() | {:down, term()}

Invoked to change the state of the VsrServer when a different version of a module is loaded (hot code swapping) and the state's term structure should be changed.

See GenServer.code_change/3 for general information about this callback.

This callback is optional.

handle_call(request, from, state)

@callback handle_call(request :: term(), GenServer.from(), state :: term()) ::
  {:reply, reply, new_state}
  | {:reply, reply, new_state,
     timeout()
     | :hibernate
     | {:continue, continue_arg :: term()}
     | client_request_noreply()}
  | {:noreply, new_state}
  | {:noreply, new_state,
     timeout()
     | :hibernate
     | {:continue, continue_arg :: term()}
     | client_request()}
  | {:stop, reason, reply, new_state}
  | {:stop, reason, new_state}
when reply: term(), new_state: term(), reason: term()

Invoked to handle synchronous call/3 messages. Generally, initiating operations on the state machine that VSR wraps should be done through this callback.

See GenServer.handle_call/3 for general information about this callback.

To initiate a VSR request, use the :client_request tuple.
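
Based on the callback spec above, a handle_call/3 clause can hand an operation to VSR by returning the {:client_request, from, operation} tuple as the third element of a :noreply result. The sketch below defines the callback as a plain function in a hypothetical `KvCallSketch` module so that it compiles without the vsr dependency; the assumption that the reply is delivered later through send_reply/3 follows from the callback descriptions in this document.

```elixir
defmodule KvCallSketch do
  # Not a real VsrServer module: handle_call/3 is a plain function here so the
  # sketch runs standalone. In a real module it would carry `@impl VsrServer`.

  # Replicated write: defer the reply and pass the operation to VSR.
  def handle_call({:write, key, value}, from, state) do
    {:noreply, state, {:client_request, from, {:write, key, value}}}
  end

  # Local read: answer immediately from this node's state without replication.
  # The value may be stale on a backup.
  def handle_call({:read_local, key}, _from, state) do
    {:reply, Map.get(state.data, key), state}
  end
end

state = %{data: %{"a" => 1}}
IO.inspect(KvCallSketch.handle_call({:write, "k", "v"}, :fake_from, state))
IO.inspect(KvCallSketch.handle_call({:read_local, "a"}, :fake_from, state))
```

The `from` value is passed through untouched, which is what allows the reply to be routed back to the original caller after the operation commits.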

handle_cast(request, state)

@callback handle_cast(request :: term(), state :: term()) ::
  {:noreply, new_state}
  | {:noreply, new_state,
     timeout()
     | :hibernate
     | {:continue, continue_arg :: term()}
     | client_request()}
  | {:stop, reason :: term(), new_state}
when new_state: term()

Invoked to handle asynchronous cast/2 messages.

See GenServer.handle_cast/2 for general information about this callback.

To initiate a VSR request, pass the :client_request tuple. You may use a from argument in the tuple that has been created by a previous call and stored in the server state.

This callback is optional. If one is not implemented, the server will fail if a cast is performed against it.

handle_commit(operation, inner_state)

@callback handle_commit(operation :: term(), inner_state :: term()) ::
  {new_inner_state :: term(), result :: term()}

Invoked when a VSR operation is committed and needs to be applied to the state machine.

This callback receives the operation that was committed and the current inner state. It should apply the operation and return the new inner state and any result value to be sent back to the client.

handle_continue(continue_arg, state)

@callback handle_continue(continue_arg, state :: term()) ::
  {:noreply, new_state}
  | {:noreply, new_state,
     timeout() | :hibernate | {:continue, continue_arg} | client_request()}
  | {:stop, reason :: term(), new_state}
when new_state: term(), continue_arg: term()

Invoked to handle continue instructions.

See GenServer.handle_continue/2 for general information about this callback.

Return values are the same as handle_cast/2.

This callback is optional. If one is not implemented, the server will fail if a continue instruction is used.

handle_info(msg, state)

@callback handle_info(msg :: :timeout | term(), state :: term()) ::
  {:noreply, new_state}
  | {:noreply, new_state,
     timeout()
     | :hibernate
     | {:continue, continue_arg :: term()}
     | client_request()}
  | {:stop, reason :: term(), new_state}
when new_state: term()

Invoked to handle all other messages.

See GenServer.handle_info/2 for general information about this callback.

Return values are the same as handle_cast/2.

This callback is optional. If one is not implemented, the received message will be logged.

init(init_arg)

@callback init(init_arg :: term()) ::
  {:ok, state}
  | {:ok, log, state}
  | {:ok, log, state,
     timeout() | :hibernate | {:continue, continue_arg :: term()}}
  | :ignore
  | {:stop, reason :: term()}
when state: term(), log: term()

Invoked when the server is started. start_link/3 or start/3 will block until it returns.

See GenServer.init/1 for general information about the init/1 callback.

The VsrServer init callback adds an additional log term in the success tuple, which initializes the VSR log. This should be a durable, local store for logging VSR operations.

If the log must be initialized at a later stage (for example, via out-of-band initialization), you may return the normal {:ok, state} tuple.

log_append(log, entry)

@callback log_append(log :: term(), entry :: Vsr.LogEntry.t()) :: new_log :: term()

Appends an entry to the VSR log.

Required callback - must be implemented by all VSR modules.

log_clear(log)

@callback log_clear(log :: term()) :: new_log :: term()

Clears all entries from the VSR log.

Required callback - must be implemented by all VSR modules.

log_fetch(log, op_number)

@callback log_fetch(log :: term(), op_number :: non_neg_integer()) ::
  {:ok, Vsr.LogEntry.t()} | {:error, :not_found}

Fetches an entry from the VSR log by operation number.

Required callback - must be implemented by all VSR modules.

log_get_all(log)

@callback log_get_all(log :: term()) :: [Vsr.LogEntry.t()]

Gets all entries from the VSR log.

Required callback - must be implemented by all VSR modules.

log_get_from(log, op_number)

@callback log_get_from(log :: term(), op_number :: non_neg_integer()) :: [
  Vsr.LogEntry.t()
]

Gets entries from the specified operation number onwards.

Required callback - must be implemented by all VSR modules.

log_length(log)

@callback log_length(log :: term()) :: non_neg_integer()

Gets the current length (number of entries) in the VSR log.

Required callback - must be implemented by all VSR modules.

log_replace(log, entries)

@callback log_replace(log :: term(), entries :: [Vsr.LogEntry.t()]) :: new_log :: term()

Replaces the entire VSR log with new entries.

Used during state transfer and view changes. Required callback - must be implemented by all VSR modules.
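
For tests and experiments, the seven log callbacks can be backed by a plain list. The sketch below is an in-memory illustration only and is therefore not durable, unlike the store a production log should use. It represents entries as maps with an `:op_number` key, which is an assumption: the actual fields of Vsr.LogEntry are not shown in this document. `ListLogSketch` is a hypothetical module that does not `use VsrServer`.

```elixir
defmodule ListLogSketch do
  # The log is a list of entries in append order. Entries are assumed to be
  # maps (or structs) exposing an :op_number field.
  def new, do: []

  def log_append(log, entry), do: log ++ [entry]

  def log_fetch(log, op_number) do
    case Enum.find(log, &(&1.op_number == op_number)) do
      nil -> {:error, :not_found}
      entry -> {:ok, entry}
    end
  end

  def log_get_all(log), do: log

  # Entries whose operation number is at or after op_number.
  def log_get_from(log, op_number), do: Enum.filter(log, &(&1.op_number >= op_number))

  def log_length(log), do: length(log)

  # Discards the old log entirely, as needed for state transfer.
  def log_replace(_log, entries), do: entries

  def log_clear(_log), do: []
end

log =
  ListLogSketch.new()
  |> ListLogSketch.log_append(%{op_number: 1, operation: {:write, "a", 1}})
  |> ListLogSketch.log_append(%{op_number: 2, operation: {:write, "b", 2}})

IO.inspect(ListLogSketch.log_fetch(log, 2))
IO.inspect(ListLogSketch.log_length(log))  # => 2
```

Appending with `++` is linear in the log length, which is acceptable for a test double but is one more reason to use a purpose-built store in production.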

monitor_node(node_id, message, inner_state)

@callback monitor_node(node_id :: term(), message :: term(), inner_state :: term()) ::
  reference()

Monitors another node in the cluster for failure detection.

The default implementation uses Process.monitor/1 for PID-based node identifiers. Override this for custom monitoring mechanisms.
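
For reference, this is what PID-based monitoring with Process.monitor/1 looks like in isolation. The sketch spawns a stand-in process for a peer replica, monitors it, and waits for the resulting :DOWN message; how VsrServer reacts to that message internally is not covered here.

```elixir
# Spawn a stand-in for a peer replica that exits when told to.
replica =
  spawn(fn ->
    receive do
      :stop -> :ok
    end
  end)

# Monitoring returns a reference that tags the eventual :DOWN message.
ref = Process.monitor(replica)
send(replica, :stop)

down =
  receive do
    {:DOWN, ^ref, :process, ^replica, reason} -> {:down, reason}
  after
    1_000 -> :timeout
  end

IO.inspect(down)  # => {:down, :normal}
```

A custom monitor_node/3 for a non-Erlang transport would need to produce an equivalent failure signal itself, for example from missed heartbeats.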

send_reply(from, reply, inner_state)

@callback send_reply(from :: term(), reply :: term(), inner_state :: term()) :: term()

Sends a reply to a client.

The default implementation uses GenServer.reply/2 for standard Erlang distribution. Override this for custom communication protocols where the from parameter needs to be handled differently (e.g., encoded node references, message passing).

send_vsr(node_id, message, inner_state)

@callback send_vsr(node_id :: term(), message :: term(), inner_state :: term()) :: term()

Sends a VSR message to another node in the cluster.

The default implementation uses PIDs as node identifiers and sends via raw send/2. Override this for custom communication protocols (e.g., Maelstrom, network protocols).

terminate(reason, state)

@callback terminate(reason, state :: term()) :: term()
when reason: :normal | :shutdown | {:shutdown, term()} | term()

Invoked when the server is about to exit. It should do any cleanup required.

See GenServer.terminate/2 for general information about this callback.

This callback is optional.

Functions

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

dump(server)

@spec dump(server()) :: state()

Dumps the internal state of a VSR server for testing and debugging.

node_id(server \\ self())

Gets the node_id of a VSR server.

set_cluster(server, node_id, replicas, cluster_size \\ nil)

@spec set_cluster(
  server(),
  node_id,
  replicas :: Enumerable.t(node_id),
  cluster_size :: non_neg_integer() | nil
) :: :ok
when node_id: term()

In the case that the cluster cannot be known at boot time, this function may be used to set cluster details.

set_log(server, log)

@spec set_log(server(), log :: term()) :: :ok

In the case that the log cannot be known at boot time (for example, some parameter in the log setup depends on the cluster configuration), this function may be used to set the log.

set_log_impl(log, state)

start_link(module, opts)

vsr_send(server, message)

@spec vsr_send(server(), message :: term()) :: term()

In certain situations, you may need to send VSR messages out-of-band from the normal Erlang distribution mechanism that VsrServer relies on by default. In this case, you may use this function to deliver VSR messages to VSR servers.