Grove.Sequence.RGA (Grove v0.1.1)
View SourceA Replicated Growable Array (RGA) CRDT for ordered sequences.
RGA provides a conflict-free ordered list that supports concurrent insertions and deletions. Each element has a unique position identifier, and elements are ordered by their causal relationships and timestamps.
Semantics
- Concurrent inserts: Multiple replicas can insert at the same position; ordering is determined by HLC timestamp (earlier wins), then replica ID
- Tombstones: Deleted elements are tombstoned, not removed, to maintain ordering anchors for concurrent operations
- Convergence: All replicas converge to the same ordered sequence
Delta-State Support
This CRDT supports delta-state replication:
delta/1- Returns accumulated changes since last resetreset_delta/1- Clears the delta buffer after synchronization
Example
iex> rga = Grove.Sequence.RGA.new("replica_1")
iex> {rga, id1} = Grove.Sequence.RGA.insert(rga, "a", :head)
iex> {rga, id2} = Grove.Sequence.RGA.insert(rga, "b", id1)
iex> {rga, _id3} = Grove.Sequence.RGA.insert(rga, "c", id2)
iex> Grove.Sequence.RGA.to_list(rga)
["a", "b", "c"]References
Based on "Replicated Abstract Data Types: Building Blocks for Collaborative Applications" by Roh, Jeon, Kim, and Lee (2011).
Summary
Functions
Returns the accumulated delta since the last reset.
Inserts a value after the given position.
Merges two RGAs.
Creates a new RGA for the given replica.
Removes an element by its position ID.
Resets the delta buffer after synchronization.
Returns the number of live (non-tombstoned) elements.
Returns the ordered list of values, excluding tombstoned elements.
Returns the ordered list of values (alias for to_list/1).
Types
@type element() :: %{ id: position_id(), value: term(), prev_id: position_id() | :head, timestamp: Grove.HybridLogicalClock.t() }
@type position_id() :: {replica_id(), non_neg_integer()}
@type replica_id() :: String.t()
@type t() :: %Grove.Sequence.RGA{ clock: non_neg_integer(), delta_buffer: t() | nil, elements: %{required(position_id()) => element()}, replica_id: replica_id(), tombstones: MapSet.t(position_id()) }
Functions
Returns the accumulated delta since the last reset.
@spec insert(t(), term(), position_id() | :head) :: {t(), position_id()}
Inserts a value after the given position.
Use :head to insert at the beginning of the sequence.
Returns {updated_rga, position_id} where position_id can be used
for subsequent insertions.
Examples
{rga, id} = RGA.insert(rga, "first", :head)
{rga, _} = RGA.insert(rga, "second", id)
Merges two RGAs.
Takes the union of all elements and tombstones, then rebuilds the order.
@spec new(replica_id()) :: t()
Creates a new RGA for the given replica.
@spec remove(t(), position_id()) :: t()
Removes an element by its position ID.
The element is tombstoned rather than deleted to maintain ordering anchors for concurrent operations.
Returns the updated RGA. If the position doesn't exist, returns unchanged.
Resets the delta buffer after synchronization.
@spec size(t()) :: non_neg_integer()
Returns the number of live (non-tombstoned) elements.
Returns the ordered list of values, excluding tombstoned elements.
Returns the ordered list of values (alias for to_list/1).