macula_subscriber_cache (macula v0.20.5)

View Source

DHT Subscriber Cache - Caches topic→subscribers mappings for fast pub/sub.

Problem: DHT lookups on every PUBLISH cause 50-200ms latency per message. Solution: Cache topic→subscribers mappings with TTL-based expiration.

Design: - ETS table for O(1) lookup - TTL-based expiration (default 5 seconds) - Automatic cache invalidation on subscribe/unsubscribe - Thread-safe concurrent reads - Adaptive rate-limiting to prevent discovery storms (default 2 seconds)

Rate-Limiting: When cache expires and many publishes occur, rate-limiting prevents flooding the DHT with lookup queries. Only one DHT query per topic is allowed within the min_discovery_interval_ms window.

Expected improvement: - 5-10x latency reduction for high-frequency topics (caching) - 2-3x improvement during high-frequency publishing (rate-limiting)

Configuration Options: - ttl_ms: Cache entry TTL (default: 5000ms) - min_discovery_interval_ms: Minimum time between DHT queries per topic (default: 2000ms)

Summary

Functions

Invalidate cache entry for a topic. Should be called when subscription changes occur.

Invalidate all cache entries. Useful for testing or when major topology changes occur.

Look up subscribers for a topic from cache. Returns {ok, Subscribers} on cache hit, or {miss, TopicKey} on cache miss. TopicKey is the SHA256 hash of the topic binary.

Record that we just performed a DHT query for a topic. Call this after a successful DHT lookup to reset the rate-limit timer.

Check if we should query DHT for a topic (rate-limiting check). Returns true if enough time has passed since last DHT query. Returns false if we recently queried DHT (rate-limited).

Start the subscriber cache with default options.

Start the subscriber cache with options. Options: - ttl_ms: Cache entry TTL in milliseconds (default: 5000)

Get cache statistics.

Store subscribers for a topic in cache. Subscribers will expire after TTL.

Functions

handle_call(Request, From, State)

handle_cast(Msg, State)

handle_info(Info, State)

init(Opts)

invalidate(Topic)

-spec invalidate(binary()) -> ok.

Invalidate cache entry for a topic. Should be called when subscription changes occur.

invalidate_all()

-spec invalidate_all() -> ok.

Invalidate all cache entries. Useful for testing or when major topology changes occur.

lookup(Topic)

-spec lookup(binary()) -> {ok, list()} | {miss, binary()}.

Look up subscribers for a topic from cache. Returns {ok, Subscribers} on cache hit, or {miss, TopicKey} on cache miss. TopicKey is the SHA256 hash of the topic binary.

record_dht_query(Topic)

-spec record_dht_query(binary()) -> ok.

Record that we just performed a DHT query for a topic. Call this after a successful DHT lookup to reset the rate-limit timer.

should_query_dht(Topic)

-spec should_query_dht(binary()) -> boolean().

Check if we should query DHT for a topic (rate-limiting check). Returns true if enough time has passed since last DHT query. Returns false if we recently queried DHT (rate-limited).

start_link()

-spec start_link() -> {ok, pid()} | {error, term()}.

Start the subscriber cache with default options.

start_link(Opts)

-spec start_link(map()) -> {ok, pid()} | {error, term()}.

Start the subscriber cache with options. Options: - ttl_ms: Cache entry TTL in milliseconds (default: 5000)

stats()

-spec stats() -> map().

Get cache statistics.

store(Topic, Subscribers)

-spec store(binary(), list()) -> ok.

Store subscribers for a topic in cache. Subscribers will expire after TTL.

terminate(Reason, State)