cets (cets v0.2.0)
Main CETS module.
CETS stores data in-memory. Writes are replicated on all nodes across the cluster. Reads are done locally.
This module contains functions to write data. To read data, use the ets module from Erlang/OTP.
The preferred key format is a node-specific key (i.e. the key should contain the inserter node name or a pid). This simplifies the cleaning logic and avoids key conflicts during the cluster join.
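As a minimal sketch of the node-specific key idea above, the module below builds keys of the form `{node(), LocalId}` and reads them back with plain `ets` calls. The module name, the `LocalId` argument, and the two-element record shape are illustrative assumptions, not part of CETS. A local ETS table stands in for a CETS-served table, so the write uses `ets:insert/2` where a real setup would use `cets:insert/2`.

```erlang
-module(cets_key_sketch).
-export([make_key/1, demo/0]).

%% Build a key that carries the inserter's node name.
%% LocalId is any term that is unique on this node (hypothetical).
make_key(LocalId) ->
    {node(), LocalId}.

%% Shows the key shape with a plain local ETS table.
%% With CETS the write would go through cets:insert/2 instead;
%% reads stay as plain ets calls.
demo() ->
    Tab = ets:new(cets_key_sketch_tab, [ordered_set, public]),
    Key = make_key(1),
    true = ets:insert(Tab, {Key, <<"payload">>}),
    [{Key, Payload}] = ets:lookup(Tab, Key),
    %% Every record written by this node can be selected
    %% by the node part of the key.
    Mine = ets:match_object(Tab, {{node(), '_'}, '_'}),
    true = ets:delete(Tab),
    {Key, Payload, length(Mine)}.
```

Because the node name is part of the key, two nodes that pick the same `LocalId` still produce different keys, which is what avoids conflicts on join.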
A random key is a good option too, but the cleaning logic in the case of a netsplit could be tricky. Also, if you update a record with a random key, you have to provide a handle_conflict function on startup, because two segments of the cluster could contain a new and an old version of the record, and the records would otherwise be overwritten incorrectly.
Be careful: CETS does not protect you from records being overwritten. It is a good idea to provide a handle_conflict function as a start argument, so you can choose which version of the record to keep when two versions are present in different cluster segments.
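One possible shape for such a handle_conflict function is sketched below. It assumes records of the form `{Key, Version, Value}` and keeps the one with the higher version; the record layout, the module name, and the "highest version wins" rule are illustrative assumptions, and an application may need a different merge rule.

```erlang
-module(cets_conflict_sketch).
-export([keep_newest/2, start_opts/0]).

%% Assumed record shape (hypothetical): {Key, Version, Value}.
%% Fits handle_conflict_fun(): fun((tuple(), tuple()) -> tuple()).
keep_newest({Key, V1, _} = Rec1, {Key, V2, _}) when V1 >= V2 ->
    Rec1;
keep_newest({Key, _, _}, {Key, _, _} = Rec2) ->
    Rec2.

%% Options map that could be passed as the second argument of cets:start/2.
start_opts() ->
    #{handle_conflict => fun ?MODULE:keep_newest/2}.
```

The function is pure, so it can be unit-tested without a running cluster. On equal versions this sketch keeps the first record.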
Often we need to insert a key only if it is not present yet. Use insert_new for this; it uses a single node to serialize inserts.
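The insert-if-absent pattern described above might look roughly like the following. The helper name `claim/3` and the `{Name, Pid}` record shape are hypothetical; only `cets:insert_new/2` and `ets:lookup/2` are taken from the documented APIs. The sketch assumes `Server` is the CETS server for the ETS table `Tab`.

```erlang
-module(cets_insert_new_sketch).
-export([claim/3]).

%% Try to claim Name for the calling process in the table served by Server.
%% Returns ok if this call inserted the record, or {taken, Records}
%% with whatever is stored locally under Name otherwise.
claim(Server, Tab, Name) ->
    case cets:insert_new(Server, {Name, self()}) of
        true ->
            ok;
        false ->
            %% Reads are local: use ets directly on the table.
            {taken, ets:lookup(Tab, Name)}
    end.
```

When the existing records are needed together with the insert result, `insert_new_or_lookup/2` returns both in one call according to its spec.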
Types
-type ack_pid() :: cets_ack:ack_pid().
-type from() :: gen_server:from().
-type handle_conflict_fun() :: fun((tuple(), tuple()) -> tuple()).
-type handle_down_fun() :: fun((#{remote_pid := server_pid(), remote_node := node(), table := table_name(), is_leader := boolean()}) -> ok).
-type handle_wrong_leader() :: fun((#{from := from(), op := op(), server := server_pid()}) -> ok).
-type info() :: #{table := table_name(), nodes := [node()], other_servers := [pid()], size := non_neg_integer(), memory := non_neg_integer(), ack_pid := ack_pid(), join_ref := join_ref(), opts := start_opts(), node_down_history := [node_down_event()], pause_monitors := [pause_monitor()]}.
Status information returned by info/1.
-type join_ref() :: cets_join:join_ref().
-type long_msg() :: pause | ping | remote_dump | ping_all | table_name | get_info | other_servers | get_nodes | {unpause, reference()} | get_leader | {set_leader, boolean()} | {send_dump, servers(), join_ref(), pause_monitor(), [tuple()]}.
-type node_down_event() :: #{node => node(), pid => pid(), reason => term()}.
-type op() :: {insert, tuple()} | {delete, term()} | {delete_object, term()} | {insert_many, [tuple()]} | {delete_many, [term()]} | {delete_objects, [term()]} | {insert_new, tuple()} | {insert_new_or_lookup, tuple()} | {leader_op, op()}.
-type pause_monitor() :: reference().
Reference returned by pause/1.
-type request_id() :: gen_server:request_id().
-type response_return() :: {reply, Reply :: term()} | {error, {_, _}} | timeout.
Response returned through gen_server's API. Reply is usually ok.
-type response_timeout() :: timeout() | {abs, integer()}.
-type server_pid() :: pid().
-type server_ref() :: server_pid() | atom() | {local, atom()} | {global, term()} | {via, module(), term()}.
-type servers() :: ordsets:ordset(server_pid()).
-type start_error() :: bag_with_conflict_handler.
-type start_opts() :: #{type => ordered_set | bag, keypos => non_neg_integer(), handle_down => handle_down_fun(), handle_conflict => handle_conflict_fun(), handle_wrong_leader => handle_wrong_leader()}.
Options for the start/2 function.
-type state() :: #{tab := table_name(), keypos := pos_integer(), ack_pid := ack_pid(), join_ref := join_ref(), other_servers := servers(), leader := server_pid(), is_leader := boolean(), opts := start_opts(), backlog := [backlog_entry()], pause_monitors := [pause_monitor()], node_down_history := [node_down_event()]}.
-type table_name() :: atom().
Functions
-spec delete(server_ref(), term()) -> ok.
Removes an object with the key from all nodes in the cluster.
Ideally, nodes should only remove data that they've inserted, not data from another node.
See also: delete_many/2, delete_request/2.
-spec delete_many(server_ref(), [term()]) -> ok.
See also: delete/2.
-spec delete_many_request(server_ref(), [term()]) -> request_id().
Asynchronous version of the delete_many/2 call.
-spec delete_object(server_ref(), tuple()) -> ok.
Removes a specific object. Useful for tables of type bag.
-spec delete_object_request(server_ref(), tuple()) -> request_id().
Asynchronous version of the delete_object/2 call.
-spec delete_objects(server_ref(), [tuple()]) -> ok.
-spec delete_objects_request(server_ref(), [tuple()]) -> request_id().
Asynchronous version of the delete_objects/2 call.
-spec delete_request(server_ref(), term()) -> request_id().
Asynchronous version of the delete/2 call.
-spec dump(table_name()) -> Records :: [tuple()].
-spec get_leader(server_ref()) -> server_pid().
Returns the pid of the server that handles insert_new operations.
-spec get_nodes_request(server_ref()) -> request_id().
Asynchronous version of the get_nodes/1 call.
-spec info(server_ref()) -> info().
-spec insert(server_ref(), tuple()) -> ok.
Inserts (or overwrites) a tuple into a table.
Only the node that owns the data should update or remove it. Ideally, the key should contain inserter node info, so that cleaning and merging are simplified.
-spec insert_many(server_ref(), [tuple()]) -> ok.
-spec insert_many_request(server_ref(), [tuple()]) -> request_id().
Asynchronous version of the insert_many/2 call.
-spec insert_new(server_ref(), tuple()) -> WasInserted :: boolean().
Tries to insert a new record.
All inserts are sent to the leader node first. It is slightly slower than plain insert/2, because extra messaging is required.
-spec insert_new_or_lookup(server_ref(), tuple()) -> {WasInserted, ReadRecords} when WasInserted :: boolean(), ReadRecords :: [tuple()].
-spec insert_request(server_ref(), tuple()) -> request_id().
Asynchronous version of the insert/2 call.
-spec insert_serial(server_ref(), tuple()) -> ok.
Serialized version of insert/2.
All insert_serial calls are sent to the leader node first.
Similar to insert_new/2, but overwrites the data silently on conflict. It can be used to update entries that do not use node-specific keys.
-spec other_nodes(server_ref()) -> ordsets:ordset(node()).
-spec other_pids(server_ref()) -> servers().
-spec pause(server_ref()) -> pause_monitor().
Pauses update operations.
cets:insert/2 and other update functions block until the server is unpaused.
-spec ping(server_ref()) -> pong.
Pings Server.
-spec ping_all(server_ref()) -> ok | {error, [{server_pid(), Reason :: term()}]}.
-spec remote_dump(server_ref()) -> {ok, Records :: [tuple()]}.
-spec set_leader(server_ref(), boolean()) -> ok.
Sets the is_leader field in the state.
-spec start(table_name(), start_opts()) -> gen_server:start_ret().
Starts a process serving an ETS table.
The process is registered under the table_name() name.
Options:
- handle_down = fun(#{remote_pid := Pid, table := Tab})
Called when a remote node goes down. This function is called on all nodes in the remaining partition, so you should not call the remote nodes from this function; otherwise circular locking could happen. That is, functions that replicate changes (cets:insert/2, cets:delete/2 and so on) are not allowed. Use the ets module to handle the cleaning (e.g. ets:match_delete/2). Use spawn to start a new asynchronous process if you need to update data on the remote nodes, but this could cause improper cleaning due to race conditions.
- handle_conflict = fun(Record1, Record2) -> NewRecord
Called when two records have the same key when clustering. NewRecord is the record CETS keeps in the table under the key. Does not work for bags.
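A handle_down callback that follows the advice above (local `ets` cleanup only, no replicating calls) could be sketched as follows. It assumes two-element records whose keys have the form `{Node, LocalId}`; that key convention, the record shape, and the module name are assumptions made for illustration.

```erlang
-module(cets_down_sketch).
-export([handle_down/1, start_opts/0]).

%% Assumes records of the form {{Node, LocalId}, Value} (hypothetical).
%% Only the local ets table is touched; no replicating cets call is made.
handle_down(#{remote_pid := Pid, table := Tab}) ->
    DownNode = node(Pid),
    %% Delete every record whose key was created on the node that went down.
    true = ets:match_delete(Tab, {{DownNode, '_'}, '_'}),
    ok.

%% Options map that could be passed as the second argument of cets:start/2.
start_opts() ->
    #{handle_down => fun ?MODULE:handle_down/1}.
```

Since every node in the remaining partition runs the same callback on its own copy of the table, each copy is cleaned without cross-node messaging.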
-spec stop(server_ref()) -> ok.
-spec table_name(server_ref()) -> {ok, table_name()}.
-spec unpause(server_ref(), pause_monitor()) -> ok | {error, unknown_pause_monitor}.
Unpauses update operations.
Provide the reference returned from cets:pause/1 as an argument.
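The pause/unpause pairing can be wrapped so that the reference is always handed back. The helper below is a hypothetical convenience, not part of CETS; it only uses `cets:pause/1` and `cets:unpause/2` as specified above.

```erlang
-module(cets_pause_sketch).
-export([with_paused/2]).

%% Run Fun while updates on Server are paused, then unpause.
%% Returns whatever Fun returns.
with_paused(Server, Fun) when is_function(Fun, 0) ->
    PauseRef = cets:pause(Server),
    try
        Fun()
    after
        %% Always hand the monitor reference back, even if Fun crashes.
        cets:unpause(Server, PauseRef)
    end.
```

Fun should not call update functions such as `cets:insert/2` on the same server, because those block until the server is unpaused.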
-spec wait_response(request_id(), timeout()) -> response_return().
-spec wait_responses([request_id()], response_timeout()) -> [response_return()].
Waits for multiple responses.
Returns results in the same order as ReqIds. Blocks for a maximum of Timeout milliseconds.
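The request functions and wait_responses/2 can be combined to send several writes before waiting for any reply. The sketch below is one way to do that; the helper name `insert_all/3` and the choice to return the records that were not acknowledged with `{reply, ok}` are assumptions for illustration.

```erlang
-module(cets_async_sketch).
-export([insert_all/3]).

%% Send one insert request per record without waiting in between,
%% then collect all replies. Returns the records whose request
%% did not come back as {reply, ok}.
insert_all(Server, Records, TimeoutMs) ->
    ReqIds = [cets:insert_request(Server, Rec) || Rec <- Records],
    Results = cets:wait_responses(ReqIds, TimeoutMs),
    %% Results are in the same order as ReqIds,
    %% so they can be zipped with Records.
    [Rec || {Rec, Res} <- lists:zip(Records, Results), Res =/= {reply, ok}].
```

An empty list means every insert was acknowledged within the timeout.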