erllama_cache_writer (erllama v0.1.0)

File-tier save orchestrator with a leak-proof ETS counting semaphore.

Modelled on py_semaphore from erlang-python (Discord pattern): one ETS table holding {running, N} and {max, M} counters, updated atomically with ets:update_counter and readable from any process.

Unlike the bare ETS pattern, this module also runs a small gen_server that owns the holders map and monitors every active acquirer. If a holder dies between acquire and release (SIGKILL, process crash, etc.), the gen_server's 'DOWN' handler releases the slot. No leaks.
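The following is a minimal sketch of that idea, not the module's actual source. The module name sem_sketch, the table name, and the try_acquire/0 and release/1 signatures are assumptions made for illustration; the real module stashes the holder ref in the process dictionary rather than returning it.

```erlang
-module(sem_sketch).
-behaviour(gen_server).

-export([start_link/1, try_acquire/0, release/1, current/0]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).

-define(TAB, sem_sketch_tab). % hypothetical table name

start_link(Max) ->
    gen_server:start_link({local, ?MODULE}, ?MODULE, [Max], []).

%% Returns {ok, Ref} when a slot is free, {error, max_concurrent} otherwise.
try_acquire() ->
    gen_server:call(?MODULE, {acquire, self()}).

release(Ref) ->
    gen_server:call(?MODULE, {release, Ref}).

%% Reads the counter straight from ETS, without a server hop.
current() ->
    ets:lookup_element(?TAB, running, 2).

init([Max]) ->
    ?TAB = ets:new(?TAB, [named_table, protected, set, {read_concurrency, true}]),
    true = ets:insert(?TAB, [{running, 0}, {max, Max}]),
    {ok, #{}}. % state: holders map, monitor ref => pid

handle_call({acquire, Pid}, _From, Holders) ->
    Max = ets:lookup_element(?TAB, max, 2),
    case ets:lookup_element(?TAB, running, 2) of
        N when N < Max ->
            %% Monitor the holder so its death frees the slot.
            Ref = erlang:monitor(process, Pid),
            ets:update_counter(?TAB, running, 1),
            {reply, {ok, Ref}, Holders#{Ref => Pid}};
        _ ->
            {reply, {error, max_concurrent}, Holders}
    end;
handle_call({release, Ref}, _From, Holders) ->
    {reply, ok, drop(Ref, Holders)}.

handle_cast(_Msg, Holders) ->
    {noreply, Holders}.

%% A holder died between acquire and release: free its slot.
handle_info({'DOWN', Ref, process, _Pid, _Reason}, Holders) ->
    {noreply, drop(Ref, Holders)};
handle_info(_Other, Holders) ->
    {noreply, Holders}.

%% Release the slot only if Ref is still a known holder, so a release
%% followed by a late 'DOWN' cannot decrement the counter twice.
drop(Ref, Holders) ->
    case maps:take(Ref, Holders) of
        {_Pid, Rest} ->
            erlang:demonitor(Ref, [flush]),
            ets:update_counter(?TAB, running, -1),
            Rest;
        error ->
            Holders
    end.
```

Because the server is the only writer and the table is protected, the check against max and the increment happen in one serialised step, while readers can still call current/0 from any process.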

Hot path:

- current/0 and max_concurrent/0 read ETS directly (no server hop).
- acquire/1 and release/0 go through this gen_server, which serialises monitor bookkeeping.

Per-call overhead is roughly one gen_server hop (~1 µs); for save paths measured in seconds this is invisible.

Save pipeline run by save/4 in the caller's process:

1. acquire (with backoff up to AcquireTimeoutMs)
2. meta_srv:reserve_save -> Token
3. disk_srv:save (returns once the file is linked + validated)
4. meta_srv:announce_saved -> available
5. release (try/after)

A try/after releases on normal exits and exceptions; the gen_server's monitor catches everything else.
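The acquire, work, release shape of the pipeline can be sketched as below. This is an illustration only: the module save_sketch and the function with_slot/3 are hypothetical, and the three funs stand in for the acquire step, the release step, and the reserve/save/announce steps respectively.

```erlang
-module(save_sketch).
-export([with_slot/3]).

%% Acquire, Release and Work are caller-supplied funs (hypothetical
%% stand-ins for the pipeline steps described above).
with_slot(Acquire, Release, Work) ->
    case Acquire() of
        ok ->
            try
                %% reserve -> save -> announce would run here
                Work()
            after
                %% Runs on normal return and on any exception.
                Release()
            end;
        {error, _} = Err ->
            %% No slot was taken, so there is nothing to release.
            Err
    end.
```

The after clause does not run if the process is killed outright, which is the case the gen_server's monitor is there to cover.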

Types

state()

-type state() :: #state{holders :: #{reference() => pid()}}.

Functions

acquire/1

-spec acquire(timeout()) -> ok | {error, max_concurrent}.

Try to acquire a slot. Blocks with exponential backoff up to Timeout milliseconds. On success the holder ref is stashed in the process dictionary so release/0 finds it.
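One way such a blocking acquire could look is sketched below, under stated assumptions: the module backoff_sketch, the process-dictionary key, the 1 ms starting delay and the 100 ms cap are all invented for illustration, TryFun is a hypothetical non-blocking attempt returning {ok, Ref} or {error, max_concurrent}, and only integer timeouts are handled (not infinity).

```erlang
-module(backoff_sketch).
-export([acquire/2]).

-define(KEY, backoff_sketch_holder). % hypothetical process-dictionary key
-define(MAX_DELAY_MS, 100).          % assumed cap on a single sleep

acquire(TryFun, TimeoutMs) when is_integer(TimeoutMs), TimeoutMs >= 0 ->
    Deadline = erlang:monotonic_time(millisecond) + TimeoutMs,
    loop(TryFun, Deadline, 1).

loop(TryFun, Deadline, DelayMs) ->
    case TryFun() of
        {ok, Ref} ->
            %% Stash the holder ref so a later release can find it.
            put(?KEY, Ref),
            ok;
        {error, max_concurrent} ->
            Remaining = Deadline - erlang:monotonic_time(millisecond),
            if
                Remaining =< 0 ->
                    {error, max_concurrent};
                true ->
                    %% Never sleep past the deadline; double the delay up to the cap.
                    timer:sleep(min(DelayMs, Remaining)),
                    loop(TryFun, Deadline, min(DelayMs * 2, ?MAX_DELAY_MS))
            end
    end.
```

Using a monotonic deadline rather than counting sleeps keeps the total wait bounded by the timeout even when individual attempts are slow.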

current()

-spec current() -> non_neg_integer().

handle_call/3

handle_cast(Msg, S)

-spec handle_cast(term(), state()) -> {noreply, state()}.

handle_info/2

init/1

-spec init([pos_integer()]) -> {ok, state()}.

max_concurrent()

-spec max_concurrent() -> pos_integer().

release()

-spec release() -> ok.

save(TierSrv, Tier, BuildMeta, Payload)

-spec save(atom(), disk | ram_file, erllama_cache_kvc:build_meta(), binary()) ->
              {ok, erllama_cache:cache_key()} | {error, term()}.

save(TierSrv, Tier, BuildMeta, Payload, AcquireTimeout)

-spec save(atom(), disk | ram_file, erllama_cache_kvc:build_meta(), binary(), timeout()) ->
              {ok, erllama_cache:cache_key()} | {error, term()}.

set_max_concurrent(Max)

-spec set_max_concurrent(pos_integer()) -> ok.

start_link()

-spec start_link() -> {ok, pid()} | {error, term()}.

start_link(Max)

-spec start_link(pos_integer()) -> {ok, pid()} | {error, term()}.