cets (cets v0.3.0)
Main CETS module.
CETS stores data in-memory. Writes are replicated on all nodes across the cluster. Reads are done locally.
This module contains functions to write data. To read data, use the ets module from Erlang/OTP.
The preferred key format is a node-specific key (i.e. a key that contains the inserter's node name or a pid). This simplifies the cleanup logic and avoids key conflicts during a cluster join.
A random key is a good option too, but the cleanup logic in the case of a netsplit could be tricky. Also, if you update a record with a random key, you have to provide a handle_conflict
function on startup (because two segments of the cluster could contain a new and an old version of the record, so records would be overwritten incorrectly).
Be careful: CETS does not protect you from records being overwritten. It is a good idea to provide a handle_conflict
function as a start argument, so you can choose which version of a record to keep when two versions are present in different cluster segments.
Often you need to insert a key only if it is not present yet. Use insert_new
for this; it uses a single node to serialize inserts.
Check MongooseIM code for examples of usage of this module.
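As a minimal sketch (the table name example_tab and the values are hypothetical), a node can start a table, replicate writes through CETS, and read locally through ets; clustering itself is set up separately via the cets_join module:

```erlang
%% Start a CETS server; the process is registered under the table name.
{ok, _Pid} = cets:start(example_tab, #{}),
%% Write through CETS: the insert is replicated to all joined nodes.
%% The key includes node() to keep it node-specific.
ok = cets:insert(example_tab, {{user1, node()}, <<"some data">>}),
%% Reads are plain local ETS reads:
[{_, <<"some data">>}] = ets:lookup(example_tab, {user1, node()}).
```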
Summary
Types
Pid of the helper process that tracks unacked writes.
Delayed operation.
gen_server's caller.
Handler function which is called when we need to choose which record to keep during joining.
Handler function which is called when the remote node goes down.
Handler function which is called when a leader operation is received by a non-leader (for debugging).
A unique ID assigned during the table join attempt.
Types of gen_server calls.
Write operation type.
Reference returned from pause/1.
Message broadcasted to other nodes.
Request Reference.
Response returned from gen_server's API. Reply is usually ok.
Timeout to wait for response.
CETS pid.
CETS Process Reference.
Ordered list of server pids.
Options for the start/2 function.
ETS table name (and the process server name).
Functions
Removes an object with the key from all nodes in the cluster.
Removes multiple objects using a list of keys.
Async delete_many/2 call.
Removes a specific object. Useful to remove data from ETS tables of type bag.
Async delete_object/2 call.
Removes multiple specific tuples.
Async delete_objects/2 call.
Async delete/2 call.
Gets all records from a local ETS table.
Gets the pid of the process which handles insert_new operations.
Async get_nodes/1 call.
Returns debug information from the server.
Inserts (or overwrites) a tuple into a table.
Inserts (or overwrites) several tuples into a table.
Async insert_many/2 call.
Tries to insert a new record.
Inserts a new tuple or returns an existing one.
Async insert/2 call.
Serialized version of insert/2.
Get a list of other nodes in the cluster that are connected together.
Gets a list of other CETS processes that are handling this table.
Pauses update operations.
Blocks until all pending Erlang messages are processed by the Server.
Waits till all pending operations are applied.
Gets all records from a remote ETS table.
Sets the is_leader field in the state.
Starts a process serving an ETS table.
Starts a linked process serving an ETS table.
Stops a CETS server.
Returns the table name that the server is serving.
Unpauses update operations.
Waits for the result of the async operation.
Waits for multiple responses.
Types
-type ack_pid() :: cets_ack:ack_pid().
Pid of the helper process that tracks unacked writes.
-type backlog_entry() :: {op(), from()}.
Delayed operation.
-type from() :: gen_server:from().
gen_server's caller.
-type handle_conflict_fun() :: fun((tuple(), tuple()) -> tuple()).
Handler function which is called when we need to choose which record to keep during joining.
-type handle_down_fun() :: fun((#{remote_pid := server_pid(), remote_node := node(), table := table_name(), is_leader := boolean()}) -> ok).
Handler function which is called when the remote node goes down.
-type handle_wrong_leader() :: fun((#{from := from(), op := op(), server := server_pid()}) -> ok).
Handler function which is called when a leader operation is received by a non-leader (for debugging).
-type info() :: #{table := table_name(), nodes := [node()], other_servers := [pid()], size := non_neg_integer(), memory := non_neg_integer(), ack_pid := ack_pid(), join_ref := join_ref(), opts := start_opts(), node_down_history := [node_down_event()], pause_monitors := [pause_monitor()]}.
Status information returned by info/1.
-type join_ref() :: cets_join:join_ref().
A unique ID assigned during the table join attempt.
-type long_msg() :: pause | ping | remote_dump | ping_all | table_name | get_info | other_servers | get_nodes | {unpause, reference()} | get_leader | {set_leader, boolean()} | {send_dump, servers(), join_ref(), pause_monitor(), [tuple()]}.
Types of gen_server calls.
-type op() :: {insert, tuple()} | {delete, term()} | {delete_object, term()} | {insert_many, [tuple()]} | {delete_many, [term()]} | {delete_objects, [term()]} | {insert_new, tuple()} | {insert_new_or_lookup, tuple()} | {leader_op, op()}.
Write operation type.
-type pause_monitor() :: reference().
Reference returned from pause/1.
-type remote_op() :: {remote_op, Op :: op(), From :: from(), AckPid :: ack_pid(), JoinRef :: join_ref()}.
Message broadcasted to other nodes.
-type request_id() :: gen_server:request_id().
Request Reference.
-type response_return() :: {reply, Reply :: term()} | {error, {_, _}} | timeout.
Response returned from gen_server's API. Reply is usually ok.
-type response_timeout() :: timeout().
Timeout to wait for response.
-type server_pid() :: pid().
CETS pid.
-type server_ref() :: server_pid() | atom() | {local, atom()} | {global, term()} | {via, module(), term()}.
CETS Process Reference.
-type servers() :: ordsets:ordset(server_pid()).
Ordered list of server pids.
-type start_error() :: bag_with_conflict_handler.
-type start_opts() :: #{type => ordered_set | bag, keypos => non_neg_integer(), handle_down => handle_down_fun(), handle_conflict => handle_conflict_fun(), handle_wrong_leader => handle_wrong_leader()}.
Options for the start/2 function.
-type state() :: #{tab := table_name(), keypos := pos_integer(), ack_pid := ack_pid(), join_ref := join_ref(), other_servers := servers(), leader := server_pid(), is_leader := boolean(), opts := start_opts(), backlog := [backlog_entry()], pause_monitors := [pause_monitor()], node_down_history := [node_down_event()]}.
-type table_name() :: atom().
ETS table name (and the process server name).
Functions
-spec delete(server_ref(), term()) -> ok.
Removes an object with the key from all nodes in the cluster.
Ideally, nodes should only remove data that they've inserted, not data from another node.
See also: delete_many/2, delete_request/2.
-spec delete_many(server_ref(), [term()]) -> ok.
Removes multiple objects using a list of keys.
See also: delete/2.
-spec delete_many_request(server_ref(), [term()]) -> request_id().
Async delete_many/2 call.
-spec delete_object(server_ref(), tuple()) -> ok.
Removes a specific object. Useful to remove data from ETS tables of type bag.
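For example (a hypothetical bag table), delete_object/2 removes only the exact tuple, while delete/2 would remove every tuple stored under the key:

```erlang
{ok, _} = cets:start(rooms_tab, #{type => bag}),
ok = cets:insert(rooms_tab, {alice, room1}),
ok = cets:insert(rooms_tab, {alice, room2}),
%% Remove only the {alice, room1} tuple; {alice, room2} stays:
ok = cets:delete_object(rooms_tab, {alice, room1}),
[{alice, room2}] = ets:lookup(rooms_tab, alice).
```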
-spec delete_object_request(server_ref(), tuple()) -> request_id().
Async delete_object/2 call.
-spec delete_objects(server_ref(), [tuple()]) -> ok.
Removes multiple specific tuples.
-spec delete_objects_request(server_ref(), [tuple()]) -> request_id().
Async delete_objects/2 call.
-spec delete_request(server_ref(), term()) -> request_id().
Async delete/2 call.
-spec dump(table_name()) -> Records :: [tuple()].
Gets all records from a local ETS table.
-spec get_leader(server_ref()) -> server_pid().
Gets the pid of the process which handles insert_new operations.
-spec get_nodes_request(server_ref()) -> request_id().
Async get_nodes/1 call.
-spec info(server_ref()) -> info().
Returns debug information from the server.
-spec insert(server_ref(), tuple()) -> ok.
Inserts (or overwrites) a tuple into a table.
Only the node that owns the data should update or remove it. Ideally, the key should contain the inserter node info, so that cleanup and merging are simplified.
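A sketch of the recommended key format (table and key names are hypothetical): embedding the owner in the key makes it easy to find and clean up that owner's rows later:

```erlang
{ok, _} = cets:start(sessions_tab, #{}),
%% The key carries the inserter pid, so only this process "owns" the row,
%% and a handle_down handler can match on the key for cleanup.
Key = {session_1, self()},
ok = cets:insert(sessions_tab, {Key, #{user => <<"alice">>}}).
```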
-spec insert_many(server_ref(), [tuple()]) -> ok.
Inserts (or overwrites) several tuples into a table.
-spec insert_many_request(server_ref(), [tuple()]) -> request_id().
Async insert_many/2 call.
-spec insert_new(server_ref(), tuple()) -> WasInserted :: boolean().
Tries to insert a new record.
All inserts are sent to the leader node first. It is slightly slower than a plain insert, because extra messaging is required.
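A sketch of using insert_new as a cluster-wide uniqueness check (the table and key names are hypothetical); because inserts go through the leader, only one of several concurrent attempts for the same key can succeed:

```erlang
{ok, _} = cets:start(locks_tab, #{}),
%% The first attempt inserts the record:
true = cets:insert_new(locks_tab, {resource_a, self()}),
%% A second attempt for the same key fails; the old record stays:
false = cets:insert_new(locks_tab, {resource_a, self()}).
```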
-spec insert_new_or_lookup(server_ref(), tuple()) -> {WasInserted, ReadRecords} when WasInserted :: boolean(), ReadRecords :: [tuple()].
Inserts a new tuple or returns an existing one.
-spec insert_request(server_ref(), tuple()) -> request_id().
Async insert/2 call.
-spec insert_serial(server_ref(), tuple()) -> ok.
Serialized version of insert/2.
All insert_serial calls are sent to the leader node first.
Similar to insert_new/2, but overwrites the data silently on conflict. It can be used to update entries that do not use node-specific keys.
-spec other_nodes(server_ref()) -> ordsets:ordset(node()).
Get a list of other nodes in the cluster that are connected together.
-spec other_pids(server_ref()) -> servers().
Gets a list of other CETS processes that are handling this table.
-spec pause(server_ref()) -> pause_monitor().
Pauses update operations.
cets:insert/2 and other write functions block until the server is unpaused.
-spec ping(server_ref()) -> pong.
Blocks until all pending Erlang messages are processed by the Server.
-spec ping_all(server_ref()) -> ok | {error, [{server_pid(), Reason :: term()}]}.
Waits until all pending operations are applied.
-spec remote_dump(server_ref()) -> {ok, Records :: [tuple()]}.
Gets all records from a remote ETS table.
-spec set_leader(server_ref(), boolean()) -> ok.
Sets the is_leader field in the state.
For debugging only. Setting it outside of tests would break the leader election logic.
-spec start(table_name(), start_opts()) -> gen_server:start_ret().
Starts a process serving an ETS table.
The process is registered under the table_name() name.
Options:
- handle_down = fun(#{remote_pid := Pid, table := Tab})
Called when a remote node goes down. This function is called on all nodes in the remaining partition, so you should not call the remote nodes from this function; otherwise, circular locking could happen. In other words, any functions that replicate changes (cets:insert/2, cets:delete/2 and so on) are not allowed. Use the ets
module to handle the cleanup (e.g. ets:match_delete/2). You can spawn a new async process if you need to update the data on the remote nodes, but this could cause improper cleanup due to race conditions.
- handle_conflict = fun(Record1, Record2) -> NewRecord
Called when two records have the same key during clustering. NewRecord is the record that CETS keeps in the table under the key. Does not work for bags.
We recommend defining this function if keys could have conflicts. It is called once for each conflicting key. Keep this function pure (or at least avoid blocking calls from it).
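A hypothetical configuration putting both options together: records are assumed to be {Key, Timestamp} tuples, conflicts keep the newer record, and rows keyed by a node that went down are removed locally:

```erlang
Opts = #{
    %% Keep the record with the larger timestamp on a join conflict:
    handle_conflict =>
        fun({_, T1} = Rec1, {_, T2} = Rec2) when T1 >= T2 -> Rec1;
           (_Rec1, Rec2) -> Rec2
        end,
    %% Clean up rows whose key mentions the node that went down.
    %% Only local ets calls here - no replicating cets calls allowed:
    handle_down =>
        fun(#{table := Tab, remote_node := Node}) ->
            ets:match_delete(Tab, {{'_', Node}, '_'}),
            ok
        end
},
{ok, _Pid} = cets:start(my_tab, Opts).
```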
-spec start_link(table_name(), start_opts()) -> gen_server:start_ret().
Starts a linked process serving an ETS table.
See also: start/2.
-spec stop(server_ref()) -> ok.
Stops a CETS server.
-spec table_name(server_ref()) -> {ok, table_name()}.
Returns the table name that the server is serving.
-spec unpause(server_ref(), pause_monitor()) -> ok | {error, unknown_pause_monitor}.
Unpauses update operations.
Provide the reference returned from cets:pause/1 as an argument.
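A sketch of the pause/unpause pattern (do_maintenance/0 is a hypothetical local function); unpause/2 takes the monitor reference returned by pause/1:

```erlang
PauseRef = cets:pause(my_tab),
try
    %% While paused, writes such as cets:insert/2 block.
    do_maintenance()
after
    ok = cets:unpause(my_tab, PauseRef)
end.
```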
-spec wait_response(request_id(), timeout()) -> response_return().
Waits for the result of the async operation.
-spec wait_responses([request_id()], response_timeout()) -> [response_return()].
Waits for multiple responses.
Returns results in the same order as ReqIds. Blocks for at most Timeout milliseconds.
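A sketch of the async request pattern (the table name is hypothetical): fire several insert_request calls without blocking, then collect all replies with wait_responses/2:

```erlang
ReqIds = [cets:insert_request(my_tab, {Key, os:system_time()})
          || Key <- [a, b, c]],
%% Results arrive in the same order as ReqIds;
%% each successful write yields {reply, ok}.
Results = cets:wait_responses(ReqIds, 5000),
true = lists:all(fun(R) -> R =:= {reply, ok} end, Results).
```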