dgen_server behaviour (DGen v0.2.0)

Copy Markdown View Source

A durable, distributed gen_server backed by a pluggable storage backend.

A dgen_server is an abstract entity composed of durable state and operations on that state. The state lives in the configured backend (default: FoundationDB) and the operations are defined by a callback module implementing the dgen_server behaviour. This allows a gen_server to outlive any single Erlang process, node, or cluster.

Zero or more Erlang processes may act on a dgen_server at any time. Processes with consume enabled consume messages from the durable queue and invoke callbacks; processes without it only publish messages.

Options

The following options may be passed via the Opts proplist:

  • tenant (required) - {DbHandle, Dir} pair identifying the backend subspace.
  • consume (default true) - whether this process consumes messages from the queue.
  • reset (default false) - when true, re-initialise the durable state even if it already exists.
  • dead_letter_threshold (default infinity) - number of consecutive processing failures before a message is treated as a dead letter. When a message has failed this many times it is moved to the dead-letter queue, the caller raises {dead_letter, N} (for call messages), and the optional handle_dead_letter/2 callback is invoked. infinity (the default) disables dead-lettering entirely.

Callbacks

  • init/1 - return {ok, State} or {ok, Tuid, State}.
  • handle_call/3 - return {reply, Reply, State} or {reply, Reply, State, Actions}.
  • handle_cast/2 - return {noreply, State} or {noreply, State, Actions}.
  • handle_info/2 - return {noreply, State} or {noreply, State, Actions}.
  • handle_dead_letter/2 (optional) - called after a message is dead-lettered. Receives (Msg, AttemptCount). Return value is ignored. Useful for custom alerting or metrics.

Actions is a list of 1-arity funs executed after the transaction commits. The argument is the

Summary

Functions

Sends a synchronous call request via the durable queue. Default timeout 5000ms.

Sends a synchronous call request via the durable queue.

Sends an asynchronous cast request to the dgen_server's durable queue.

Sends a batch of cast requests to the dgen_server's durable queue atomically.

Kills the dgen_server, deleting all durable state, queue items, and waiting call keys. The process exits with Reason.

Returns a closure for atomically casting a message from within the caller's own backend transaction.

Sends a call that bypasses the durable queue and is handled immediately.

Like priority_call/2 but with an explicit timeout.

Sends a cast that bypasses the durable queue and is handled immediately.

Starts a dgen_server process without linking.

Starts a dgen_server process without linking, registered as Reg.

Starts a dgen_server process linked to the calling process.

Starts a dgen_server process linked to the calling process, registered as Reg.

Types

action()

-type action() :: fun().

event_type()

-type event_type() :: {call, from()} | cast | info.

from()

-type from() :: term().

init_ret()

-type init_ret() :: {ok, state()} | {ok, tuid(), state()} | {error, term()}.

internalstate()

-type internalstate() ::
          #state{tenant :: dgen_backend:tenant(),
                 mod :: atom(),
                 tuid :: tuple(),
                 watch :: undefined | dgen_backend:future(),
                 cache :: boolean(),
                 mod_state_cache ::
                     undefined |
                     {dgen_backend:versionstamp() | dgen_backend:future(),
                      {ok, term()} | {error, not_found}},
                 cache_misses :: non_neg_integer(),
                 dead_letter_threshold :: pos_integer() | infinity}.

lock_ret()

-type lock_ret() :: {lock, state()}.

noreply_ret()

-type noreply_ret() :: {noreply, state()} | {noreply, state(), [action()]}.

option()

-type option() ::
          {tenant, dgen_backend:tenant()} |
          {consume, boolean()} |
          {reset, boolean()} |
          {cache, boolean()} |
          {dead_letter_threshold, pos_integer() | infinity} |
          gen_server:start_opt().

options()

-type options() :: [option()].

reply_ret()

-type reply_ret() :: {reply, term(), state()} | {reply, term(), state(), [action()]}.

server()

-type server() :: gen_server:server_ref().

start_ret()

-type start_ret() :: gen_server:start_ret().

state()

-type state() :: term().

stop_ret()

-type stop_ret() :: {stop, term(), state()} | {stop, term(), state(), [action()]}.

tuid()

-type tuid() :: tuple().

Callbacks

handle_call(Request, From, State)

(optional)
-callback handle_call(Request :: term(), From :: from(), State :: state()) ->
                         reply_ret() | lock_ret() | stop_ret().

handle_cast(Msg, State)

(optional)
-callback handle_cast(Msg :: term(), State :: state()) -> noreply_ret() | lock_ret() | stop_ret().

handle_dead_letter(Msg, AttemptCount)

(optional)
-callback handle_dead_letter(Msg :: term(), AttemptCount :: non_neg_integer()) -> any().

handle_info(Info, State)

(optional)
-callback handle_info(Info :: term(), State :: state()) -> noreply_ret() | stop_ret().

handle_locked(EventType, Msg, State)

(optional)
-callback handle_locked(EventType :: event_type(), Msg :: term(), State :: state()) ->
                           reply_ret() | noreply_ret() | stop_ret().

init(Args)

-callback init(Args :: term()) -> init_ret().

Functions

call(Server, Request)

-spec call(server(), term()) -> term().

Sends a synchronous call request via the durable queue. Default timeout 5000ms.

call/3

-spec call(server(), term(), timeout() | list()) -> term().

Sends a synchronous call request via the durable queue.

The request is enqueued durably and the caller blocks until a consumer processes it and writes the reply, or until Timeout milliseconds elapse.

Options

  • timeout: Default 5000. Timeout in milliseconds, or infinity.

cast(Server, Request)

-spec cast(server(), term()) -> ok.

Sends an asynchronous cast request to the dgen_server's durable queue.

cast_k(Server, Requests)

-spec cast_k(server(), [term()]) -> ok.

Sends a batch of cast requests to the dgen_server's durable queue atomically.

code_change(OldVsn, State, Extra)

-spec code_change(term(), internalstate(), term()) -> {ok, internalstate()}.

get_quid(Tuple)

handle_call(Request, From, State)

-spec handle_call(term(), gen_server:from(), internalstate()) -> {reply, term(), internalstate()}.

handle_cast(Msg, State)

-spec handle_cast(term(), internalstate()) ->
                     {noreply, internalstate()} | {stop, term(), internalstate()}.

handle_info(Info, State)

-spec handle_info(term(), internalstate()) ->
                     {noreply, internalstate()} | {stop, term(), internalstate()}.

init(Args)

-spec init(term()) -> {ok, internalstate()} | {error, term()}.

kill(Server, Reason)

-spec kill(server(), term()) -> ok.

Kills the dgen_server, deleting all durable state, queue items, and waiting call keys. The process exits with Reason.

outbox_cast(Server)

-spec outbox_cast(server()) -> fun((dgen_backend:tx(), term()) -> ok).

Returns a closure for atomically casting a message from within the caller's own backend transaction.

Call this before opening the transaction as a preparatory step. Bind the result to Cast and call Cast(Tx, Message) inside the transaction to enqueue the message without going through the dgen_server process. The queue directory and identifier are captured internally and not exposed to the caller.

Backend coupling

This function is intended for callers that are already operating directly with a backend transaction — for example, when a message must be enqueued atomically alongside other writes in the same transaction. Using it means intentionally stepping outside the gen_server abstraction: the caller takes responsibility for managing the transaction lifetime and is coupled to the configured backend. If you do not need to compose the enqueue with other backend writes, prefer cast/2 instead.

outbox_cast(Server, Timeout)

-spec outbox_cast(server(), timeout()) -> fun((dgen_backend:tx(), term()) -> ok).

priority_call(Server, Request)

-spec priority_call(server(), term()) -> term().

Sends a call that bypasses the durable queue and is handled immediately.

Use with caution: this breaks ordering guarantees with respect to queued messages and ignores locks. Can be useful for snapshot reads.

priority_call(Server, Request, Timeout)

-spec priority_call(server(), term(), timeout()) -> term().

Like priority_call/2 but with an explicit timeout.

priority_cast(Server, Request)

-spec priority_cast(server(), term()) -> ok.

Sends a cast that bypasses the durable queue and is handled immediately.

Use with caution: this breaks ordering guarantees with respect to queued messages and ignores locks.

start(Mod, Arg, Opts)

-spec start(module(), term(), options()) -> start_ret().

Starts a dgen_server process without linking.

See start_link/3 for details on Mod, Arg, and Opts.

start(Reg, Mod, Arg, Opts)

-spec start(gen_server:server_name(), module(), term(), options()) -> start_ret().

Starts a dgen_server process without linking, registered as Reg.

See start_link/3 for details on Mod, Arg, and Opts.

start_link(Mod, Arg, Opts)

-spec start_link(module(), term(), options()) -> start_ret().

Starts a dgen_server process linked to the calling process.

  • Mod is the callback module implementing the dgen_server behaviour.
  • Arg is passed to Mod:init/1.
  • Opts is a proplist that must include {tenant, {Db, Dir}} and may include consume and reset.

start_link(Reg, Mod, Arg, Opts)

-spec start_link(gen_server:server_name(), module(), term(), options()) -> start_ret().

Starts a dgen_server process linked to the calling process, registered as Reg.

See start_link/3 for details on Mod, Arg, and Opts.

terminate(Reason, State)

-spec terminate(term(), internalstate()) -> ok.