DeltaCrdt (delta_crdt v0.6.4)

Start and interact with the Delta CRDTs provided by this library.

A CRDT is a conflict-free replicated data type: a distributed data structure that automatically resolves conflicts in a way that is consistent across all replicas of the data. In other words, your distributed data is guaranteed to eventually converge on every replica.

Normal CRDTs (otherwise called "state CRDTs") require transmission of the entire CRDT state with every change. This clearly doesn't scale, but there has been exciting research in the last few years into "Delta CRDTs", CRDTs that only transmit their deltas. This has enabled a whole new scale of applications for CRDTs, and it's also what this library is based on.

A Delta CRDT is made of two parts. First, the data structure itself, and second, an anti-entropy algorithm, which is responsible for ensuring convergence. DeltaCrdt implements Algorithm 2 from "Delta State Replicated Data Types – Almeida et al. 2016" which is an anti-entropy algorithm for δ-CRDTs. DeltaCrdt also implements join decomposition to ensure that deltas aren't transmitted unnecessarily in the cluster.
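To make the "deltas instead of full state" idea concrete, here is a minimal sketch of a delta-based grow-only counter. It is deliberately much simpler than the AWLWWMap this library ships, and the module name DeltaSketch.GCounter and all of its functions are hypothetical, written only for this illustration. It shows that a replica can be rebuilt from deltas alone, even when they arrive out of order or more than once.

```elixir
defmodule DeltaSketch.GCounter do
  # Hypothetical module for illustration only; not part of delta_crdt.
  # State is a map of replica_id => count.

  def new, do: %{}

  # Returns {new_state, delta}. The delta holds only the entry that changed,
  # so it stays small no matter how large the full state grows.
  def increment(state, replica_id) do
    count = Map.get(state, replica_id, 0) + 1
    delta = %{replica_id => count}
    {join(state, delta), delta}
  end

  # Join takes the per-key maximum. It is idempotent, commutative and
  # associative, which is what lets replicas converge.
  def join(a, b), do: Map.merge(a, b, fn _key, x, y -> max(x, y) end)

  def value(state), do: state |> Map.values() |> Enum.sum()
end

alias DeltaSketch.GCounter

{a, delta1} = GCounter.increment(GCounter.new(), :a)
{a, delta2} = GCounter.increment(a, :a)

# Ship only the deltas to replica b: out of order, and one of them twice.
b = Enum.reduce([delta2, delta1, delta2], GCounter.new(), &GCounter.join(&2, &1))

IO.inspect(GCounter.value(b) == GCounter.value(a)) # prints true
```

The anti-entropy algorithm decides when and to whom such deltas are sent. This sketch covers only the data structure half.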

While it is certainly interesting to have a look at this paper and spend time grokking it, in theory I've done the hard work so that you don't have to, and this library is the result.

With this library, you can build distributed applications that share some state. Horde.Supervisor and Horde.Registry are both built atop DeltaCrdt, but there are certainly many more possibilities.

Here's a simple example for illustration:

iex> {:ok, crdt1} = DeltaCrdt.start_link(DeltaCrdt.AWLWWMap, sync_interval: 3)
iex> {:ok, crdt2} = DeltaCrdt.start_link(DeltaCrdt.AWLWWMap, sync_interval: 3)
iex> DeltaCrdt.set_neighbours(crdt1, [crdt2])
iex> DeltaCrdt.set_neighbours(crdt2, [crdt1])
iex> DeltaCrdt.to_map(crdt1)
%{}
iex> DeltaCrdt.put(crdt1, "CRDT", "is magic!")
iex> Process.sleep(10) # need to wait for propagation for the doctest
iex> DeltaCrdt.to_map(crdt2)
%{"CRDT" => "is magic!"}

Summary

Functions

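child_spec(opts)

Include DeltaCrdt in a supervision tree with {DeltaCrdt, [crdt: DeltaCrdt.AWLWWMap, name: MyCRDTMap]}

mutate(crdt, f, a, timeout \\ 5000) deprecated

Mutate the CRDT synchronously.

mutate_async(crdt, f, a) deprecated

Mutate the CRDT asynchronously.

read(crdt) deprecated

Read the state of the CRDT.

set_neighbours(crdt, neighbours)

Notify a CRDT of its neighbours.

start_link(crdt_module, opts \\ [])

Start a DeltaCrdt and link it to the calling process.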

Types

Specs

crdt_option() ::
  {:on_diffs, ([diff()] -> any()) | {module(), function(), [any()]}}
  | {:sync_interval, pos_integer()}
  | {:max_sync_size, pos_integer() | :infinite}
  | {:storage_module, DeltaCrdt.Storage.t()}

Specs

crdt_options() :: [crdt_option()]

Specs

diff() :: {:add, key :: any(), value :: any()} | {:remove, key :: any()}

Specs

key() :: any()

Specs

t() :: GenServer.server()

Specs

value() :: any()

Functions

child_spec(opts)

Include DeltaCrdt in a supervision tree with {DeltaCrdt, [crdt: DeltaCrdt.AWLWWMap, name: MyCRDTMap]}
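As a sketch of how that child tuple might be used, the snippet below starts a supervisor with one DeltaCrdt child and then addresses it by its registered name. It assumes delta_crdt is already a dependency of the project. The key and value are placeholders.

```elixir
children = [
  # The tuple form shown above; :crdt selects the CRDT module and
  # :name registers the process under MyCRDTMap.
  {DeltaCrdt, [crdt: DeltaCrdt.AWLWWMap, name: MyCRDTMap]}
]

{:ok, _supervisor} = Supervisor.start_link(children, strategy: :one_for_one)

# t() is GenServer.server(), so the registered name works wherever a pid would.
DeltaCrdt.put(MyCRDTMap, "key", "value")
DeltaCrdt.get(MyCRDTMap, "key")
```

In a real application the children list would normally live in the application's own supervision tree rather than in a standalone supervisor.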

delete(crdt, key, timeout \\ 5000)

Specs

delete(t(), key(), timeout()) :: t()

drop(crdt, keys, timeout \\ 5000)

Specs

drop(t(), [key()], timeout()) :: t()

get(crdt, key, timeout \\ 5000)

Specs

get(t(), key(), timeout()) :: value()

merge(crdt, map, timeout \\ 5000)

Specs

merge(t(), map(), timeout()) :: t()

mutate(crdt, f, a, timeout \\ 5000)

This function is deprecated. Use put/4 instead.

Specs

mutate(
  crdt :: t(),
  function :: atom(),
  arguments :: list(),
  timeout :: timeout()
) :: :ok

Mutate the CRDT synchronously.

For the asynchronous version of this function, see mutate_async/3.

To see which operations are available, see the documentation for the CRDT module that was passed to start_link/2.

For example, DeltaCrdt.AWLWWMap has a function add that takes 4 arguments. The last 2 arguments are supplied by DeltaCrdt internally, so you have to provide only the first two arguments: key and val. That would look like this: DeltaCrdt.mutate(crdt, :add, ["CRDT", "is magic!"]). This pattern is repeated for all mutation functions. Another example: to call DeltaCrdt.AWLWWMap.clear, use DeltaCrdt.mutate(crdt, :clear, []).

mutate_async(crdt, f, a)

This function is deprecated. Will be removed without replacement in a future version.

Specs

mutate_async(crdt :: t(), function :: atom(), arguments :: list()) :: :ok

Mutate the CRDT asynchronously.

put(crdt, key, value, timeout \\ 5000)

Specs

put(t(), key(), value(), timeout()) :: t()

read(crdt)

This function is deprecated. Use get/2 or take/3 or to_map/2.

Specs

read(crdt :: t()) :: crdt_state :: term()

Read the state of the CRDT.

Forwards its arguments to the underlying CRDT module, so read(crdt, ["my-key"]) would call crdt_module.read(state, ["my-key"]).

For example, DeltaCrdt.AWLWWMap accepts a list of keys to limit the returned values instead of returning everything.

Specs

read(crdt :: t(), timeout :: timeout()) :: crdt_state :: term()
read(crdt :: t(), keys :: list()) :: crdt_state :: term()

read(crdt, keys, timeout)

Specs

read(crdt :: t(), keys :: list(), timeout :: timeout()) :: crdt_state :: term()

set_neighbours(crdt, neighbours)

Specs

set_neighbours(crdt :: t(), neighbours :: [t()]) :: :ok

Notify a CRDT of its neighbours.

This function allows CRDTs to communicate with each other and sync their states.

Note: this sets up a unidirectional sync, so if you want bidirectional syncing (which is normally desirable), you must call this function once for each CRDT, passing all of the others as its neighbours. For three CRDTs that means three calls:

DeltaCrdt.set_neighbours(c1, [c2, c3])
DeltaCrdt.set_neighbours(c2, [c1, c3])
DeltaCrdt.set_neighbours(c3, [c1, c2])
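For more than a handful of CRDTs, the neighbour lists can be computed rather than written out by hand. The helper below is a sketch of that idea. NeighbourMesh, plan/1 and connect/1 are hypothetical names, not part of delta_crdt, and the only library call used is set_neighbours/2 as specified above.

```elixir
defmodule NeighbourMesh do
  # Hypothetical helper for illustration; not part of delta_crdt.

  # Pairs every CRDT with the list of all the other CRDTs.
  def plan(crdts) do
    Enum.map(crdts, fn crdt -> {crdt, List.delete(crdts, crdt)} end)
  end

  # Applies the plan, giving every CRDT a sync path to every other one.
  def connect(crdts) do
    for {crdt, neighbours} <- plan(crdts) do
      :ok = DeltaCrdt.set_neighbours(crdt, neighbours)
    end

    :ok
  end
end

# plan/1 is pure, so it can be inspected without starting any processes:
IO.inspect(NeighbourMesh.plan([:c1, :c2, :c3]))
# prints [c1: [:c2, :c3], c2: [:c1, :c3], c3: [:c1, :c2]]
```

Calling NeighbourMesh.connect([c1, c2, c3]) with three running CRDTs would issue the same three set_neighbours/2 calls shown above.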

start_link(crdt_module, opts \\ [])

Specs

start_link(crdt_module :: module(), opts :: crdt_options()) ::
  GenServer.on_start()

Start a DeltaCrdt and link it to the calling process.

There are a number of options you can specify to tweak the behaviour of DeltaCrdt:

  • :sync_interval - the interval, in milliseconds, at which the delta CRDT attempts to sync its local changes with its neighbours. Default is 200.
  • :on_diffs - a function that takes a list of diffs, or a {module, function, args} tuple, invoked whenever the CRDT produces diffs.
  • :max_sync_size - the maximum size of a synchronization, specified as a number of items to sync, or :infinite.
  • :storage_module - a module that implements the DeltaCrdt.Storage behaviour.
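As a sketch of how these options might be combined, the snippet below starts a CRDT with a shorter sync interval and an :on_diffs callback that forwards each batch of diffs to the calling process. It assumes the callback also fires for local changes, which the text above does not confirm, so the receive block uses a timeout instead of relying on that. The message shape {:diffs, diffs} is made up for this example.

```elixir
parent = self()

{:ok, crdt} =
  DeltaCrdt.start_link(DeltaCrdt.AWLWWMap,
    sync_interval: 50,
    # Forward every batch of diffs to the process that started the CRDT.
    on_diffs: fn diffs -> send(parent, {:diffs, diffs}) end
  )

DeltaCrdt.put(crdt, "CRDT", "is magic!")

# Each diff has the shape {:add, key, value} or {:remove, key}.
receive do
  {:diffs, diffs} -> IO.inspect(diffs, label: "diffs")
after
  1_000 -> IO.puts("no diffs received within 1 second")
end
```

Sending a message keeps the callback short. Any heavier work can then happen in the receiving process instead of inside the CRDT process.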

take(crdt, keys, timeout \\ 5000)

Specs

take(t(), [key()], timeout()) :: [{key(), value()}]

to_map(crdt, timeout \\ 5000)

Specs

to_map(t(), timeout()) :: map()
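
The key-value functions above carry no prose of their own, so here is a short sketch of how they might fit together on a single, unconnected CRDT. It relies only on the specs listed on this page. The keys and values are placeholders, and it assumes delta_crdt is available as a dependency.

```elixir
{:ok, crdt} = DeltaCrdt.start_link(DeltaCrdt.AWLWWMap)

# put/4, merge/3, delete/3 and drop/3 are specified to return the CRDT,
# so writes can be chained with the pipe operator.
crdt
|> DeltaCrdt.put("a", 1)
|> DeltaCrdt.merge(%{"b" => 2, "c" => 3})
|> DeltaCrdt.delete("a")
|> DeltaCrdt.drop(["b"])

# Reads: a single key, a subset of keys, or the whole map.
IO.inspect(DeltaCrdt.get(crdt, "c"), label: "get")
IO.inspect(DeltaCrdt.take(crdt, ["c"]), label: "take")
IO.inspect(DeltaCrdt.to_map(crdt), label: "to_map")
```

All of these calls take an optional timeout (5000 ms by default) as their last argument.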