Grove.Replication.Server (Grove v0.1.1)
View SourceA GenServer that manages a single CRDT instance with automatic replication.
The server periodically synchronizes its state with peers in the same replication group using delta-state synchronization.
Usage
# Start a replicated counter
{:ok, pid} = Grove.Replication.Server.start_link(
crdt_module: Grove.Counter.GCounter,
actor: :node_a,
group: {:counter, "page_views"}
)
# Apply a mutation
:ok = Grove.Replication.Server.mutate(pid, &GCounter.increment(&1, 5))
# Get the current value
{:ok, value} = Grove.Replication.Server.value(pid)Options
:crdt_module- Required. The CRDT module (e.g.,Grove.Counter.GCounter):actor- Required. Unique identifier for this replica:group- Required. The replication group name (e.g.,{:counter, "page_views"}):sync_interval- Sync interval in ms (default: 5000):name- Optional GenServer name:storage- Optional storage backend module (e.g.,Grove.Storage.ETS):storage_ref- Optional storage reference (fromStorage.init/1):snapshot_interval- How often to save to storage in ms (default: 60000):pubsub- Optional Phoenix.PubSub name for broadcasting (alternative to :pg):pubsub_topic- Optional custom topic (default: derived from group)
Broadcast Mechanisms
By default, the server uses :pg (process groups) via Grove.Cluster.Membership
for broadcasting deltas to peers. Alternatively, you can use Phoenix.PubSub:
# Using Phoenix.PubSub instead of :pg
Grove.Replication.Server.start_link(
crdt_module: Grove.Counter.GCounter,
actor: :node_a,
group: {:counter, "page_views"},
pubsub: MyApp.PubSub
)When :pubsub is provided, the server subscribes to a topic derived from
the group name (e.g., "grove:counter:page_views") and broadcasts deltas
via PubSub instead of :pg.
Replication Protocol
- On mutation: delta is accumulated in the CRDT
- On sync timer: broadcast delta to all peers, reset delta
- On receiving delta: merge with local state
Summary
Functions
Applies a batch of mutations atomically.
Returns a specification to start this module under a supervisor.
Applies a mutation function to the CRDT state.
Starts a replication server for a CRDT instance.
Returns the full CRDT state (for debugging or advanced use).
Stops the replication server.
Forces an immediate sync with peers.
Returns the current value of the CRDT.
Types
@type t() :: %Grove.Replication.Server{ actor: term(), crdt_module: module(), crdt_state: term(), group: term(), pubsub: atom() | nil, pubsub_topic: String.t() | nil, snapshot_interval: pos_integer() | nil, snapshot_timer_ref: reference() | nil, storage: module() | nil, storage_ref: term() | nil, sync_interval: pos_integer(), sync_timer_ref: reference() | nil }
Functions
@spec batch(GenServer.server(), (term() -> term())) :: :ok | {:error, term()}
Applies a batch of mutations atomically.
Like mutate/2, but wraps in Grove.batch/2 for rollback on error.
If any operation in the batch fails, the CRDT state remains unchanged.
Example
:ok = Server.batch(pid, fn counter ->
counter
|> GCounter.increment(5)
|> GCounter.increment(3)
end)
{:error, reason} = Server.batch(pid, fn counter ->
raise "oops"
end)
Returns a specification to start this module under a supervisor.
See Supervisor.
@spec mutate(GenServer.server(), (term() -> term())) :: :ok
Applies a mutation function to the CRDT state.
The function receives the current CRDT state and should return the new state. The delta will be automatically accumulated.
Example
:ok = Server.mutate(pid, fn counter ->
GCounter.increment(counter, 5)
end)
@spec start_link(keyword()) :: GenServer.on_start()
Starts a replication server for a CRDT instance.
Options
:crdt_module- Required. The CRDT module:actor- Required. Unique actor ID for this replica:group- Required. Replication group name:sync_interval- Sync interval in ms (default: 5000):name- Optional GenServer name:storage- Optional storage backend module:storage_ref- Storage reference fromStorage.init/1:snapshot_interval- Snapshot interval in ms (default: 60000)
@spec state(GenServer.server()) :: {:ok, term()}
Returns the full CRDT state (for debugging or advanced use).
@spec stop(GenServer.server()) :: :ok
Stops the replication server.
@spec sync_now(GenServer.server()) :: :ok
Forces an immediate sync with peers.
Normally syncing happens on a timer, but this can be used to trigger immediate synchronization.
@spec value(GenServer.server()) :: {:ok, term()}
Returns the current value of the CRDT.
This extracts the user-facing value using Grove.Viewable.value/1.