macula_pubsub_qos (macula v0.20.5)

QoS (Quality of Service) manager for pub/sub.

Handles QoS 1 (at-least-once delivery) logic:

- Message tracking with timeout timers
- Automatic retry on timeout (up to max retries)
- Acknowledgment handling

Extracted from macula_pubsub_handler.erl (Phase 2)

Summary

Functions

get_pending(PendingPubacks)

Get list of pending message IDs (for testing/debugging).

handle_ack(MsgId, PendingPubacks)

Handle acknowledgment for a message. Cancels the timer and removes the message from the pending map. Returns the updated pending_pubacks() map.

handle_timeout(MsgId, ConnMgrPid, PendingPubacks)

Handle timeout for a pending message. Retries sending if under max retries, otherwise gives up. Returns {retry, UpdatedPending, PublishMsg} | {give_up, UpdatedPending} | {not_found, UpdatedPending}.

track_message(MsgId, Topic, Payload, Qos, PendingPubacks)

Track a message for QoS 1 acknowledgment. Starts a timeout timer and stores the message in the pending map. Returns {ok, UpdatedPending} | {error, Reason}.

Types

connection_manager_pid/0

-type connection_manager_pid() :: pid().

message_id/0

-type message_id() :: binary().

payload/0

-type payload() :: binary().

pending_pubacks/0

-type pending_pubacks() :: #{message_id() => {topic(), payload(), qos(), retry_count(), timer_ref()}}.

qos/0

-type qos() :: 0 | 1.

retry_count/0

-type retry_count() :: non_neg_integer().

timer_ref/0

-type timer_ref() :: reference().

topic/0

-type topic() :: binary().

Functions

get_pending(PendingPubacks)

-spec get_pending(pending_pubacks()) -> [message_id()].

Get list of pending message IDs (for testing/debugging).
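To make the shape of the pending map concrete, here is a minimal sketch that builds a map matching the pending_pubacks() type and lists its keys. The module name qos_pending_sketch, the helper pending_ids/1, and the sample topics and payloads are hypothetical; the sketch only mirrors the documented spec and is not the macula implementation.

```erlang
-module(qos_pending_sketch).
-export([example/0, pending_ids/1]).

%% Hypothetical helper with the same input and output types as get_pending/1.
pending_ids(Pending) ->
    maps:keys(Pending).

example() ->
    %% Each value follows {topic(), payload(), qos(), retry_count(), timer_ref()}.
    %% make_ref() stands in for a real timer reference.
    Pending = #{
        <<"msg-1">> => {<<"sensors/temp">>, <<"21.5">>, 1, 0, make_ref()},
        <<"msg-2">> => {<<"sensors/temp">>, <<"21.7">>, 1, 2, make_ref()}
    },
    %% Sorted so the result does not depend on map iteration order.
    lists:sort(pending_ids(Pending)).
```

Calling qos_pending_sketch:example() returns [<<"msg-1">>, <<"msg-2">>].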

handle_ack(MsgId, PendingPubacks)

-spec handle_ack(message_id(), pending_pubacks()) -> pending_pubacks().

Handle acknowledgment for a message. Cancels the timer and removes the message from the pending map. Returns the updated pending_pubacks() map.
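The cancel-and-remove step described above could look roughly like the following, using only the Erlang standard library. The module qos_ack_sketch and function ack/2 are hypothetical names, and the behaviour for an unknown ID (returning the map unchanged) is an assumption, since the documentation does not say what happens in that case.

```erlang
-module(qos_ack_sketch).
-export([ack/2]).

%% Hypothetical stand-in for the documented handle_ack/2 behaviour.
ack(MsgId, Pending) ->
    case maps:take(MsgId, Pending) of
        {{_Topic, _Payload, _Qos, _Retries, TimerRef}, Rest} ->
            %% Stop the timeout timer. The result is ignored because the
            %% timer may already have fired.
            _ = erlang:cancel_timer(TimerRef),
            Rest;
        error ->
            %% Assumption: an unknown or already acknowledged ID leaves
            %% the map unchanged.
            Pending
    end.
```

Because the function returns a plain map, the caller is expected to store the result in its own state in place of the previous pending map.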

handle_timeout(MsgId, ConnMgrPid, PendingPubacks)

-spec handle_timeout(message_id(), connection_manager_pid(), pending_pubacks()) ->
                        {retry, pending_pubacks(), map()} |
                        {give_up, pending_pubacks()} |
                        {not_found, pending_pubacks()}.

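Handle timeout for a pending message. Retries sending if under max retries, otherwise gives up. Returns {retry, UpdatedPending, PublishMsg} | {give_up, UpdatedPending} | {not_found, UpdatedPending}. The not_found result appears in the spec only; it presumably covers a MsgId that is no longer in the pending map.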
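The retry decision can be sketched as a pure lookup on the pending map. Everything specific here is an assumption for illustration: the module qos_timeout_sketch, the MaxRetries and TimeoutMs parameters (the real limits are not documented here), the {puback_timeout, MsgId} timer message, and the keys of the PublishMsg map. The real function also takes a connection manager pid, which this sketch leaves out.

```erlang
-module(qos_timeout_sketch).
-export([on_timeout/4]).

%% Hypothetical stand-in for the documented handle_timeout/3 result shapes.
on_timeout(MsgId, MaxRetries, TimeoutMs, Pending) ->
    case maps:find(MsgId, Pending) of
        error ->
            %% Nothing is tracked under this ID.
            {not_found, Pending};
        {ok, {_Topic, _Payload, _Qos, Retries, _OldRef}} when Retries >= MaxRetries ->
            %% Retry budget exhausted: drop the entry.
            {give_up, maps:remove(MsgId, Pending)};
        {ok, {Topic, Payload, Qos, Retries, _OldRef}} ->
            %% Arm a fresh timer and bump the retry count.
            NewRef = erlang:send_after(TimeoutMs, self(), {puback_timeout, MsgId}),
            Entry = {Topic, Payload, Qos, Retries + 1, NewRef},
            %% Assumed message shape; the spec only says map().
            PublishMsg = #{message_id => MsgId, topic => Topic,
                           payload => Payload, qos => Qos},
            {retry, Pending#{MsgId => Entry}, PublishMsg}
    end.
```

A caller would typically resend PublishMsg on retry, report a delivery failure on give_up, and ignore not_found, which can occur when an acknowledgment and a timeout race.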

track_message(MsgId, Topic, Payload, Qos, PendingPubacks)

-spec track_message(message_id(), topic(), payload(), qos(), pending_pubacks()) ->
                       {ok, pending_pubacks()} | {error, term()}.

Track a message for QoS 1 acknowledgment. Starts a timeout timer and stores the message in the pending map. Returns {ok, UpdatedPending} on success or {error, Reason} on failure.
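A minimal sketch of the tracking step, assuming the timer is started with erlang:send_after/3 and delivers a {puback_timeout, MsgId} message to the calling process. The module qos_track_sketch, the TimeoutMs parameter, and the timer message shape are hypothetical; the documentation does not state the timeout value or the conditions that produce {error, Reason}, so the sketch covers only the success path.

```erlang
-module(qos_track_sketch).
-export([track/6]).

%% Hypothetical stand-in for the documented track_message/5 success path.
track(MsgId, Topic, Payload, Qos, TimeoutMs, Pending) ->
    %% Ask the runtime to notify this process if no ack arrives in time.
    TimerRef = erlang:send_after(TimeoutMs, self(), {puback_timeout, MsgId}),
    %% Store the entry in the tuple shape given by pending_pubacks(),
    %% starting with a retry count of 0.
    {ok, Pending#{MsgId => {Topic, Payload, Qos, 0, TimerRef}}}.
```

Keeping the timer reference inside the entry is what lets a later acknowledgment cancel the timer before it fires.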