Grove.Sequence.RGA (Grove v0.1.1)

View Source

A Replicated Growable Array (RGA) CRDT for ordered sequences.

RGA provides a conflict-free ordered list that supports concurrent insertions and deletions. Each element has a unique position identifier, and elements are ordered by their causal relationships and timestamps.

Semantics

  • Concurrent inserts: Multiple replicas can insert at the same position; ordering is determined by HLC timestamp (earlier wins), then replica ID
  • Tombstones: Deleted elements are tombstoned, not removed, to maintain ordering anchors for concurrent operations
  • Convergence: All replicas converge to the same ordered sequence

Delta-State Support

This CRDT supports delta-state replication:

  • delta/1 - Returns accumulated changes since last reset
  • reset_delta/1 - Clears the delta buffer after synchronization

Example

iex> rga = Grove.Sequence.RGA.new("replica_1")
iex> {rga, id1} = Grove.Sequence.RGA.insert(rga, "a", :head)
iex> {rga, id2} = Grove.Sequence.RGA.insert(rga, "b", id1)
iex> {rga, _id3} = Grove.Sequence.RGA.insert(rga, "c", id2)
iex> Grove.Sequence.RGA.to_list(rga)
["a", "b", "c"]

References

Based on "Replicated Abstract Data Types: Building Blocks for Collaborative Applications" by Roh, Jeon, Kim, and Lee (2011).

Summary

Functions

Returns the accumulated delta since the last reset.

Inserts a value after the given position.

Merges two RGAs.

Creates a new RGA for the given replica.

Removes an element by its position ID.

Resets the delta buffer after synchronization.

Returns the number of live (non-tombstoned) elements.

Returns the ordered list of values, excluding tombstoned elements.

Returns the ordered list of values (alias for to_list/1).

Types

element()

@type element() :: %{
  id: position_id(),
  value: term(),
  prev_id: position_id() | :head,
  timestamp: Grove.HybridLogicalClock.t()
}

position_id()

@type position_id() :: {replica_id(), non_neg_integer()}

replica_id()

@type replica_id() :: String.t()

t()

@type t() :: %Grove.Sequence.RGA{
  clock: non_neg_integer(),
  delta_buffer: t() | nil,
  elements: %{required(position_id()) => element()},
  replica_id: replica_id(),
  tombstones: MapSet.t(position_id())
}

Functions

delta(rga)

@spec delta(t()) :: t()

Returns the accumulated delta since the last reset.

insert(rga, value, prev_id)

@spec insert(t(), term(), position_id() | :head) :: {t(), position_id()}

Inserts a value after the given position.

Use :head to insert at the beginning of the sequence. Returns {updated_rga, position_id} where position_id can be used for subsequent insertions.

Examples

{rga, id} = RGA.insert(rga, "first", :head)
{rga, _} = RGA.insert(rga, "second", id)

merge(rga1, rga2)

@spec merge(t(), t()) :: t()

Merges two RGAs.

Takes the union of all elements and tombstones, then rebuilds the order.

new(replica_id)

@spec new(replica_id()) :: t()

Creates a new RGA for the given replica.

remove(rga, position_id)

@spec remove(t(), position_id()) :: t()

Removes an element by its position ID.

The element is tombstoned rather than deleted to maintain ordering anchors for concurrent operations.

Returns the updated RGA. If the position doesn't exist, returns unchanged.

reset_delta(rga)

@spec reset_delta(t()) :: t()

Resets the delta buffer after synchronization.

size(rga)

@spec size(t()) :: non_neg_integer()

Returns the number of live (non-tombstoned) elements.

to_list(rga)

@spec to_list(t()) :: [term()]

Returns the ordered list of values, excluding tombstoned elements.

value(rga)

@spec value(t()) :: [term()]

Returns the ordered list of values (alias for to_list/1).