macula_rpc_async (macula v0.20.5)

View Source

Async RPC Module (NATS-style Request/Reply)

Handles asynchronous RPC operations with callback-based responses: - Callback management (fun callbacks and pid callbacks) - Request ID generation and tracking - Request message building for P2P delivery - Reply processing and callback invocation - Timeout handling for async requests

This module provides stateless helper functions used by macula_rpc_handler. The actual state (pending_requests map) remains in the handler.

Extracted from macula_rpc_handler.erl (Dec 2025) to improve testability and separation of concerns.

Summary

Functions

Build an RPC_REQUEST message for NATS-style async RPC. Includes from_endpoint so receiver can route reply back directly.

Calculate RTT from sent_at timestamp.

Extract result from RPC reply message. Returns {ok, DecodedValue} or {error, ErrorReason}.

Extract callback from options, defaulting to pid callback. If opts contains a callback function, use it. Otherwise send to caller pid.

Get local endpoint from environment variables. Used to include sender's endpoint in RPC requests so receivers can route replies back. Format: "hostname:port" (e.g., "fc01:4433" in Docker)

Invoke async callback with result. For function callbacks, spawns a process to avoid blocking. For pid callbacks, sends a message.

Types

callback/0

-type callback() :: {fun_cb, fun((term()) -> any())} | {pid_cb, pid()}.

Functions

build_request_message(RequestId, Procedure, EncodedArgs, FromNodeId, Realm)

-spec build_request_message(binary(), binary(), binary(), binary(), binary()) -> map().

Build an RPC_REQUEST message for NATS-style async RPC. Includes from_endpoint so receiver can route reply back directly.

calculate_rtt(SentAt)

-spec calculate_rtt(integer()) -> non_neg_integer().

Calculate RTT from sent_at timestamp.

extract_result(Msg)

-spec extract_result(map()) -> {ok, term()} | {error, term()}.

Extract result from RPC reply message. Returns {ok, DecodedValue} or {error, ErrorReason}.

get_callback(Opts, CallerPid)

-spec get_callback(map(), pid()) -> callback().

Extract callback from options, defaulting to pid callback. If opts contains a callback function, use it. Otherwise send to caller pid.

get_local_endpoint()

-spec get_local_endpoint() -> binary().

Get local endpoint from environment variables. Used to include sender's endpoint in RPC requests so receivers can route replies back. Format: "hostname:port" (e.g., "fc01:4433" in Docker)

invoke_callback(_, RequestId, Result)

-spec invoke_callback(callback(), binary(), term()) -> ok.

Invoke async callback with result. For function callbacks, spawns a process to avoid blocking. For pid callbacks, sends a message.