Grove.Replication.Server (Grove v0.1.1)

A GenServer that manages a single CRDT instance with automatic replication.

The server periodically synchronizes its state with peers in the same replication group using delta-state synchronization.

Usage

# Start a replicated counter
{:ok, pid} = Grove.Replication.Server.start_link(
  crdt_module: Grove.Counter.GCounter,
  actor: :node_a,
  group: {:counter, "page_views"}
)

# Apply a mutation
:ok = Grove.Replication.Server.mutate(pid, &Grove.Counter.GCounter.increment(&1, 5))

# Get the current value
{:ok, value} = Grove.Replication.Server.value(pid)

Options

  • :crdt_module - Required. The CRDT module (e.g., Grove.Counter.GCounter)
  • :actor - Required. Unique identifier for this replica
  • :group - Required. The replication group name (e.g., {:counter, "page_views"})
  • :sync_interval - Sync interval in ms (default: 5000)
  • :name - Optional GenServer name
  • :storage - Optional storage backend module (e.g., Grove.Storage.ETS)
  • :storage_ref - Optional storage reference (from Storage.init/1)
  • :snapshot_interval - How often to save to storage in ms (default: 60000)
  • :pubsub - Optional Phoenix.PubSub name for broadcasting (alternative to :pg)
  • :pubsub_topic - Optional custom topic (default: derived from group)

Broadcast Mechanisms

By default, the server uses :pg (process groups) via Grove.Cluster.Membership for broadcasting deltas to peers. Alternatively, you can use Phoenix.PubSub:

# Using Phoenix.PubSub instead of :pg
Grove.Replication.Server.start_link(
  crdt_module: Grove.Counter.GCounter,
  actor: :node_a,
  group: {:counter, "page_views"},
  pubsub: MyApp.PubSub
)

When :pubsub is provided, the server subscribes to a topic derived from the group name (e.g., "grove:counter:page_views") and broadcasts deltas via PubSub instead of :pg.
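
As a rough illustration of the topic naming above, the following sketch maps a group to a topic string. `TopicSketch` and `topic_for/1` are hypothetical names invented for this example. Grove's actual derivation is not shown in this documentation and may differ, so treat this only as a way to predict the topic for simple `{atom, string}` groups.

```elixir
defmodule TopicSketch do
  # Hypothetical helper, not part of Grove: it only mirrors the documented
  # example, where {:counter, "page_views"} becomes "grove:counter:page_views".
  def topic_for({kind, id}), do: "grove:#{kind}:#{id}"

  # Assumed fallback for groups that are a single atom or string.
  def topic_for(name) when is_atom(name) or is_binary(name), do: "grove:#{name}"
end

IO.inspect(TopicSketch.topic_for({:counter, "page_views"}))
# "grove:counter:page_views"
```

If the derived topic does not suit your application, the :pubsub_topic option listed above lets you set one explicitly.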

Replication Protocol

  1. On mutation: the resulting delta is accumulated in the CRDT
  2. On sync timer: the accumulated delta is broadcast to all peers, then reset
  3. On receiving a delta: it is merged into the local state
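
The three steps above can be sketched with a tiny grow-only counter. This is a simplified, self-contained model and not Grove's implementation: `DeltaCounterSketch` and all of its functions are hypothetical names, and the real server exchanges deltas through :pg or Phoenix.PubSub rather than by passing values directly.

```elixir
defmodule DeltaCounterSketch do
  # Hypothetical delta-state G-Counter used only to illustrate the protocol.
  defstruct actor: nil, counts: %{}, delta: %{}

  def new(actor), do: %__MODULE__{actor: actor}

  # Step 1: a mutation updates the state and records the change in the delta.
  def increment(%__MODULE__{actor: actor} = c, n) when is_integer(n) and n > 0 do
    total = Map.get(c.counts, actor, 0) + n
    %{c | counts: Map.put(c.counts, actor, total), delta: Map.put(c.delta, actor, total)}
  end

  # Step 2: on the sync timer, hand out the accumulated delta and reset it.
  def take_delta(%__MODULE__{delta: delta} = c), do: {delta, %{c | delta: %{}}}

  # Step 3: merge a received delta by keeping the per-actor maximum.
  def merge_delta(%__MODULE__{} = c, delta) do
    %{c | counts: Map.merge(c.counts, delta, fn _actor, a, b -> max(a, b) end)}
  end

  def value(%__MODULE__{counts: counts}), do: counts |> Map.values() |> Enum.sum()
end

a = DeltaCounterSketch.new(:node_a) |> DeltaCounterSketch.increment(5)
b = DeltaCounterSketch.new(:node_b) |> DeltaCounterSketch.increment(2)

{delta, _a} = DeltaCounterSketch.take_delta(a)
b = DeltaCounterSketch.merge_delta(b, delta)

IO.inspect(DeltaCounterSketch.value(b))
# 7
```

Because the merge keeps the per-actor maximum, receiving the same delta twice leaves the state unchanged, which is what makes periodic rebroadcast safe in this model.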

Summary

Functions

  • batch(server, fun) - Applies a batch of mutations atomically.
  • child_spec(init_arg) - Returns a specification to start this module under a supervisor.
  • mutate(server, fun) - Applies a mutation function to the CRDT state.
  • start_link(opts) - Starts a replication server for a CRDT instance.
  • state(server) - Returns the full CRDT state (for debugging or advanced use).
  • stop(server) - Stops the replication server.
  • sync_now(server) - Forces an immediate sync with peers.
  • value(server) - Returns the current value of the CRDT.

Types

t()

@type t() :: %Grove.Replication.Server{
  actor: term(),
  crdt_module: module(),
  crdt_state: term(),
  group: term(),
  pubsub: atom() | nil,
  pubsub_topic: String.t() | nil,
  snapshot_interval: pos_integer() | nil,
  snapshot_timer_ref: reference() | nil,
  storage: module() | nil,
  storage_ref: term() | nil,
  sync_interval: pos_integer(),
  sync_timer_ref: reference() | nil
}

Functions

batch(server, fun)

@spec batch(GenServer.server(), (term() -> term())) :: :ok | {:error, term()}

Applies a batch of mutations atomically.

Like mutate/2, but wraps the function in Grove.batch/2 so that the state is rolled back on error. If any operation in the batch fails, the call returns {:error, reason} and the CRDT state remains unchanged.

Example

:ok = Server.batch(pid, fn counter ->
  counter
  |> GCounter.increment(5)
  |> GCounter.increment(3)
end)

# A failing batch returns an error tuple and leaves the state untouched
{:error, reason} = Server.batch(pid, fn _counter ->
  raise "oops"
end)
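
The all-or-nothing behaviour described above can be modelled with a plain function. This is only a sketch of the idea: `BatchSketch.apply_batch/2` is a hypothetical name, it is not Grove.batch/2, and its three-element error tuple was chosen for the example so the unchanged state is visible.

```elixir
defmodule BatchSketch do
  # Hypothetical stand-in for rollback-on-error semantics.
  # On success the new state is returned; if the function raises,
  # the original state is returned unchanged next to the error.
  def apply_batch(state, fun) when is_function(fun, 1) do
    {:ok, fun.(state)}
  rescue
    exception -> {:error, exception, state}
  end
end

# Both increments are applied together.
IO.inspect(BatchSketch.apply_batch(0, fn n -> n + 5 + 3 end))
# {:ok, 8}

# The raise discards any partial work and the state stays at 0.
{:error, %RuntimeError{message: "oops"}, 0} =
  BatchSketch.apply_batch(0, fn _n -> raise "oops" end)
```

Since Elixir data is immutable, rolling back in this model amounts to keeping the reference to the state from before the function ran.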

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.
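
A minimal sketch of running the server under a supervisor follows. It assumes that child_spec/1 forwards its argument to start_link/1, which is the default for modules that `use GenServer`. The `MyApp.PageViews` name is a placeholder, and the fragment needs Grove to be available in your project.

```elixir
# Placeholder process name; replace MyApp.PageViews with your own.
children = [
  {Grove.Replication.Server,
   crdt_module: Grove.Counter.GCounter,
   actor: :node_a,
   group: {:counter, "page_views"},
   name: MyApp.PageViews}
]

# Standard OTP supervisor; a crashed replication server is restarted on its own.
{:ok, _supervisor} = Supervisor.start_link(children, strategy: :one_for_one)
```

With a registered :name, callers can pass that name instead of a pid, for example `Grove.Replication.Server.value(MyApp.PageViews)`.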

mutate(server, fun)

@spec mutate(GenServer.server(), (term() -> term())) :: :ok

Applies a mutation function to the CRDT state.

The function receives the current CRDT state and must return the new state. The delta is accumulated automatically.

Example

:ok = Server.mutate(pid, fn counter ->
  GCounter.increment(counter, 5)
end)

start_link(opts)

@spec start_link(keyword()) :: GenServer.on_start()

Starts a replication server for a CRDT instance.

Options

  • :crdt_module - Required. The CRDT module
  • :actor - Required. Unique actor ID for this replica
  • :group - Required. Replication group name
  • :sync_interval - Sync interval in ms (default: 5000)
  • :name - Optional GenServer name
  • :storage - Optional storage backend module
  • :storage_ref - Storage reference from Storage.init/1
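  • :snapshot_interval - Snapshot interval in ms (default: 60000)
  • :pubsub - Optional Phoenix.PubSub name for broadcasting (alternative to :pg)
  • :pubsub_topic - Optional custom topic (default: derived from group)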

state(server)

@spec state(GenServer.server()) :: {:ok, term()}

Returns the full CRDT state (for debugging or advanced use).

stop(server)

@spec stop(GenServer.server()) :: :ok

Stops the replication server.

sync_now(server)

@spec sync_now(GenServer.server()) :: :ok

Forces an immediate sync with peers.

Syncing normally happens on the :sync_interval timer. Call this function to trigger synchronization immediately instead of waiting for the next tick.

value(server)

@spec value(GenServer.server()) :: {:ok, term()}

Returns the current value of the CRDT.

This extracts the user-facing value using Grove.Viewable.value/1.