macula_service_registry (macula v0.20.3)

View Source

Decentralized service advertisement registry using DHT.

Provides service discovery via Kademlia DHT instead of centralized registration. Services advertise their capabilities to the DHT, and clients discover providers by querying the DHT.

Architecture

- Services advertise: "I provide procedure X" → DHT stores node_id at key=hash(procedure) - Clients discover: "Who provides procedure X?" → DHT returns list of node_ids - Local cache: Recent discoveries cached with TTL for low-latency lookups - Re-advertisement: Periodic republish to DHT for TTL renewal (default: every 5 min)

Features

- Fully decentralized (no central authority) - Multiple providers supported (DHT returns list) - Load balancing (client picks from list) - Fault tolerant (try another provider if one fails) - Low latency after first lookup (local cache)

Summary

Types

Handler function for local service implementations.

32-byte node identifier.

Service identifier (procedure URI). Example: <<"energy.home.get">>.

Functions

Advertise a service locally (stores handler for incoming calls).

Cache discovered service providers.

Cache discovered subscribers for a topic.

Clear the entire discovery cache.

Clear the entire subscriber cache.

Discover service providers (checks cache first, returns cached if available).

Discover service providers with options.

Discover subscribers for a topic (checks cache first).

Get handler function for a locally advertised service.

List all locally advertised services.

Create new empty service registry with default settings.

Create new service registry with custom options.

Remove expired entries from discovery cache.

Remove expired local services.

Remove expired subscriber cache entries.

Publish a service advertisement to the DHT.

Query the DHT for service providers.

Remove a service advertisement from the DHT.

Remove a local service advertisement.

Types

cache_entry/0

-type cache_entry() ::
          #{service_id := service_id(),
            providers := [provider_info()],
            cached_at := integer(),
            ttl := pos_integer()}.

handler_fn/0

-type handler_fn() :: fun((map()) -> {ok, term()} | {error, term()}).

Handler function for local service implementations.

local_service/0

-type local_service() ::
          #{service_id := service_id(),
            handler := handler_fn(),
            metadata := map(),
            advertised_at := integer()}.

node_id/0

-type node_id() :: binary().

32-byte node identifier.

provider_info/0

-type provider_info() ::
          #{node_id := node_id(), endpoint := binary(), metadata := map(), advertised_at := integer()}.

registry/0

-type registry() ::
          #{local_services := #{service_id() => local_service()},
            cache := #{service_id() => cache_entry()},
            subscriber_cache := #{binary() => cache_entry()},
            default_ttl := pos_integer(),
            cache_ttl := pos_integer(),
            service_ttl := pos_integer()}.

service_id/0

-type service_id() :: binary().

Service identifier (procedure URI). Example: <<"energy.home.get">>.

Functions

cache_service(Registry, ServiceId, Providers, TTL)

-spec cache_service(registry(), service_id(), [provider_info()], pos_integer()) -> registry().

Cache discovered service providers.

Stores providers in local cache with TTL. Subsequent discover_service calls will return cached results until TTL expires.

cache_subscribers(Registry, Topic, Subscribers, TTL)

-spec cache_subscribers(registry(), binary(), [provider_info()], pos_integer()) -> registry().

Cache discovered subscribers for a topic.

Stores subscribers in local cache with TTL. Subsequent discover_subscribers/2 calls will return cached results until TTL expires.

clear_cache(Registry)

-spec clear_cache(registry()) -> registry().

Clear the entire discovery cache.

clear_subscriber_cache(Registry)

-spec clear_subscriber_cache(registry()) -> registry().

Clear the entire subscriber cache.

discover_service(Registry, ServiceId)

-spec discover_service(registry(), service_id()) ->
                          {ok, [provider_info()], registry()} | {cache_miss, registry()}.

Discover service providers (checks cache first, returns cached if available).

discover_service(Registry, ServiceId, Opts)

-spec discover_service(registry(), service_id(), map()) ->
                          {ok, [provider_info()], registry()} | {cache_miss, registry()}.

Discover service providers with options.

Checks local cache first. If found and not expired, returns cached providers. If cache miss or expired, returns {cache_miss, Registry} so caller can query DHT.

Options: - force_refresh - Skip cache, force DHT lookup (default: false)

discover_subscribers(Registry, Topic)

-spec discover_subscribers(registry(), binary()) ->
                              {ok, [provider_info()], registry()} | {cache_miss, registry()}.

Discover subscribers for a topic (checks cache first).

Similar to discover_service/2 but for pub/sub subscribers. Returns cached subscribers if found and not expired, otherwise cache_miss.

get_local_handler(_, ServiceId)

-spec get_local_handler(registry(), service_id()) -> {ok, handler_fn()} | not_found.

Get handler function for a locally advertised service.

list_local_services(_)

-spec list_local_services(registry()) -> [service_id()].

List all locally advertised services.

new()

-spec new() -> registry().

Create new empty service registry with default settings.

new(Opts)

-spec new(map()) -> registry().

Create new service registry with custom options.

Options: - default_ttl - Default TTL for DHT advertisements (default: 300s) - cache_ttl - How long to cache discovered services (default: 60s) - service_ttl - TTL for local services before cleanup (default: 300s, 5 minutes)

prune_expired(Registry)

-spec prune_expired(registry()) -> {registry(), non_neg_integer()}.

Remove expired entries from discovery cache.

Should be called periodically to prevent memory leaks. Returns updated registry and count of removed entries.

prune_expired_local_services(Registry)

-spec prune_expired_local_services(registry()) -> {registry(), non_neg_integer()}.

Remove expired local services.

Should be called periodically to prevent memory leaks from stale service registrations. Returns updated registry and count of removed services.

prune_expired_subscribers(Registry)

-spec prune_expired_subscribers(registry()) -> {registry(), non_neg_integer()}.

Remove expired subscriber cache entries.

Should be called periodically to prevent memory leaks. Returns updated registry and count of removed entries.

publish_to_dht(DhtPid, ServiceId, ProviderInfo, TTL, K)

-spec publish_to_dht(pid() | atom(), service_id(), provider_info(), pos_integer(), pos_integer()) ->
                        ok | {error, term()}.

Publish a service advertisement to the DHT.

This function publishes a service's provider information to the DHT so other nodes can discover it. The service_id is hashed to create a DHT key, and the provider information is stored at that key.

Parameters: - DhtPid: Process ID or registered name of macula_routing_server - ServiceId: The service identifier (procedure URI) - ProviderInfo: Information about this provider (node_id, endpoint, metadata) - TTL: Time-to-live in seconds for this advertisement - K: Number of nodes to store at (typically 20 for Kademlia)

Returns: - ok if successful - {error, Reason} if publication failed

Example:

  ProviderInfo = #{
      node_id => <<"my-node-123">>,
      endpoint => <<"https://localhost:9443">>,
      metadata => #{version => <<"1.0">>}
  },
  ok = publish_to_dht(DhtPid, &lt;&lt;"energy.home.get"&gt;&gt;, ProviderInfo, 300, 20).

query_dht_for_service(DhtPid, ServiceId, K)

-spec query_dht_for_service(pid() | atom(), service_id(), pos_integer()) ->
                               {ok, [provider_info()]} | {error, term()}.

Query the DHT for service providers.

This function queries the DHT to find nodes that provide a given service. It returns a list of provider_info() maps, each containing node_id, endpoint, and metadata for a provider.

Parameters: - DhtPid: Process ID or registered name of macula_routing_server - ServiceId: The service identifier to query for - K: Number of closest nodes to query (typically 20 for Kademlia)

Returns: - {ok, [ProviderInfo]} if providers found - {ok, []} if no providers found - {error, Reason} if query failed

Example:

  {ok, Providers} = query_dht_for_service(DhtPid, &lt;&lt;"energy.home.get"&gt;&gt;, 20),
  %% Returns: [{ok, [#{node_id => ..., endpoint => ..., metadata => ...}]}]

remove_from_dht(DhtPid, ServiceId, NodeId)

-spec remove_from_dht(pid() | atom(), service_id(), node_id()) -> ok | {error, term()}.

Remove a service advertisement from the DHT.

This function removes a service advertisement when unadvertising. Note: In practice, DHT entries expire naturally via TTL, so this is optional and mainly useful for immediate cleanup.

Parameters: - DhtPid: Process ID or registered name of macula_routing_server - ServiceId: The service identifier to remove - NodeId: This node's identifier (to remove only this provider)

Returns: - ok if successful or entry not found - {error, Reason} if removal failed

unadvertise_local(Registry, ServiceId)

-spec unadvertise_local(registry(), service_id()) -> registry().

Remove a local service advertisement.