macula_pubsub_dht (macula v0.20.5)


DHT operations for pub/sub - handles subscription advertisement and discovery.

Responsibilities:

- Advertise subscriptions in DHT with TTL
- Schedule re-advertisement timers
- Discover remote subscribers via DHT queries
- Route messages to remote subscribers
- Track pending DHT queries

Extracted from macula_pubsub_handler.erl (Phase 3)

Summary

Functions

Advertise a subscription in the DHT. Sends a STORE message to the DHT and schedules re-advertisement. Returns {ok, SubInfo}.
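The re-advertisement step can be pictured with a plain Erlang timer. The following is only a sketch: the module name `readvertise_sketch`, the helper `schedule/2`, the `{readvertise, Topic}` message, and the choice to fire at half the TTL are all assumptions made for illustration, not taken from the macula source. The returned map simply mirrors the value shape of `advertised_subscriptions()`.

```erlang
-module(readvertise_sketch).
-export([schedule/2, demo/0]).

%% Hypothetical helper: arm a timer that reminds the calling process
%% to re-advertise Topic. Assumption: firing at half the TTL refreshes
%% the DHT entry before it expires; the real module may differ.
-spec schedule(binary(), pos_integer()) -> map().
schedule(Topic, TtlSeconds) ->
    SubRef = make_ref(),
    DelayMs = (TtlSeconds * 1000) div 2,
    TimerRef = erlang:send_after(DelayMs, self(), {readvertise, Topic}),
    %% Same keys as the values of advertised_subscriptions().
    #{sub_ref => SubRef, ttl => TtlSeconds, timer_ref => TimerRef}.

demo() ->
    #{timer_ref := TimerRef} = SubInfo = schedule(<<"sensors.temp">>, 300),
    %% For a timer that has not fired yet, cancel_timer/1 returns the
    %% remaining time in milliseconds.
    Remaining = erlang:cancel_timer(TimerRef),
    true = is_integer(Remaining),
    {ok, SubInfo}.
```

When the `{readvertise, Topic}` message arrives, the owning process would send the STORE again and arm a fresh timer.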

Types

advertised_subscriptions/0

-type advertised_subscriptions() ::
          #{topic() => #{sub_ref := reference(), ttl := pos_integer(), timer_ref := reference()}}.

connection_manager_pid/0

-type connection_manager_pid() :: pid().

node_id/0

-type node_id() :: binary().

payload/0

-type payload() :: binary().

pending_queries/0

-type pending_queries() :: #{binary() => {topic(), payload(), qos(), map()}}.

qos/0

-type qos() :: 0 | 1.

subscription_ref/0

-type subscription_ref() :: reference().

topic/0

-type topic() :: binary().

url/0

-type url() :: binary().

Functions

cancel_advertisement(Topic, AdvertisedSubscriptions)

-spec cancel_advertisement(topic(), advertised_subscriptions()) -> advertised_subscriptions().

Cancel advertisement for a topic. Cancels the re-advertisement timer and returns the updated advertised_subscriptions map.
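A minimal stand-in, written only from the spec and description above, may help show what the call does to the map. The module `cancel_sketch` and the decision to drop the topic's entry and to leave the map unchanged for an unknown topic are assumptions, not confirmed library behaviour.

```erlang
-module(cancel_sketch).
-export([cancel/2, demo/0]).

%% Stand-in for cancel_advertisement/2; not the library's source.
cancel(Topic, Advertised) ->
    case maps:take(Topic, Advertised) of
        {#{timer_ref := TimerRef}, Rest} ->
            %% Stop the pending re-advertisement timer.
            _ = erlang:cancel_timer(TimerRef),
            Rest;
        error ->
            %% Assumption: an unknown topic leaves the map unchanged.
            Advertised
    end.

demo() ->
    Topic = <<"sensors.temp">>,
    TimerRef = erlang:send_after(60000, self(), {readvertise, Topic}),
    Advertised = #{Topic => #{sub_ref => make_ref(),
                              ttl => 300,
                              timer_ref => TimerRef}},
    Empty = cancel(Topic, Advertised),
    0 = map_size(Empty),
    %% Cancelling again is a no-op in this sketch.
    Empty = cancel(Topic, Empty),
    ok.
```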

discover_subscribers(Topic, Payload, Qos, ConnMgrPid, ServiceRegistry, MsgIdCounter)

-spec discover_subscribers(topic(),
                           payload(),
                           qos(),
                           connection_manager_pid(),
                           term(),
                           non_neg_integer()) ->
                              {cached, list(), term()} |
                              {query_sent, pending_queries(), binary(), term()}.

Discover remote subscribers for a topic. Checks the cache first and queries the DHT on a cache miss. Returns {cached, Subscribers, Registry} on a cache hit, or {query_sent, Pending, MsgId, Registry} once a DHT query has been sent.
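A caller has to branch on the two return shapes. The sketch below shows one way to do that without calling the library itself: `discover_caller_sketch`, `handle_result/2`, the caller-side `State` map, and the `route_now` / `await_reply` tags are hypothetical. It also assumes that the returned `Pending` should be merged into a pending map kept by the caller, which the documentation does not state.

```erlang
-module(discover_caller_sketch).
-export([handle_result/2, demo/0]).

%% Dispatch on the documented return shapes of discover_subscribers/6.
%% State is a hypothetical caller-side map with registry and pending keys.
handle_result({cached, Subscribers, Registry}, State) ->
    %% Cache hit: subscribers are known, so routing can start at once.
    {route_now, Subscribers, State#{registry => Registry}};
handle_result({query_sent, NewPending, MsgId, Registry},
              State = #{pending := Old}) ->
    %% Cache miss: remember the query until the DHT reply arrives.
    %% Assumption: NewPending is merged into the caller's pending map.
    {await_reply, MsgId,
     State#{registry => Registry,
            pending => maps:merge(Old, NewPending)}}.

demo() ->
    State0 = #{registry => undefined, pending => #{}},
    {route_now, [sub_a], #{registry := reg1}} =
        handle_result({cached, [sub_a], reg1}, State0),
    Pending = #{<<"msg-1">> => {<<"sensors.temp">>, <<"21.5">>, 0, #{}}},
    {await_reply, <<"msg-1">>, #{pending := Pending}} =
        handle_result({query_sent, Pending, <<"msg-1">>, reg2}, State0),
    ok.
```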

handle_discovery_response(MsgId, Subscribers, PendingQueries)

-spec handle_discovery_response(binary(), list(), pending_queries()) ->
                                   {ok, pending_queries()} | {not_found, pending_queries()}.

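Handle DHT discovery response. Routes messages to discovered subscribers. Returns {ok, PendingQueries} or {not_found, PendingQueries}, where the second element is the updated pending queries map; not_found presumably means that MsgId matched no pending query.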
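The lookup-and-remove pattern implied by the spec can be sketched as follows. This is an illustration under assumptions, not the library's code: the module `discovery_response_sketch` is hypothetical, the meaning of the fourth tuple element (the `map()` in `pending_queries()`) is not documented and is ignored here, and actual routing is replaced by a message to the calling process.

```erlang
-module(discovery_response_sketch).
-export([handle_response/3, demo/0]).

%% Stand-in for handle_discovery_response/3.
handle_response(MsgId, Subscribers, Pending) ->
    case maps:take(MsgId, Pending) of
        {{Topic, Payload, Qos, _Undocumented}, Rest} ->
            %% The real module routes to the subscribers here; the
            %% sketch only records what would have been routed.
            self() ! {routed, Topic, Payload, Qos, Subscribers},
            {ok, Rest};
        error ->
            %% No query is waiting under this message id.
            {not_found, Pending}
    end.

demo() ->
    Pending = #{<<"msg-1">> => {<<"sensors.temp">>, <<"21.5">>, 0, #{}}},
    {ok, Rest} = handle_response(<<"msg-1">>, [sub_a], Pending),
    0 = map_size(Rest),
    %% A second reply for the same id finds nothing.
    {not_found, Rest} = handle_response(<<"msg-1">>, [sub_a], Rest),
    receive
        {routed, <<"sensors.temp">>, <<"21.5">>, 0, [sub_a]} -> ok
    after 0 ->
        error(no_route)
    end.
```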

route_to_subscribers(Topic, Payload, Qos, Subscribers, NodeId)

-spec route_to_subscribers(topic(), payload(), qos(), list(), node_id()) -> ok.

Route a message to remote subscribers via direct P2P connections (v0.8.0+). Wraps the publish in a pubsub_route envelope and sends it directly to each subscriber. Uses macula_peer_connector for direct QUIC connections to subscriber endpoints. Returns ok.
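The wrap-and-fan-out idea can be sketched as below. Everything concrete in it is an assumption: the module `route_sketch`, the envelope's field names, the shape of a subscriber entry, and the injected `SendFun`, which stands in for the transport because the macula_peer_connector API is not shown in this page.

```erlang
-module(route_sketch).
-export([envelope/4, route/6, demo/0]).

%% Hypothetical envelope layout; the real pubsub_route message may
%% carry different fields.
envelope(Topic, Payload, Qos, NodeId) ->
    #{type => pubsub_route,
      source_node => NodeId,
      publish => #{topic => Topic, payload => Payload, qos => Qos}}.

%% Build one envelope and hand it to SendFun once per subscriber.
route(Topic, Payload, Qos, Subscribers, NodeId, SendFun) ->
    Envelope = envelope(Topic, Payload, Qos, NodeId),
    lists:foreach(fun(Sub) -> SendFun(Sub, Envelope) end, Subscribers),
    ok.

demo() ->
    Self = self(),
    %% Test transport: deliver to our own mailbox instead of a peer.
    SendFun = fun(Sub, Env) -> Self ! {sent, Sub, Env}, ok end,
    ok = route(<<"sensors.temp">>, <<"21.5">>, 0,
               [sub_a, sub_b], <<"node-1">>, SendFun),
    receive {sent, sub_a, #{type := pubsub_route}} -> ok
    after 0 -> error(missing_a)
    end,
    receive {sent, sub_b, #{publish := #{qos := 0}}} -> ok
    after 0 -> error(missing_b)
    end.
```

Passing the send function in keeps the sketch testable without a network; in the library the send goes over a direct QUIC connection to each subscriber endpoint.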