macula_pubsub_dht (macula v0.14.3)
View SourceDHT operations for pub/sub - handles subscription advertisement and discovery.
Responsibilities: - Advertise subscriptions in DHT with TTL - Schedule re-advertisement timers - Discover remote subscribers via DHT queries - Route messages to remote subscribers - Track pending DHT queries
Extracted from macula_pubsub_handler.erl (Phase 3)
Summary
Functions
Advertise a subscription in the DHT. Sends STORE message to DHT and schedules re-advertisement. Returns {ok, SubInfo}.
Cancel advertisement for a topic. Cancels the re-advertisement timer. Returns updated advertised_subscriptions map.
Discover remote subscribers for a topic. Checks cache first, queries DHT on cache miss. Returns {cached, Subscribers, Registry} | {query_sent, Pending, MsgId, Registry}.
Handle DHT discovery response. Routes messages to discovered subscribers. Returns updated pending queries map.
Route message to remote subscribers via direct P2P connections (v0.8.0+). Wraps publish in pubsub_route envelope and sends directly to each subscriber. Uses macula_peer_connector for direct QUIC connections to subscriber endpoints.
Types
-type advertised_subscriptions() :: #{topic() => #{sub_ref := reference(), ttl := pos_integer(), timer_ref := reference()}}.
-type connection_manager_pid() :: pid().
-type node_id() :: binary().
-type payload() :: binary().
-type qos() :: 0 | 1.
-type subscription_ref() :: reference().
-type topic() :: binary().
-type url() :: binary().
Functions
-spec advertise_subscription(topic(), subscription_ref(), node_id(), url(), connection_manager_pid()) -> {ok, #{sub_ref := reference(), ttl := pos_integer(), timer_ref := reference()}} | {error, term()}.
Advertise a subscription in the DHT. Sends STORE message to DHT and schedules re-advertisement. Returns {ok, SubInfo}.
-spec cancel_advertisement(topic(), advertised_subscriptions()) -> advertised_subscriptions().
Cancel advertisement for a topic. Cancels the re-advertisement timer. Returns updated advertised_subscriptions map.
-spec discover_subscribers(topic(), payload(), qos(), connection_manager_pid(), term(), non_neg_integer()) -> {cached, list(), term()} | {query_sent, pending_queries(), binary(), term()}.
Discover remote subscribers for a topic. Checks cache first, queries DHT on cache miss. Returns {cached, Subscribers, Registry} | {query_sent, Pending, MsgId, Registry}.
-spec handle_discovery_response(binary(), list(), pending_queries()) -> {ok, pending_queries()} | {not_found, pending_queries()}.
Handle DHT discovery response. Routes messages to discovered subscribers. Returns updated pending queries map.
Route message to remote subscribers via direct P2P connections (v0.8.0+). Wraps publish in pubsub_route envelope and sends directly to each subscriber. Uses macula_peer_connector for direct QUIC connections to subscriber endpoints.