macula_gateway_pubsub (macula v0.20.5)

View Source

Pub/Sub Handler GenServer - manages topic subscriptions and message routing.

Responsibilities: - Subscribe/unsubscribe streams to topics - Route published messages to matching subscribers - Support wildcard topics (* single-level, ** multi-level) - Track bidirectional mapping (topic ↔ stream) - Monitor stream processes for automatic cleanup

Extracted from macula_gateway.erl (Phase 3)

Summary

Functions

Deliver a message to LOCAL subscribers only (no remote routing). Used by pubsub_route delivery to prevent message amplification. When a message arrives from another node via pubsub_route, it should only be delivered to local subscribers, NOT re-routed to remote subscribers.

Get all topics a stream is subscribed to.

Get all subscribers for a topic (exact and wildcard matches).

Handle stream process death - automatic cleanup.

Publish a message to a topic (routes to matching subscribers).

Start the pub/sub handler with options.

Stop the pub/sub handler.

Subscribe a stream to a topic (supports wildcards). Async (cast) to prevent blocking callers when PubSub is busy.

Unsubscribe a stream from a topic. Async (cast) to prevent blocking callers when PubSub is busy.

Types

stream_handle/0

-type stream_handle() :: pid() | reference().

Functions

deliver_local(Pid, Topic, Payload)

-spec deliver_local(pid(), binary(), map()) -> ok.

Deliver a message to LOCAL subscribers only (no remote routing). Used by pubsub_route delivery to prevent message amplification. When a message arrives from another node via pubsub_route, it should only be delivered to local subscribers, NOT re-routed to remote subscribers.

get_stream_topics(Pid, Stream)

-spec get_stream_topics(pid(), pid()) -> {ok, [binary()]}.

Get all topics a stream is subscribed to.

get_subscribers(Pid, Topic)

-spec get_subscribers(pid(), binary()) -> {ok, [pid()]}.

Get all subscribers for a topic (exact and wildcard matches).

handle_call(Request, From, State)

handle_cast(Msg, State)

handle_info(Info, State)

Handle stream process death - automatic cleanup.

init(Opts)

publish(Pid, Topic, Payload)

-spec publish(pid(), binary(), map()) -> ok.

Publish a message to a topic (routes to matching subscribers).

start_link(Opts)

-spec start_link(map()) -> {ok, pid()} | {error, term()}.

Start the pub/sub handler with options.

stop(Pid)

-spec stop(pid()) -> ok.

Stop the pub/sub handler.

subscribe(Pid, Stream, Topic)

-spec subscribe(pid(), pid() | reference(), binary()) -> ok.

Subscribe a stream to a topic (supports wildcards). Async (cast) to prevent blocking callers when PubSub is busy.

terminate(Reason, State)

unsubscribe(Pid, Stream, Topic)

-spec unsubscribe(pid(), pid() | reference(), binary()) -> ok.

Unsubscribe a stream from a topic. Async (cast) to prevent blocking callers when PubSub is busy.