cets (cets v0.2.0)

Main CETS module.

CETS stores data in memory. Writes are replicated to all nodes in the cluster. Reads are done locally.

This module contains functions to write data. To read data, use the ets module from Erlang/OTP.

The preferred key format is a node-specific key (i.e. the key contains the name of the inserting node, or a pid). This simplifies the cleanup logic and avoids key conflicts during a cluster join.
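
A minimal sketch of this key layout, assuming a hypothetical table of {{Node, SessionId}, Pid} records (the module and function names are illustrative, not part of CETS). Because the key starts with the inserting node's name, every record written by one node can be matched with a single pattern:

```erlang
-module(cets_key_example).
-export([make_record/2, node_pattern/1]).

%% Builds a record whose key embeds the name of the inserting node.
%% The {{Node, SessionId}, Pid} layout is a hypothetical example,
%% not something CETS requires.
make_record(SessionId, Pid) ->
    {{node(), SessionId}, Pid}.

%% Returns an ets match pattern that matches every record
%% inserted by Node, whatever its SessionId and Pid.
node_pattern(Node) ->
    {{Node, '_'}, '_'}.
```

With such keys, a write looks like cets:insert(Tab, cets_key_example:make_record(Id, self())), and node_pattern/1 yields the kind of pattern that can be passed to ets:match_delete/2 when cleaning up after a node.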

A random key is a good option too, but the cleanup logic after a netsplit could be tricky. Also, if you update records with random keys, you have to provide a handle_conflict function on startup: two segments of the cluster could contain a new and an old version of the same record, and without a handler the records could be overwritten incorrectly.

Be careful: CETS does not protect you from records being overwritten. It is a good idea to provide a handle_conflict function as a start option, so you can choose which version of a record to keep when two versions are present in different cluster segments.
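
For example, a handle_conflict function could keep the record with the higher version number. This sketch assumes hypothetical {Key, Version, Value} records where Version grows on every update; the module name is illustrative:

```erlang
-module(cets_conflict_example).
-export([handle_conflict/2]).

%% Assumes hypothetical records of the form {Key, Version, Value},
%% where Version is an integer that grows on every update.
%% CETS keeps whichever record this function returns.
handle_conflict({Key, V1, _} = Rec1, {Key, V2, _} = Rec2) ->
    if
        V1 > V2 -> Rec1;
        V2 > V1 -> Rec2;
        %% Same version: fall back to Erlang term order, so the result
        %% does not depend on the order of the arguments.
        true -> max(Rec1, Rec2)
    end.
```

It would be passed at startup as cets:start(Tab, #{handle_conflict => fun cets_conflict_example:handle_conflict/2}). Falling back to term order on equal versions keeps the choice independent of argument order, so every node should pick the same record.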

Often we need to insert a key only if it is not present yet. Use insert_new/2 for this; it uses a single node (the leader) to serialize inserts.

See the MongooseIM code for examples of how this module is used.

Summary

Types

ack_pid(): Pid of the helper process that tracks unacked writes.
backlog_entry(): Delayed operation.
from(): gen_server's caller.
handle_conflict_fun(): Handler function which is called when we need to choose which record to keep during joining.
handle_down_fun(): Handler function which is called when a remote node goes down.
handle_wrong_leader(): Handler function which is called when a leader operation is received by a non-leader (for debugging).
info(): Status information returned by info/1.
join_ref(): A unique ID assigned during a table join attempt.
long_msg(): Types of gen_server calls.
op(): Write operation type.
pause_monitor(): Reference returned from pause/1.
remote_op(): Message broadcast to other nodes.
request_id(): Request reference.
response_return(): Response returned from gen_server's API. Reply is usually ok.
response_timeout(): Timeout to wait for a response.
server_pid(): CETS pid.
server_ref(): CETS process reference.
servers(): Ordered list of server pids.
start_opts(): Options for the start/2 function.
table_name(): ETS table name (and the process server name).

Functions

delete/2: Removes an object with the key from all nodes in the cluster.
delete_many/2: Removes multiple objects using a list of keys.
delete_many_request/2: Async delete_many/2 call.
delete_object/2: Removes a specific object. Useful to remove data from ETS tables of type bag.
delete_object_request/2: Async delete_object/2 call.
delete_objects/2: Removes multiple specific tuples.
delete_objects_request/2: Async delete_objects/2 call.
delete_request/2: Async delete/2 call.
dump/1: Gets all records from a local ETS table.
get_leader/1: Gets the pid of the process that handles insert_new operations.
get_nodes_request/1: Async get_nodes/1 call.
info/1: Returns debug information from the server.
insert/2: Inserts (or overwrites) a tuple into a table.
insert_many/2: Inserts (or overwrites) several tuples into a table.
insert_many_request/2: Async insert_many/2 call.
insert_new/2: Tries to insert a new record.
insert_new_or_lookup/2: Inserts a new tuple or returns an existing one.
insert_request/2: Async insert/2 call.
insert_serial/2: Serialized version of insert/2.
other_nodes/1: Gets a list of other nodes in the cluster that are connected together.
other_pids/1: Gets a list of other CETS processes that are handling this table.
pause/1: Pauses update operations.
ping/1: Blocks until all pending Erlang messages are processed by the Server.
ping_all/1: Waits until all pending operations are applied.
remote_dump/1: Gets all records from a remote ETS table.
set_leader/2: Sets the is_leader field in the state.
start/2: Starts a process serving an ETS table.
stop/1: Stops a CETS server.
table_name/1: Returns the name of the table that the server is serving.
unpause/2: Unpauses update operations.
wait_response/2: Waits for the result of an async operation.
wait_responses/2: Waits for multiple responses.

Types

-type ack_pid() :: cets_ack:ack_pid().
Pid of the helper process that tracks unacked writes.
-type backlog_entry() :: {op(), from()}.
Delayed operation.
-type from() :: gen_server:from().
gen_server's caller.
-type handle_conflict_fun() :: fun((tuple(), tuple()) -> tuple()).
Handler function which is called when we need to choose which record to keep during joining.
-type handle_down_fun() ::
    fun((#{remote_pid := server_pid(),
           remote_node := node(),
           table := table_name(),
           is_leader := boolean()}) ->
            ok).
Handler function which is called when the remote node goes down.
-type handle_wrong_leader() :: fun((#{from := from(), op := op(), server := server_pid()}) -> ok).
Handler function which is called when a leader operation is received by a non-leader (for debugging).
-type info() ::
    #{table := table_name(),
      nodes := [node()],
      other_servers := [pid()],
      size := non_neg_integer(),
      memory := non_neg_integer(),
      ack_pid := ack_pid(),
      join_ref := join_ref(),
      opts := start_opts(),
      node_down_history := [node_down_event()],
      pause_monitors := [pause_monitor()]}.
Status information returned by info/1.
-type join_ref() :: cets_join:join_ref().
A unique ID assigned during a table join attempt.
-type long_msg() ::
    pause | ping | remote_dump | ping_all | table_name | get_info | other_servers | get_nodes |
    {unpause, reference()} |
    get_leader |
    {set_leader, boolean()} |
    {send_dump, servers(), join_ref(), pause_monitor(), [tuple()]}.
Types of gen_server calls.
-type node_down_event() :: #{node => node(), pid => pid(), reason => term()}.
-type op() ::
    {insert, tuple()} |
    {delete, term()} |
    {delete_object, term()} |
    {insert_many, [tuple()]} |
    {delete_many, [term()]} |
    {delete_objects, [term()]} |
    {insert_new, tuple()} |
    {insert_new_or_lookup, tuple()} |
    {leader_op, op()}.
Write operation type.
-type pause_monitor() :: reference().
Reference returned from pause/1.
-type remote_op() :: {remote_op, Op :: op(), From :: from(), AckPid :: ack_pid(), JoinRef :: join_ref()}.
Message broadcast to other nodes.
-type request_id() :: gen_server:request_id().
Request reference.
-type response_return() :: {reply, Reply :: term()} | {error, {_, _}} | timeout.
Response returned from gen_server's API. Reply is usually ok.
-type response_timeout() :: timeout() | {abs, integer()}.
Timeout to wait for a response.
-type server_pid() :: pid().
CETS pid.
-type server_ref() ::
    server_pid() | atom() | {local, atom()} | {global, term()} | {via, module(), term()}.
CETS process reference.
-type servers() :: ordsets:ordset(server_pid()).
Ordered list of server pids.
-type start_error() :: bag_with_conflict_handler.
-type start_opts() ::
    #{type => ordered_set | bag,
      keypos => non_neg_integer(),
      handle_down => handle_down_fun(),
      handle_conflict => handle_conflict_fun(),
      handle_wrong_leader => handle_wrong_leader()}.
Options for the start/2 function.
-type state() ::
    #{tab := table_name(),
      keypos := pos_integer(),
      ack_pid := ack_pid(),
      join_ref := join_ref(),
      other_servers := servers(),
      leader := server_pid(),
      is_leader := boolean(),
      opts := start_opts(),
      backlog := [backlog_entry()],
      pause_monitors := [pause_monitor()],
      node_down_history := [node_down_event()]}.
-type table_name() :: atom().
ETS table name (and the process server name).

Functions

-spec delete(server_ref(), term()) -> ok.

Removes an object with the key from all nodes in the cluster.

Ideally, nodes should only remove data that they've inserted, not data from another node.

See also: delete_many/2, delete_request/2.

delete_many(Server, Keys)
-spec delete_many(server_ref(), [term()]) -> ok.
Removes multiple objects using a list of keys.

See also: delete/2.

delete_many_request(Server, Keys)
-spec delete_many_request(server_ref(), [term()]) -> request_id().
Async delete_many/2 call.

delete_object(Server, Object)
-spec delete_object(server_ref(), tuple()) -> ok.
Removes a specific object. Useful to remove data from ETS tables of type bag.

delete_object_request(Server, Object)
-spec delete_object_request(server_ref(), tuple()) -> request_id().
Async delete_object/2 call.

delete_objects(Server, Objects)
-spec delete_objects(server_ref(), [tuple()]) -> ok.
Removes multiple specific tuples.

delete_objects_request(Server, Objects)
-spec delete_objects_request(server_ref(), [tuple()]) -> request_id().
Async delete_objects/2 call.

delete_request(Server, Key)
-spec delete_request(server_ref(), term()) -> request_id().
Async delete/2 call.
-spec dump(table_name()) -> Records :: [tuple()].
Gets all records from a local ETS table.
-spec get_leader(server_ref()) -> server_pid().
Gets the pid of the process that handles insert_new operations.

get_nodes_request(Server)
-spec get_nodes_request(server_ref()) -> request_id().
Async get_nodes/1 call.
-spec info(server_ref()) -> info().
Returns debug information from the server.
-spec insert(server_ref(), tuple()) -> ok.

Inserts (or overwrites) a tuple into a table.

Only the node that owns the data should update or remove it. Ideally, the key should contain information about the inserting node, so that cleanup and merging are simpler.

insert_many(Server, Records)
-spec insert_many(server_ref(), [tuple()]) -> ok.
Inserts (or overwrites) several tuples into a table.

insert_many_request(Server, Records)
-spec insert_many_request(server_ref(), [tuple()]) -> request_id().
Async insert_many/2 call.
-spec insert_new(server_ref(), tuple()) -> WasInserted :: boolean().

Tries to insert a new record.

All insert_new calls are sent to the leader node first. This is slightly slower than insert/2, because extra messaging is required.
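
A minimal sketch of using insert_new/2 so that only one process can claim a name across the cluster. The {Name, Pid} record layout and the module and function names are hypothetical:

```erlang
-module(cets_insert_new_example).
-export([register_name/2]).

%% Tries to claim Name for the calling process.
%% The {Name, Pid} record layout is a hypothetical example.
%% insert_new/2 is routed through the leader, so concurrent
%% callers are serialized and at most one of them gets true.
register_name(Server, Name) ->
    case cets:insert_new(Server, {Name, self()}) of
        true -> ok;
        false -> {error, already_registered}
    end.
```

Because the leader serializes these calls, two processes racing for the same Name should see exactly one ok result.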

insert_new_or_lookup(Server, Rec)
-spec insert_new_or_lookup(server_ref(), tuple()) -> {WasInserted, ReadRecords}
                        when WasInserted :: boolean(), ReadRecords :: [tuple()].
Inserts a new tuple or returns an existing one.
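
One way to build a get-or-create helper on top of insert_new_or_lookup/2, sketched under the assumption of a set-type table holding {Key, Value} records (the module and function names are hypothetical). The decision logic lives in pick_value/3 so it can be checked without a running cluster:

```erlang
-module(cets_lookup_example).
-export([get_or_create/3, pick_value/3]).

%% Returns {ok, Value}: Default if our record was inserted,
%% otherwise the value that is already stored under Key.
%% Assumes a set-type table with hypothetical {Key, Value} records.
get_or_create(Server, Key, Default) ->
    pick_value(Key, Default, cets:insert_new_or_lookup(Server, {Key, Default})).

%% Our record was written.
pick_value(_Key, Default, {true, _}) ->
    {ok, Default};
%% Another writer got there first; use the stored value.
pick_value(Key, _Default, {false, [{Key, Value}]}) ->
    {ok, Value};
%% Anything else (for example, the key was removed in between).
pick_value(_Key, _Default, {false, Other}) ->
    {error, {unexpected, Other}}.
```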

insert_request(Server, Rec)
-spec insert_request(server_ref(), tuple()) -> request_id().
Async insert/2 call.

insert_serial(Server, Rec)
-spec insert_serial(server_ref(), tuple()) -> ok.

Serialized version of insert/2.

All insert_serial calls are sent to the leader node first.

Similar to insert_new/2, but silently overwrites the data on conflict. It can be used to update entries whose keys are not node-specific.
-spec other_nodes(server_ref()) -> ordsets:ordset(node()).
Gets a list of other nodes in the cluster that are connected together.
-spec other_pids(server_ref()) -> servers().
Gets a list of other CETS processes that are handling this table.
-spec pause(server_ref()) -> pause_monitor().

Pauses update operations.

cets:insert/2 and other write functions block until the server is unpaused.
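
A sketch of a helper that pairs pause/1 with unpause/2, assuming the caller wants to run some work while writes are blocked. The module and function names are hypothetical:

```erlang
-module(cets_pause_example).
-export([with_paused/2]).

%% Runs Fun while update operations on Server are paused.
%% Write calls such as cets:insert/2 block until unpause/2 is called,
%% so Fun itself must not write to Server, or it would block too.
with_paused(Server, Fun) ->
    PauseRef = cets:pause(Server),
    try
        Fun()
    after
        %% Always unpause, even if Fun raised an exception.
        %% The ok | {error, unknown_pause_monitor} result is ignored here.
        cets:unpause(Server, PauseRef)
    end.
```

The after clause runs unpause/2 even if Fun raises, so writers are not left blocked by a crashed caller.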
-spec ping(server_ref()) -> pong.
Blocks until all pending Erlang messages are processed by the Server.
-spec ping_all(server_ref()) -> ok | {error, [{server_pid(), Reason :: term()}]}.
Waits until all pending operations are applied.
-spec remote_dump(server_ref()) -> {ok, Records :: [tuple()]}.
Gets all records from a remote ETS table.

set_leader(Server, IsLeader)
-spec set_leader(server_ref(), boolean()) -> ok.

Sets the is_leader field in the state.

For debugging only. Setting it in production would break the leader election logic.
-spec start(table_name(), start_opts()) -> gen_server:start_ret().

Starts a process serving an ETS table.

The process is registered under its table_name() name.

Options:

- handle_down = fun(#{remote_pid := Pid, table := Tab})

Called when a remote node goes down. This function is called on all nodes in the remaining partition, so you should not call the remote nodes from this function. Otherwise circular locking could happen, i.e. any functions that replicate changes are not allowed (e.g. cets:insert/2, cets:delete/2 and so on). Use the ets module to handle the cleanup (e.g. ets:match_delete/2). Use spawn to start a new async process if you need to update the data on the remote nodes, but this could cause improper cleanup due to race conditions.

- handle_conflict = fun(Record1, Record2) -> NewRecord

Called when two records have the same key during clustering. NewRecord is the record CETS keeps in the table under that key. Does not work for bags.

We recommend defining this function if keys could conflict. It is called once for each conflicting key. We recommend keeping it pure (or at least free of blocking calls).
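
A sketch of a handle_down callback that follows the rules above: it only touches the local table through the ets module and removes the records inserted by the node that went down. It assumes keys of the form {Node, Id} and reads remote_node from the map described by handle_down_fun(); the module and function names are hypothetical:

```erlang
-module(cets_handle_down_example).
-export([start/1, handle_down/1]).

%% Starts a CETS table whose keys are assumed to be {Node, Id}.
start(Tab) ->
    cets:start(Tab, #{handle_down => fun ?MODULE:handle_down/1}).

%% Deletes the local copies of records inserted by the node that went down.
%% Only the local ets module is used: replicating calls such as
%% cets:insert/2 or cets:delete/2 must not be made from this callback.
handle_down(#{remote_node := Node, table := Tab}) ->
    ets:match_delete(Tab, {{Node, '_'}, '_'}),
    ok.
```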
-spec stop(server_ref()) -> ok.
Stops a CETS server.
-spec table_name(server_ref()) -> {ok, table_name()}.
Returns the name of the table that the server is serving.

unpause(Server, PauseRef)
-spec unpause(server_ref(), pause_monitor()) -> ok | {error, unknown_pause_monitor}.

Unpauses update operations.

Pass the reference returned from cets:pause/1 as the PauseRef argument.

wait_response(ReqId, Timeout)
-spec wait_response(request_id(), timeout()) -> response_return().
Waits for the result of an async operation.

wait_responses(ReqIds, Timeout)
-spec wait_responses([request_id()], response_timeout()) -> [response_return()].

Waits for multiple responses.

Returns the results in the same order as ReqIds. Blocks for at most Timeout milliseconds.
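
A sketch of the async pattern: send several insert_request/2 calls without waiting in between, then collect the replies with wait_responses/2. The module and function names are hypothetical, and success is assumed to be {reply, ok}, as response_return() describes:

```erlang
-module(cets_async_example).
-export([insert_all/2, all_ok/1]).

%% Sends one insert_request/2 per {Server, Record} pair without waiting
%% in between, then collects all replies with wait_responses/2.
%% Timeout is a response_timeout(): milliseconds, infinity or {abs, Time}.
insert_all(Pairs, Timeout) ->
    ReqIds = [cets:insert_request(Server, Rec) || {Server, Rec} <- Pairs],
    Responses = cets:wait_responses(ReqIds, Timeout),
    case all_ok(Responses) of
        true -> ok;
        false -> {error, Responses}
    end.

%% Responses are in the same order as ReqIds. Each one is {reply, Reply},
%% {error, _} or timeout; only {reply, ok} counts as success here.
all_ok(Responses) ->
    lists:all(fun(Response) -> Response =:= {reply, ok} end, Responses).
```

For a single table, insert_many/2 already sends all records as one operation; separate requests are mostly useful when the writes go to different servers.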