esdb_gater_worker_registry (reckon_gater v1.3.0)


pg-based distributed worker registry for reckon-gater

Provides cluster-wide worker registration and discovery using Erlang's built-in pg (process groups) module.

Cluster-Wide Discovery

Workers registered on any node are discoverable from all nodes:

- `pg:join/3` broadcasts group membership to all connected nodes
- `pg:get_members/2` returns PIDs from ALL nodes in the cluster
- When a node fails, pg automatically removes its members from groups

Design Rationale

This implementation uses pg instead of Ra because:

1. **Simplicity** - pg is built into OTP, no external dependencies
2. **Sufficient consistency** - Gateway worker discovery doesn't require strong consistency; finding "any available worker" is fine
3. **No conflicts** - Avoids Ra cluster ID conflicts with Khepri stores
4. **Automatic cleanup** - pg handles node failures automatically
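The pg calls named under Cluster-Wide Discovery can be tried in isolation. The following is a minimal sketch, not the registry's actual implementation: the scope name `sketch_scope`, the group term `{gateway_worker, my_store}`, and the module name are assumptions made up for illustration, and the scope and group naming used by reckon-gater itself is not documented here.

```erlang
-module(pg_discovery_sketch).
-export([demo/0]).

%% Hypothetical scope name, chosen only for this sketch.
-define(SCOPE, sketch_scope).

demo() ->
    %% Start a pg scope; tolerate the case where it is already running.
    case pg:start(?SCOPE) of
        {ok, _Pid} -> ok;
        {error, {already_started, _}} -> ok
    end,
    %% Hypothetical group term: one group per store.
    Group = {gateway_worker, my_store},
    %% A stand-in worker process that waits until told to stop.
    Worker = spawn(fun() -> receive stop -> ok end end),
    %% join/3 adds the process to the group; pg propagates the
    %% membership to the same scope on all connected nodes.
    ok = pg:join(?SCOPE, Group, Worker),
    %% get_members/2 returns group members from every connected node.
    Members = pg:get_members(?SCOPE, Group),
    true = lists:member(Worker, Members),
    %% Explicit cleanup; pg would also drop the member if it exited.
    ok = pg:leave(?SCOPE, Group, Worker),
    Worker ! stop,
    Members.
```

Calling `pg_discovery_sketch:demo()` on a single node returns a list containing only the stand-in worker. On a connected cluster running the same scope, members joined on other nodes would appear in that list as well.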

Eventual Consistency

pg provides eventual consistency. During network partitions, different nodes may briefly see different worker lists. This is acceptable for gateway workers since:

- Workers are stateless proxies to the event store
- Requests are retried with exponential backoff
- Any available worker can handle any request
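To illustrate why a briefly stale worker list is tolerable, here is a rough sketch of retrying a request with exponential backoff. Everything in it is an assumption for illustration: the module name, the 50 ms base delay, the 2000 ms cap, and the `{ok, _} | {error, _}` return convention are not taken from reckon-gater, whose actual retry policy is not described on this page.

```erlang
-module(backoff_sketch).
-export([retry/2, delay_ms/1]).

%% Hypothetical tuning values, not taken from reckon-gater.
-define(BASE_MS, 50).
-define(MAX_MS, 2000).

%% Delay before the retry that follows failed attempt number Attempt
%% (0-based): doubles each time, capped at ?MAX_MS.
delay_ms(Attempt) when is_integer(Attempt), Attempt >= 0 ->
    min(?MAX_MS, ?BASE_MS bsl Attempt).

%% Call Fun up to MaxAttempts times, sleeping between failed attempts.
retry(Fun, MaxAttempts) when is_function(Fun, 0), MaxAttempts >= 1 ->
    retry(Fun, MaxAttempts, 0).

retry(Fun, MaxAttempts, Attempt) ->
    case Fun() of
        {ok, _} = Ok ->
            Ok;
        {error, _} when Attempt + 1 < MaxAttempts ->
            timer:sleep(delay_ms(Attempt)),
            retry(Fun, MaxAttempts, Attempt + 1);
        {error, _} ->
            %% No attempts left; give up without sleeping again.
            {error, retries_exhausted}
    end.
```

In this sketch the caller would pass a fun that looks up a worker and forwards the request, so each retry gets a fresh view of the worker list.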

Key Features

- Register gateway workers for specific stores
- Cluster-wide worker discovery via pg groups
- Automatic cleanup on worker death (local monitor)
- Automatic cleanup on node failure (pg membership)
- Load balancing via round-robin worker selection
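Round-robin selection over a worker list can be sketched as follows. This is one possible approach under stated assumptions, not the selection code used by reckon-gater: the module name, the `pick/2` function, and the caller-held counter are all hypothetical.

```erlang
-module(round_robin_sketch).
-export([pick/2]).

%% pick(Workers, Counter) -> {ok, Worker, NextCounter} | {error, no_workers}
%% The caller keeps Counter (for example in its process state) and
%% passes the returned NextCounter to the following call.
pick([], _Counter) ->
    {error, no_workers};
pick(Workers, Counter) when is_integer(Counter), Counter >= 0 ->
    %% lists:nth/2 is 1-based, so shift the remainder by one.
    Index = (Counter rem length(Workers)) + 1,
    {ok, lists:nth(Index, Workers), Counter + 1}.
```

Because the index is taken modulo the current list length, the selection keeps working when workers join or leave between calls, although the rotation order may shift at that moment.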

Summary

Types

worker_entry/0

-type worker_entry() ::
          #worker_entry{store_id :: atom(), node :: node(), pid :: pid(), registered_at :: integer()}.

Functions

get_all_workers()

-spec get_all_workers() -> {ok, #{atom() => [worker_entry()]}} | {error, term()}.

Get all registered workers

get_workers(StoreId)

-spec get_workers(atom()) -> {ok, [worker_entry()]} | {error, term()}.

Get all workers for a store

handle_call(Request, From, State)

handle_cast(Msg, State)

handle_info(Info, State)

init(_)

register_worker(StoreId, Pid)

-spec register_worker(atom(), pid()) -> ok | {error, term()}.

Register a worker for a store

start_link()

-spec start_link() -> {ok, pid()} | {error, term()}.

terminate(Reason, State)

unregister_worker(StoreId, Pid)

-spec unregister_worker(atom(), pid()) -> ok | {error, term()}.

Unregister a worker for a store
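
The registration functions listed above might be combined as in the sketch below. It relies only on the specs shown on this page and assumes the registry process has already been started (for example through `start_link/0` under the application's supervisor). The module name `registry_usage_sketch` and the choice of registering the calling process are illustrative assumptions.

```erlang
-module(registry_usage_sketch).
-export([demo/1]).

%% Registers the calling process as a worker for StoreId, looks it up,
%% then unregisters it. Assumes esdb_gater_worker_registry is running.
demo(StoreId) when is_atom(StoreId) ->
    Worker = self(),
    ok = esdb_gater_worker_registry:register_worker(StoreId, Worker),
    %% Per the spec, get_workers/1 returns {ok, [worker_entry()]}.
    {ok, Workers} = esdb_gater_worker_registry:get_workers(StoreId),
    io:format("~p worker(s) registered for ~p~n",
              [length(Workers), StoreId]),
    ok = esdb_gater_worker_registry:unregister_worker(StoreId, Worker),
    length(Workers).
```

The strict `ok =` and `{ok, Workers} =` matches make the sketch crash on an `{error, Reason}` result; production code would more likely handle that case explicitly.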