Topic router for MQTT servers.
The router maintains a trie-based subscription table for efficient topic matching during message routing. Matching is O(L + K), where L is the topic depth (the number of levels in the topic) and K is the number of matching subscriptions, compared to the previous O(N) linear scan across all N subscriptions.
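To make the trie idea concrete, here is a minimal sketch of trie-based matching with the `+` and `#` wildcards. The module `TrieSketch` and its node shape are hypothetical and are not MqttX's actual internals. The sketch assumes well-formed filters and wildcard-free topics, and it ignores the special handling of `$`-prefixed topics.

```elixir
defmodule TrieSketch do
  # Hypothetical, simplified stand-in for the router's trie (not MqttX's code).
  # Each node is %{subs: [subscriber], children: %{level => node}}.

  def new, do: %{subs: [], children: %{}}

  # Insert a subscriber under a filter such as "sensors/+/temp".
  def insert(node, filter, sub) when is_binary(filter),
    do: insert(node, String.split(filter, "/"), sub)

  def insert(node, [], sub), do: %{node | subs: [sub | node.subs]}

  def insert(node, [level | rest], sub) do
    child = Map.get(node.children, level, new())
    %{node | children: Map.put(node.children, level, insert(child, rest, sub))}
  end

  # Collect every subscriber whose filter matches the topic.
  def match(node, topic) when is_binary(topic),
    do: match(node, String.split(topic, "/"))

  # Topic exhausted: subscribers stored here, plus a trailing "#",
  # which also matches its parent level.
  def match(node, []), do: node.subs ++ hash_subs(node)

  # Follow the literal level and the "+" branch; a "#" child matches the rest.
  def match(node, [level | rest]) do
    descend(node, level, rest) ++ descend(node, "+", rest) ++ hash_subs(node)
  end

  defp descend(node, key, rest) do
    case node.children do
      %{^key => child} -> match(child, rest)
      _ -> []
    end
  end

  defp hash_subs(node) do
    case node.children do
      %{"#" => child} -> child.subs
      _ -> []
    end
  end
end

trie =
  TrieSketch.new()
  |> TrieSketch.insert("sensors/+/temp", :client_a)
  |> TrieSketch.insert("alerts/#", :client_b)

TrieSketch.match(trie, "sensors/room1/temp")
# => [:client_a]
```

Only the branches along the topic's levels are visited, so unrelated subscriptions are never inspected.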
Usage
router = MqttX.Server.Router.new()
router = MqttX.Server.Router.subscribe(router, "sensors/+/temp", client_ref, qos: 1)
router = MqttX.Server.Router.subscribe(router, "alerts/#", client_ref, qos: 0)
matches = MqttX.Server.Router.match(router, "sensors/room1/temp")
# => [{client_ref, %{qos: 1}}]
Shared Subscriptions (MQTT 5.0)
Shared subscriptions allow load balancing messages across multiple clients.
Use the $share/group_name/topic_filter format:
router = MqttX.Server.Router.subscribe(router, "$share/workers/jobs/#", client1, qos: 1)
router = MqttX.Server.Router.subscribe(router, "$share/workers/jobs/#", client2, qos: 1)
# Messages to "jobs/task1" are delivered to client1 or client2 (round-robin)
matches = MqttX.Server.Router.match(router, "jobs/task1")
# => [{client1, %{qos: 1}}] or [{client2, %{qos: 1}}]
Types
@type subscription() :: %{
  filter: MqttX.Topic.normalized_topic(),
  client: term(),
  qos: 0 | 1 | 2,
  opts: map()
}
@type t() :: %MqttX.Server.Router{
  by_client: %{
    required(term()) => [
      {:normal, MqttX.Topic.normalized_topic()}
      | {:shared, binary(), MqttX.Topic.normalized_topic()}
    ]
  },
  count: non_neg_integer(),
  shared_groups: %{required(binary()) => shared_group()},
  trie: map()
}
Functions
@spec client_count(t()) :: non_neg_integer()
Get the number of unique clients with subscriptions.
@spec count(t()) :: non_neg_integer()
Get the total number of subscriptions.
Find all matching subscriptions for a topic.
Returns a list of {client, opts} tuples for each matching subscription.
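For shared subscriptions, only one client per group is selected (round-robin).
This function returns only the matches and leaves the router unchanged; use
match_and_advance/3 to rotate the selection across calls.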
The optional publisher parameter enables no_local filtering: subscriptions
with no_local: true are excluded when the publisher matches the subscriber.
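The no_local rule described above can be sketched as a filter over the `{client, opts}` tuples. `NoLocalSketch` is a hypothetical helper, not part of MqttX, and it assumes the `:no_local` flag is stored in the subscription's opts map.

```elixir
defmodule NoLocalSketch do
  # Hypothetical helper (not MqttX's code): drop a match when its subscription
  # has no_local: true and the subscriber is also the publisher.
  def filter(matches, publisher) do
    Enum.reject(matches, fn {client, opts} ->
      client == publisher and Map.get(opts, :no_local, false)
    end)
  end
end

matches = [{:c1, %{qos: 1, no_local: true}}, {:c2, %{qos: 0}}]

NoLocalSketch.filter(matches, :c1)
# => [{:c2, %{qos: 0}}]
```

A publisher that does not appear among the subscribers leaves the list unchanged.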
@spec match_and_advance(t(), binary() | MqttX.Topic.normalized_topic(), term()) :: {[{term(), map()}], t()}
Find all matching subscriptions and advance round-robin for shared groups.
This is the same as match/3 but also updates the router state
to advance the round-robin index for matched shared subscriptions.
Returns {matches, updated_router}.
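Because the router is immutable, advancing the round-robin index means returning a new state alongside the result. The sketch below shows that pattern for a single shared group. The group shape `%{members: [...], index: n}` and the module name are assumptions made for illustration, not the library's real `shared_group()` structure.

```elixir
defmodule RoundRobinSketch do
  # Hypothetical group shape: %{members: [client], index: non_neg_integer()}.
  # Assumes the group has at least one member.

  # Select the current member without changing any state.
  def pick(%{members: members, index: index}) do
    Enum.at(members, rem(index, length(members)))
  end

  # Select the current member and return the group with the index advanced.
  def pick_and_advance(%{members: members, index: index} = group) do
    {pick(group), %{group | index: rem(index + 1, length(members))}}
  end
end

group = %{members: [:client1, :client2], index: 0}
{first, group} = RoundRobinSketch.pick_and_advance(group)
{second, group} = RoundRobinSketch.pick_and_advance(group)
{third, _group} = RoundRobinSketch.pick_and_advance(group)
# first == :client1, second == :client2, third == :client1
```

The caller has to keep the returned state; discarding it would select the same member on every call.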
@spec new() :: t()
Create a new empty router.
Add a subscription to the router.
Options
- :qos - Maximum QoS level (default: 0)
- Any additional options are stored with the subscription
Shared Subscriptions
Use $share/group_name/topic_filter format for shared subscriptions:
router = MqttX.Server.Router.subscribe(router, "$share/workers/jobs/#", client, qos: 1)
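As an illustration of how the `$share/group_name/topic_filter` format can be taken apart, here is a minimal parsing sketch. `ShareFilterSketch` is hypothetical and is not the router's implementation. Its `{:normal, ...}` and `{:shared, ...}` tags mirror the `by_client` entries in `t()`, but it keeps the filter as a plain string rather than a normalized topic.

```elixir
defmodule ShareFilterSketch do
  # Hypothetical parser (not MqttX's code): split off the group name of a
  # shared filter and keep everything after it as the topic filter.
  def parse("$share/" <> rest) do
    case String.split(rest, "/", parts: 2) do
      [group, filter] when group != "" and filter != "" ->
        {:shared, group, filter}

      _ ->
        {:error, :invalid_shared_filter}
    end
  end

  def parse(filter) when is_binary(filter), do: {:normal, filter}
end

ShareFilterSketch.parse("$share/workers/jobs/#")
# => {:shared, "workers", "jobs/#"}

ShareFilterSketch.parse("sensors/+/temp")
# => {:normal, "sensors/+/temp"}
```

Splitting with `parts: 2` keeps any further `/` separators inside the topic filter, so only the first level after `$share/` is treated as the group name.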
@spec subscriptions_for(t(), term()) :: [subscription()]
Get all subscriptions for a client.
@spec unsubscribe(t(), binary() | MqttX.Topic.normalized_topic(), term()) :: t()
Remove a subscription from the router.
Remove all subscriptions for a client.