macula_rpc_handler (macula v0.20.3)

View Source

RPC handler GenServer - manages RPC calls, replies, and failover.

Responsibilities: - Execute RPC calls (local check first, then DHT discovery) - Handle incoming RPC replies from network - Manage call timeouts with automatic failover - Track pending calls with call IDs - Monitor caller processes for automatic cleanup - Provider selection strategies (random, round-robin, etc.)

Memory Safety: - Monitors caller processes to prevent memory leaks - Cleans up immediately when caller dies (no waiting for timeout) - Cancels timers and removes pending entries on cleanup

Extracted from macula_connection.erl (Phase 5)

Summary

Functions

Make an RPC call with default options.

Make an RPC call with options.

Make a synchronous RPC call to a specific target node.

Get the list of configured service interests.

Get service registry (for local handler lookup by gateway).

Handle incoming async RPC reply (called by message router).

Handle FIND_VALUE_REPLY from DHT query

Dynamically prefetch services for pull-based discovery. This allows adding service interests at runtime (after init). Services are looked up via DHT and cached for faster first requests.

Register a local procedure handler (no DHT advertisement).

Send async RPC request with callback (NATS-style).

Send async RPC request to specific node (skip DHT lookup).

Functions

call(Pid, Procedure, Args)

-spec call(pid(), binary() | list() | atom(), term()) -> {ok, term()} | {error, term()}.

Make an RPC call with default options.

call(Pid, Procedure, Args, Opts)

-spec call(pid(), binary() | list() | atom(), term(), map()) -> {ok, term()} | {error, term()}.

Make an RPC call with options.

call_to(Pid, TargetNodeId, Procedure, Args, Opts)

-spec call_to(pid(), binary(), binary() | list() | atom(), term(), map()) ->
                 {ok, term()} | {error, term()}.

Make a synchronous RPC call to a specific target node.

Unlike call/4 which discovers providers via DHT, this function sends the RPC directly to the specified target node. The message is still routed via DHT infrastructure (for NAT traversal and relay), but targets a specific node rather than discovering one.

Use this when you already know the target node's ID (e.g., from a previous DHT discovery, artifact publisher node, or direct advertisement).

get_service_interests(Pid)

-spec get_service_interests(pid()) -> [binary()].

Get the list of configured service interests.

get_service_registry(Pid)

-spec get_service_registry(pid()) -> macula_service_registry:registry().

Get service registry (for local handler lookup by gateway).

handle_async_reply(Pid, Msg)

-spec handle_async_reply(pid(), map()) -> ok.

Handle incoming async RPC reply (called by message router).

handle_call(Request, From, State)

handle_cast(Msg, State)

handle_find_value_reply(Pid, Msg)

-spec handle_find_value_reply(pid(), map()) -> ok.

Handle FIND_VALUE_REPLY from DHT query

handle_incoming_reply(Pid, Msg)

-spec handle_incoming_reply(pid(), map()) -> ok.

handle_info(Info, State)

init(Opts)

prefetch_services(Pid, Services)

-spec prefetch_services(pid(), [binary() | atom() | string()]) -> ok.

Dynamically prefetch services for pull-based discovery. This allows adding service interests at runtime (after init). Services are looked up via DHT and cached for faster first requests.

register_handler(Service, Handler)

-spec register_handler(binary() | list() | atom(), fun((term()) -> {ok, term()} | {error, term()})) ->
                          {ok, reference()} | {error, term()}.

register_local_procedure(Pid, Procedure, Handler)

-spec register_local_procedure(pid(), binary(), fun((term()) -> {ok, term()} | {error, term()})) -> ok.

Register a local procedure handler (no DHT advertisement).

This registers the handler function in the service registry so this RPC handler can execute it locally when called. Unlike register_handler/2, this does NOT advertise to the DHT - it's for purely local services.

request(Pid, Procedure, Args, Opts)

-spec request(pid(), binary(), term(), map()) -> {ok, binary()} | {error, term()}.

Send async RPC request with callback (NATS-style).

This is the primary API for async RPC. The caller does not block. Response is delivered via callback function or process message.

Opts: - callback => fun((Result) -> any()) - Called with {ok, Result} | {error, Reason} - timeout => integer() - Milliseconds before timeout (default: 5000)

Returns {ok, RequestId} immediately, or {error, Reason} on send failure.

request_to(Pid, TargetNodeId, Procedure, Args, Opts)

-spec request_to(pid(), binary(), binary(), term(), map()) -> {ok, binary()} | {error, term()}.

Send async RPC request to specific node (skip DHT lookup).

Use this when you already know the target node's ID (e.g., from a previous DHT discovery or direct node advertisement).

start_link(Opts)

-spec start_link(map()) -> {ok, pid()} | {error, term()}.

terminate(Reason, State)

unregister_handler(Service)

-spec unregister_handler(binary() | list() | atom()) -> ok | {error, term()}.