macula_gateway_clients (macula v0.20.5)

Clients Worker GenServer - tracks connected clients.

Responsibilities:
- Track connected clients with metadata (bounded pool)
- Enforce the max_clients limit with backpressure
- Monitor client processes for automatic cleanup
- Store bidirectional streams for client communication
- Provide client query APIs

Pattern: Bounded client pool with backpressure
- Tracks clients up to the max_clients limit (default: 10,000)
- Rejects new clients when the pool is full (backpressure)
- Allows updates to existing clients even when the pool is full

Configuration:
- max_clients: Maximum concurrent clients (default: 10,000)
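The bounded-pool rule described above can be sketched as a small, self-contained module. This is an illustration only, not the module's actual implementation: the module name `bounded_pool_sketch`, the map-based pool layout, and the `{error, pool_full}` return value are all assumptions made for this example.

```erlang
-module(bounded_pool_sketch).
-export([new/1, add/3, remove/2, count/1]).

%% Hypothetical pool representation: a map of entries plus the limit.
new(MaxClients) when is_integer(MaxClients), MaxClients > 0 ->
    #{clients => #{}, max => MaxClients}.

%% Insert or update an entry.
%% An update to an existing key always succeeds, even when the pool is full.
%% A new key is rejected once the pool has reached its limit (backpressure).
add(Key, Info, #{clients := Clients, max := Max} = Pool) ->
    case maps:is_key(Key, Clients) orelse maps:size(Clients) < Max of
        true ->
            {ok, Pool#{clients := Clients#{Key => Info}}};
        false ->
            %% Assumed error shape; the real module may signal this differently.
            {error, pool_full}
    end.

%% Remove an entry, freeing one slot.
remove(Key, #{clients := Clients} = Pool) ->
    Pool#{clients := maps:remove(Key, Clients)}.

%% Number of entries currently tracked.
count(#{clients := Clients}) ->
    maps:size(Clients).
```

With a limit of 1, adding key `a` succeeds, adding key `b` is rejected, and adding `a` again with new metadata still succeeds because it is an update rather than a new entry.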

Extracted from macula_gateway.erl (Phase 2). Renamed from macula_gateway_client_manager (Phase 2 QUIC refactoring).

Summary

Functions

broadcast(Pid, EncodedMsg)

Broadcast a message to all connected clients.

client_connected(Pid, ClientPid, ClientInfo)

Register a connected client with metadata. Monitors the client process for automatic cleanup on death.

client_disconnected(Pid, ClientPid)

Unregister a disconnected client.

get_all_clients(Pid)

Get all connected clients.

get_all_node_ids(Pid)

Get all node IDs with stored client streams (for debugging).

get_client_info(Pid, ClientPid)

Get information about a specific client.

get_client_stream(Pid, NodeId)

Get the stored stream for a client node.

get_stream_by_endpoint(Pid, Endpoint)

Get the stream PID for a given endpoint URL. Used for routing pub/sub messages to remote subscribers.

handle_info(Info, State)

Handle client process death - automatic cleanup.

is_client_alive(Pid, ClientPid)

Check if a client is alive (process still running).

remove_stale_stream(Pid, NodeId)

Remove a stale stream for a node when send fails with 'closed'. This is called by the gateway when quicer:send returns {error, closed} to clean up the invalid stream reference from our maps. Uses async cast to avoid blocking the gateway during send operations.

start_link(Opts)

Start the client manager with options. Registers as 'macula_gateway_clients' for discovery by the pubsub module.

stop(Pid)

Stop the client manager.

store_client_stream(Pid, NodeId, StreamPid)

Store a bidirectional stream for a client node (legacy 3-arg version).

store_client_stream(Pid, NodeId, StreamPid, Endpoint)

Store a bidirectional stream for a client node with endpoint tracking.

Types

client_info/0

-type client_info() ::
          #{realm := binary(), node_id := binary(), endpoint => binary(), capabilities => [atom()]}.

Functions

broadcast(Pid, EncodedMsg)

-spec broadcast(pid(), binary()) -> ok.

Broadcast a message to all connected clients.

client_connected(Pid, ClientPid, ClientInfo)

-spec client_connected(pid(), pid(), client_info()) -> ok.

Register a connected client with metadata. Monitors the client process for automatic cleanup on death.

client_disconnected(Pid, ClientPid)

-spec client_disconnected(pid(), pid()) -> ok.

Unregister a disconnected client.

get_all_clients(Pid)

-spec get_all_clients(pid()) -> {ok, [{pid(), client_info()}]}.

Get all connected clients.

get_all_node_ids(Pid)

-spec get_all_node_ids(pid()) -> [binary()].

Get all node IDs with stored client streams (for debugging).

get_client_info(Pid, ClientPid)

-spec get_client_info(pid(), pid()) -> {ok, client_info()} | not_found.

Get information about a specific client.

get_client_stream(Pid, NodeId)

-spec get_client_stream(pid(), binary()) -> {ok, pid()} | not_found.

Get the stored stream for a client node.

get_stream_by_endpoint(Pid, Endpoint)

-spec get_stream_by_endpoint(pid(), binary()) -> {ok, pid()} | {error, not_found}.

Get the stream PID for a given endpoint URL. Used for routing pub/sub messages to remote subscribers.

handle_call(Request, From, State)

handle_cast(Msg, State)

handle_info(Info, State)

Handle client process death - automatic cleanup.
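Automatic cleanup on process death is usually built on `erlang:monitor/2` and the resulting `'DOWN'` message. The following is a minimal, self-contained sketch of that pattern, not the code of this module: the module name `monitor_cleanup_sketch`, its `track/2` and `count/1` functions, and the state layout are hypothetical.

```erlang
-module(monitor_cleanup_sketch).
-behaviour(gen_server).

-export([start_link/0, track/2, count/1]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).

start_link() ->
    gen_server:start_link(?MODULE, [], []).

%% Start tracking a client process (hypothetical API for this sketch).
track(Server, ClientPid) ->
    gen_server:call(Server, {track, ClientPid}).

%% Number of client processes currently tracked.
count(Server) ->
    gen_server:call(Server, count).

%% State is a map of ClientPid => MonitorRef.
init([]) ->
    {ok, #{}}.

handle_call({track, ClientPid}, _From, State) ->
    case maps:is_key(ClientPid, State) of
        true ->
            %% Already tracked: do not create a second monitor.
            {reply, ok, State};
        false ->
            Ref = erlang:monitor(process, ClientPid),
            {reply, ok, State#{ClientPid => Ref}}
    end;
handle_call(count, _From, State) ->
    {reply, maps:size(State), State}.

handle_cast(_Msg, State) ->
    {noreply, State}.

%% The runtime delivers this message when a monitored process exits.
handle_info({'DOWN', _Ref, process, ClientPid, _Reason}, State) ->
    {noreply, maps:remove(ClientPid, State)};
handle_info(_Other, State) ->
    {noreply, State}.
```

Because the entry is dropped in `handle_info/2`, callers do not need to unregister a client that has crashed; an explicit unregister is only needed for a clean disconnect.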

init(Opts)

is_client_alive(Pid, ClientPid)

-spec is_client_alive(pid(), pid()) -> boolean().

Check if a client is alive (process still running).

remove_stale_stream(Pid, NodeId)

-spec remove_stale_stream(pid(), binary()) -> ok.

Remove a stale stream for a node when send fails with 'closed'. This is called by the gateway when quicer:send returns {error, closed} to clean up the invalid stream reference from our maps. Uses async cast to avoid blocking the gateway during send operations.
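A caller-side sketch of the cleanup flow described above might look like the following. It relies only on the return values given in the specs on this page. The module name `stale_stream_sketch` and the injected `SendFun` are assumptions: `SendFun` stands in for the actual send call (for example a wrapper around quicer's send) so that the sketch does not depend on a particular transport API.

```erlang
-module(stale_stream_sketch).
-export([send_to_node/4]).

%% ClientsPid: pid of the macula_gateway_clients server.
%% SendFun:    fun((Stream, Data) -> Result), a hypothetical wrapper
%%             around the real send function.
send_to_node(ClientsPid, NodeId, Data, SendFun) ->
    case macula_gateway_clients:get_client_stream(ClientsPid, NodeId) of
        {ok, Stream} ->
            case SendFun(Stream, Data) of
                {error, closed} ->
                    %% The stored stream is no longer usable: drop the
                    %% reference so later lookups do not return it.
                    ok = macula_gateway_clients:remove_stale_stream(ClientsPid, NodeId),
                    {error, closed};
                Other ->
                    Other
            end;
        not_found ->
            {error, not_found}
    end.
```

Since the removal is documented as an asynchronous cast, the `ok` return only means the request was sent; a lookup issued from another process immediately afterwards may still see the old stream.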

start_link(Opts)

-spec start_link(map()) -> {ok, pid()} | {error, term()}.

Start the client manager with options. Registers as 'macula_gateway_clients' for discovery by the pubsub module.
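As a rough usage sketch, the functions on this page can be combined as follows. It uses only the signatures and return values listed in the specs here. The module name `clients_usage_sketch`, the sample realm and node ID, and passing `max_clients` as a key of the options map are assumptions for illustration.

```erlang
-module(clients_usage_sketch).
-export([run/0]).

run() ->
    %% Assumption: max_clients is given as a key of the options map.
    {ok, Clients} = macula_gateway_clients:start_link(#{max_clients => 100}),

    %% Stand-in client process; a real gateway would pass the process
    %% that owns the client connection.
    ClientPid = spawn(fun() -> receive stop -> ok end end),

    %% realm and node_id are the mandatory keys of client_info().
    Info = #{realm => <<"example.realm">>, node_id => <<"node-1">>},
    ok = macula_gateway_clients:client_connected(Clients, ClientPid, Info),

    %% Query the registry.
    {ok, _Stored} = macula_gateway_clients:get_client_info(Clients, ClientPid),
    {ok, _All} = macula_gateway_clients:get_all_clients(Clients),

    %% Clean disconnect, then shut everything down.
    ok = macula_gateway_clients:client_disconnected(Clients, ClientPid),
    ClientPid ! stop,
    macula_gateway_clients:stop(Clients).
```

Because the server registers itself under a fixed name, starting a second instance in the same node while the first is still running would be expected to fail.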

stop(Pid)

-spec stop(pid()) -> ok.

Stop the client manager.

store_client_stream(Pid, NodeId, StreamPid)

-spec store_client_stream(pid(), binary(), pid()) -> ok.

Store a bidirectional stream for a client node (legacy 3-arg version).

store_client_stream(Pid, NodeId, StreamPid, Endpoint)

-spec store_client_stream(pid(), binary(), pid(), binary()) -> ok.

Store a bidirectional stream for a client node with endpoint tracking.

terminate(Reason, State)