Server Guide

MqttX.Server is a behaviour for building MQTT brokers. You implement handler callbacks; MqttX handles the protocol, transport, and routing.

Handler Callbacks

defmodule MyApp.MqttHandler do
  use MqttX.Server

  @impl true
  def init(_opts), do: %{subscriptions: %{}}

  @impl true
  def handle_connect(client_id, _credentials, state) do
    IO.puts("Client connected: #{client_id}")
    {:ok, state}
  end

  @impl true
  def handle_publish(topic, payload, _opts, state) do
    IO.puts("Received on #{inspect(topic)}: #{payload}")
    {:ok, state}
  end

  @impl true
  def handle_subscribe(topics, state) do
    qos_list = Enum.map(topics, fn t -> t.qos end)
    {:ok, qos_list, state}
  end

  @impl true
  def handle_unsubscribe(_topics, state), do: {:ok, state}

  @impl true
  def handle_disconnect(reason, _state) do
    IO.puts("Client disconnected: #{inspect(reason)}")
    :ok
  end

  @impl true
  def handle_info(_message, state) do
    # Handle custom messages (e.g., Phoenix.PubSub broadcasts)
    {:ok, state}
  end
end

Callback Summary

| Callback | Return |
| --- | --- |
| init(opts) | state |
| handle_connect(client_id, credentials, state) | {:ok, state} or {:error, reason_code, state} |
| handle_publish(topic, payload, opts, state) | {:ok, state}, {:disconnect, reason_code, state} |
| handle_subscribe(topics, state) | {:ok, granted_qos_list, state}, {:disconnect, reason_code, state} |
| handle_unsubscribe(topics, state) | {:ok, state}, {:disconnect, reason_code, state} |
| handle_disconnect(reason, state) | :ok |
| handle_info(message, state) | {:ok, state}, {:publish, ...}, {:disconnect, reason_code, state}, or {:stop, reason, state} |
| handle_session_expired(client_id, state) | :ok (optional) |

Use handle_info/2 with {:publish, topic, payload, state} to push messages to connected clients from external events (e.g., PubSub).
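
As a minimal sketch of that pattern, the clause below turns an external event into a {:publish, topic, payload, state} return. The {:sensor_reading, sensor_id, value} message shape and the PushSketch module are hypothetical; in a real handler the function would sit under use MqttX.Server with @impl true.

```elixir
defmodule PushSketch do
  # Hypothetical external event: {:sensor_reading, sensor_id, value}.
  # The return tuple follows the {:publish, topic, payload, state} shape described above.
  def handle_info({:sensor_reading, sensor_id, value}, state) do
    topic = "sensors/#{sensor_id}/temp"
    {:publish, topic, to_string(value), state}
  end

  # Any other message leaves the connection untouched.
  def handle_info(_other, state), do: {:ok, state}
end
```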

Returning {:disconnect, reason_code, state} or {:disconnect, reason_code, properties, state} from a callback that supports it closes the connection. For MQTT 5.0 clients the server first sends a DISCONNECT packet carrying the reason code; MQTT 3.1.1 connections are simply closed.

Transport Adapters

ThousandIsland

# mix.exs: {:thousand_island, "~> 1.4"}

{:ok, _pid} = MqttX.Server.start_link(
  MyApp.MqttHandler,
  [],
  transport: MqttX.Transport.ThousandIsland,
  port: 1883
)

Ranch

# mix.exs: {:ranch, "~> 2.2"}

{:ok, _pid} = MqttX.Server.start_link(
  MyApp.MqttHandler,
  [],
  transport: MqttX.Transport.Ranch,
  port: 1883
)

WebSocket

# mix.exs: {:bandit, "~> 1.6"}, {:websock_adapter, "~> 0.5"}

{:ok, _pid} = MqttX.Server.start_link(
  MyApp.MqttHandler,
  [],
  transport: MqttX.Transport.WebSocket,
  port: 8083
)

All transports support TCP and TLS. See MqttX.Transport for implementing custom adapters.

Topic Routing

The built-in router stores subscriptions in a trie for efficient topic matching: a lookup costs O(L+K), where L is the number of levels in the topic and K is the number of matching subscriptions. It supports the MQTT wildcards + (single-level) and # (multi-level):

alias MqttX.Server.Router

router = Router.new()
router = Router.subscribe(router, "sensors/+/temp", client_ref, qos: 1)
router = Router.subscribe(router, "alerts/#", client_ref, qos: 0)

matches = Router.match(router, "sensors/room1/temp")
# => [{client_ref, %{qos: 1}}]
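
To make the wildcard semantics concrete, here is a small standalone matcher. It is not the router's trie implementation, only a sketch of the matching rules level by level; the TopicFilterSketch module is hypothetical, and it omits the special handling of topics that begin with $.

```elixir
defmodule TopicFilterSketch do
  # Returns true when `topic` is matched by the subscription `filter`.
  def matches?(filter, topic) do
    do_match(String.split(filter, "/"), String.split(topic, "/"))
  end

  # "#" matches every remaining level, including none at all.
  defp do_match(["#"], _rest), do: true
  # "+" matches exactly one level, whatever it contains.
  defp do_match(["+" | filter], [_level | topic]), do: do_match(filter, topic)
  # A literal level must be identical on both sides.
  defp do_match([same | filter], [same | topic]), do: do_match(filter, topic)
  # Both sides exhausted at the same time: full match.
  defp do_match([], []), do: true
  defp do_match(_filter, _topic), do: false
end
```

For example, TopicFilterSketch.matches?("sensors/+/temp", "sensors/room1/temp") is true, while the same filter does not match "sensors/room1/humidity".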

Shared Subscriptions (MQTT 5.0)

Distribute messages across a group of subscribers with round-robin load balancing:

router = Router.subscribe(router, "$share/workers/jobs/#", worker1, qos: 1)
router = Router.subscribe(router, "$share/workers/jobs/#", worker2, qos: 1)

# Messages to "jobs/process" alternate between worker1 and worker2
{matches, router} = Router.match_and_advance(router, "jobs/process")
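
Because Router.match_and_advance returns an updated router, the rotation state evidently travels with the router value. The sketch below shows one way such round-robin selection can work in isolation; RoundRobinSketch and its field names are hypothetical and say nothing about how MqttX stores its groups.

```elixir
defmodule RoundRobinSketch do
  # A group is its member list plus the index of the next member to receive a message.
  def new(members), do: %{members: members, next: 0}

  # Picks the next member and returns the group with the index advanced.
  def pick(%{members: []} = group), do: {nil, group}

  def pick(%{members: members, next: index} = group) do
    member = Enum.at(members, index)
    {member, %{group | next: rem(index + 1, length(members))}}
  end
end
```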

Rate Limiting

MqttX supports per-client connection and message rate limiting. Configure it via the :rate_limit option:

MqttX.Server.start_link(MyApp.MqttHandler, [],
  transport: MqttX.Transport.ThousandIsland,
  port: 1883,
  rate_limit: [
    max_connections: 100,    # new connections per second
    max_messages: 1000,      # messages per client per second
    interval: 1000           # window size in ms (default)
  ]
)

| Option | Description | Default |
| --- | --- | --- |
| :max_connections | Max new connections per interval | unlimited |
| :max_messages | Max messages per client per interval | unlimited |
| :interval | Counter reset interval in ms | 1000 |

When a client exceeds the message rate limit:

  • QoS 0: Messages are silently dropped (per MQTT spec)
  • QoS 1+: PUBACK is sent with reason code 0x96 (message_rate_too_high)

When the connection rate limit is exceeded, new connections are immediately closed.

Rate limiting uses ETS with atomic update_counter operations, making it lock-free and safe for concurrent access from multiple transport handler processes.
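
The following sketch illustrates the general technique of a fixed-window counter built on :ets.update_counter/4. It is an assumption about the shape of such a limiter, not MqttX's actual code; the RateLimitSketch module and its function names are hypothetical.

```elixir
defmodule RateLimitSketch do
  # Creates an unnamed public table so any process holding the reference can count.
  def new(limit) do
    %{table: :ets.new(:rate_sketch, [:set, :public]), limit: limit}
  end

  # Atomically increments the client's counter. The default tuple {client_id, 0}
  # is inserted first if the client has no entry yet.
  def allow?(%{table: table, limit: limit}, client_id) do
    count = :ets.update_counter(table, client_id, {2, 1}, {client_id, 0})
    count <= limit
  end

  # Starts a new window. A real limiter would call this from a timer every interval.
  def reset(%{table: table}), do: :ets.delete_all_objects(table)
end
```

Since the increment and the read happen in a single ETS operation, two processes counting the same client at once cannot lose an update, which is what makes the approach lock-free.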

Retained Messages

The server automatically stores retained messages in ETS and delivers them to new subscribers. Publish with an empty payload to clear a retained message.
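
As an illustration of the store-and-clear rule, the sketch below keeps one retained payload per topic in ETS and treats an empty payload as a delete. RetainedSketch is a hypothetical module; the server's own table layout may differ.

```elixir
defmodule RetainedSketch do
  def new, do: :ets.new(:retained_sketch, [:set, :public])

  # An empty payload clears the retained message for the topic.
  def store(table, topic, ""), do: :ets.delete(table, topic)
  # Any other payload replaces whatever was retained before.
  def store(table, topic, payload), do: :ets.insert(table, {topic, payload})

  def lookup(table, topic) do
    case :ets.lookup(table, topic) do
      [{^topic, payload}] -> {:ok, payload}
      [] -> :error
    end
  end
end
```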

Keepalive Timeout

The server enforces MQTT keepalive as defined in the spec: if no packet is received from a client within 1.5x the keep_alive interval (set in the CONNECT packet), the server disconnects the client and publishes any will message.

The keepalive timer resets on every received packet (not just PINGREQ). Clients that set keep_alive: 0 are exempt from timeout enforcement.

When a keepalive timeout fires, your handle_disconnect/2 callback receives :keepalive_timeout as the reason.
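
The timeout arithmetic described above can be sketched as follows. keep_alive is in seconds, as in the CONNECT packet, and the result is in milliseconds; KeepaliveSketch is a hypothetical helper, not part of MqttX.

```elixir
defmodule KeepaliveSketch do
  # keep_alive: 0 disables enforcement entirely.
  def timeout_ms(0), do: :infinity

  # 1.5 x keep_alive seconds, expressed in milliseconds (seconds * 1000 * 3 / 2).
  def timeout_ms(keep_alive) when is_integer(keep_alive) and keep_alive > 0 do
    keep_alive * 1500
  end
end
```

A client that connects with keep_alive: 60 is therefore disconnected after 90 seconds of silence.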

Will Messages

Will messages from the CONNECT packet are published automatically when a client disconnects without sending DISCONNECT.

Will Delay Interval (MQTT 5.0)

MQTT 5.0 clients can set will_delay_interval in the will properties to delay publication of the will message:

  • will_delay_interval: 0 (or MQTT 3.1.1) — will published immediately (default)
  • will_delay_interval: N — will published N seconds after ungraceful disconnect

This allows a grace period for clients to reconnect before their "last will" is broadcast.
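
One way to picture the grace period is a timer that is cancelled if the client comes back in time. The sketch below uses Process.send_after/3 and Process.cancel_timer/1; the WillDelaySketch module and the {:publish_will, will} message are hypothetical and not MqttX internals.

```elixir
defmodule WillDelaySketch do
  # A zero delay publishes straight away.
  def schedule(will, 0, owner) do
    send(owner, {:publish_will, will})
    :published
  end

  # Otherwise the owner process is told to publish after `delay_s` seconds.
  def schedule(will, delay_s, owner) when is_integer(delay_s) and delay_s > 0 do
    Process.send_after(owner, {:publish_will, will}, delay_s * 1000)
  end

  # Called when the client reconnects before the delay has elapsed.
  def cancel(:published), do: :too_late
  def cancel(timer_ref) when is_reference(timer_ref), do: Process.cancel_timer(timer_ref)
end
```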

Session Expiry (MQTT 5.0)

MQTT 5.0 clients can set session_expiry_interval in the CONNECT properties. After the client disconnects, the server waits the specified interval then calls your handle_session_expired/2 callback:

@impl true
def handle_session_expired(client_id, state) do
  # Clean up stored subscriptions, queued messages, etc.
  MyApp.SessionStore.delete(client_id)
  :ok
end

| Value | Behavior |
| --- | --- |
| nil | No session expiry (MQTT 3.1.1 default) |
| 0 | Session expires immediately on disconnect |
| 1..0xFFFFFFFE | Session expires after N seconds |
| 0xFFFFFFFF | Session never expires |
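
The four cases can be written as one function, which may help when mirroring the same rules in your own session store. SessionExpirySketch and its return atoms are hypothetical names chosen for illustration.

```elixir
defmodule SessionExpirySketch do
  @never 0xFFFFFFFF

  # No session_expiry_interval was supplied (the MQTT 3.1.1 default).
  def expiry(nil), do: :none
  # The session ends as soon as the client disconnects.
  def expiry(0), do: :immediate
  # The maximum value means the session is kept indefinitely.
  def expiry(@never), do: :never
  # Anything in between is a delay in seconds, returned here in milliseconds.
  def expiry(seconds) when is_integer(seconds) and seconds > 0 and seconds < @never do
    {:after_ms, seconds * 1000}
  end
end
```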

Server-Initiated Disconnect

Kick a client from the server with an MQTT 5.0 reason code:

# From outside the handler (e.g., admin action)
MqttX.Server.disconnect(transport_pid, 0x98)  # 0x98: Administrative action
MqttX.Server.disconnect(transport_pid, 0x8E, %{reason_string: "Session taken over"})  # 0x8E: Session taken over

Or return {:disconnect, reason_code, state} from any handler callback:

@impl true
def handle_publish(topic, _payload, _opts, state) do
  if forbidden?(topic) do
    {:disconnect, 0x98, state}  # 0x98: Administrative action
  else
    {:ok, state}
  end
end

DISCONNECT packets are only sent for MQTT 5.0 connections. For MQTT 3.1.1, the connection is simply closed.
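
Raw hex reason codes are easy to mix up, so a small lookup module can make call sites self-describing. The values below are the MQTT 5.0 reason codes for the listed names; the ReasonCodeSketch module itself is hypothetical and not something MqttX is known to provide.

```elixir
defmodule ReasonCodeSketch do
  # A subset of MQTT 5.0 reason codes, including those mentioned in this guide.
  @codes %{
    not_authorized: 0x87,
    server_busy: 0x89,
    keep_alive_timeout: 0x8D,
    session_taken_over: 0x8E,
    receive_maximum_exceeded: 0x93,
    packet_too_large: 0x95,
    message_rate_too_high: 0x96,
    administrative_action: 0x98
  }

  # Name to numeric code; raises on an unknown name so typos fail loudly.
  def code(name), do: Map.fetch!(@codes, name)

  # Numeric code back to a name, for logging.
  def name(code) do
    case Enum.find(@codes, fn {_name, value} -> value == code end) do
      {name, _value} -> name
      nil -> :unknown
    end
  end
end
```

A handler could then return {:disconnect, ReasonCodeSketch.code(:administrative_action), state} instead of a bare 0x98.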

MQTT 5.0 Protocol Features

The server automatically handles these MQTT 5.0 features when clients connect with protocol version 5:

Topic Aliases

Clients can use topic aliases to reduce bandwidth by replacing repeated topic strings with short integer aliases. The server:

  • Advertises topic_alias_maximum: 100 in CONNACK (configurable via transport_opts)
  • Resolves incoming topic aliases: first publish with alias + topic stores the mapping, subsequent publishes with alias only use the stored topic
  • Requires no application code: alias resolution is transparent to your handler callbacks
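
The resolution steps above can be sketched as a pure function over a per-connection alias map. It assumes an alias-only publish arrives with an empty topic string, as in the MQTT 5.0 wire format; TopicAliasSketch is a hypothetical module.

```elixir
defmodule TopicAliasSketch do
  # No alias on the packet: the topic is used as-is.
  def resolve(aliases, topic, nil), do: {:ok, topic, aliases}

  # Alias only (empty topic): look up the topic stored earlier.
  def resolve(aliases, "", alias_id) do
    case Map.fetch(aliases, alias_id) do
      {:ok, topic} -> {:ok, topic, aliases}
      :error -> {:error, :unknown_alias}
    end
  end

  # Alias plus topic: remember the mapping for later publishes.
  def resolve(aliases, topic, alias_id) do
    {:ok, topic, Map.put(aliases, alias_id, topic)}
  end
end
```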

Flow Control (Receive Maximum)

The server enforces receive_maximum for incoming QoS 2 messages:

  • Advertises receive_maximum in CONNACK (default: 65535, configurable via transport_opts)
  • Tracks in-flight QoS 2 messages (between PUBREC and PUBCOMP)
  • Rejects excess QoS 2 publishes with PUBREC reason code 0x93 (Receive Maximum exceeded)
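
A minimal sketch of this bookkeeping, assuming in-flight messages are tracked by packet identifier, might look like the following. ReceiveMaxSketch and its return tuples are hypothetical.

```elixir
defmodule ReceiveMaxSketch do
  def new(max), do: %{max: max, inflight: MapSet.new()}

  # Decides what to do with an incoming QoS 2 PUBLISH.
  def on_publish(%{max: max, inflight: inflight} = state, packet_id) do
    cond do
      # Already being handled: a retransmission, not a new message.
      MapSet.member?(inflight, packet_id) -> {:duplicate, state}
      # The window is full: reject with 0x93.
      MapSet.size(inflight) >= max -> {:reject, 0x93, state}
      true -> {:accept, %{state | inflight: MapSet.put(inflight, packet_id)}}
    end
  end

  # Completing the handshake frees a slot.
  def on_pubcomp(state, packet_id) do
    %{state | inflight: MapSet.delete(state.inflight, packet_id)}
  end
end
```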

Maximum Packet Size

Configure a maximum incoming packet size to protect against oversized messages:

MqttX.Server.start_link(MyApp.MqttHandler, [],
  transport: MqttX.Transport.ThousandIsland,
  port: 1883,
  transport_opts: %{max_packet_size: 1_048_576}  # 1MB limit
)

  • Server sends DISCONNECT with reason code 0x95 (Packet too large) for oversized incoming packets
  • Outgoing publishes exceeding the client's advertised maximum_packet_size are silently dropped
  • Server advertises its maximum_packet_size in CONNACK when configured

QoS 2 Retransmission

The server automatically retries stale QoS 2 handshake messages:

  • Re-sends PUBREC if no PUBREL received within the retry interval (default: 5 seconds)
  • Re-sends PUBLISH with dup: true for outgoing QoS 2 awaiting PUBREC
  • Re-sends PUBREL for outgoing QoS 2 awaiting PUBCOMP
  • Drops entries after max retries (default: 3)
  • Handles DUP incoming PUBLISH by re-sending PUBREC without re-storing
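
The retry and drop rules can be sketched as a periodic sweep over pending handshake entries. The 5-second interval and limit of 3 retries mirror the defaults stated above; the entry shape (sent_at, retries) and the RetrySketch module are assumptions made for illustration.

```elixir
defmodule RetrySketch do
  @retry_ms 5_000
  @max_retries 3

  # entries: %{packet_id => %{sent_at: ms, retries: n}}
  # Returns {packet_ids_to_resend, entries_still_tracked}.
  def sweep(entries, now_ms) do
    Enum.reduce(entries, {[], %{}}, fn {id, entry}, {resend, keep} ->
      cond do
        # Still within the retry interval: leave untouched.
        now_ms - entry.sent_at < @retry_ms ->
          {resend, Map.put(keep, id, entry)}

        # Stale and out of retries: drop the entry.
        entry.retries >= @max_retries ->
          {resend, keep}

        # Stale with retries left: resend and restart the clock.
        true ->
          updated = %{entry | sent_at: now_ms, retries: entry.retries + 1}
          {[id | resend], Map.put(keep, id, updated)}
      end
    end)
  end
end
```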

Shared Subscriptions

Distribute messages across a group of subscribers with $share/group/topic patterns. The server advertises shared_subscription_available: 1 in CONNACK. See the Topic Routing section above for usage.

CONNACK Properties

For MQTT 5.0 connections, the server automatically includes these properties in CONNACK:

| Property | Value | Description |
| --- | --- | --- |
| shared_subscription_available | 1 | Shared subscriptions supported |
| topic_alias_maximum | 100 | Max topic aliases the server accepts |
| receive_maximum | 65535 | Max in-flight QoS 2 messages |
| maximum_packet_size | (if configured) | Max incoming packet size in bytes |
| retain_available | 1 | Retained messages supported |
| wildcard_subscription_available | 1 | Wildcard subscriptions supported |
| subscription_identifier_available | 0 | Subscription identifiers not supported |

MQTT 5.0 Publish Properties

The opts map passed to handle_publish/4 includes a :properties key containing any MQTT 5.0 publish properties sent by the client (e.g., user_properties, content_type, correlation_data, response_topic, payload_format_indicator, message_expiry_interval). These properties are also forwarded when the server sends outgoing PUBLISH messages via handle_info/2.
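
As a sketch of how a handler might branch on these properties, the function below inspects content_type. It assumes :properties is a map keyed by atoms and that the key may be absent for MQTT 3.1.1 clients; both points, and the PublishPropsSketch module, are assumptions to check against the library.

```elixir
defmodule PublishPropsSketch do
  # Tags the payload according to the content_type publish property, if any.
  def classify(payload, opts) do
    # Tolerate a missing or nil :properties entry.
    properties = Map.get(opts, :properties) || %{}

    case Map.get(properties, :content_type) do
      "application/json" -> {:json, payload}
      _other -> {:binary, payload}
    end
  end
end
```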

Session Handling

The server operates in clean-session mode: session_present is always false in CONNACK. Session state (subscriptions, queued messages) is not persisted across reconnections. If your application requires session resumption, implement it at the handler level using handle_connect/3 and a session store.
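
A session store for that purpose can start out very small. The sketch below uses an Agent holding a map keyed by client ID; SessionStoreSketch is hypothetical, and a production store would more likely be ETS- or database-backed so that sessions survive a process restart.

```elixir
defmodule SessionStoreSketch do
  use Agent

  # Starts a single named store holding %{client_id => session}.
  def start_link(_opts \\ []) do
    Agent.start_link(fn -> %{} end, name: __MODULE__)
  end

  # Save or replace the session, e.g. after a subscribe.
  def put(client_id, session) do
    Agent.update(__MODULE__, &Map.put(&1, client_id, session))
  end

  # Returns the stored session or nil, e.g. when the client reconnects.
  def get(client_id) do
    Agent.get(__MODULE__, &Map.get(&1, client_id))
  end

  # Remove the session, e.g. when it expires.
  def delete(client_id) do
    Agent.update(__MODULE__, &Map.delete(&1, client_id))
  end
end
```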

Subscription Options (MQTT 5.0)

The server supports MQTT 5.0 subscription options:

  • no_local: When set to true, messages published by a client are not delivered back to that client's own matching subscriptions. Requires passing the publisher identity to Router.match/3.
  • retain_handling: Controls retained message delivery on subscribe:
    • 0 — Send retained messages (default)
    • 1 — Send retained messages if the subscription does not already exist
    • 2 — Do not send retained messages on subscribe
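
Both options reduce to small decisions, sketched below. The {client_ref, opts} match shape follows the Router.match example earlier in this guide; storing no_local inside that opts map, and the SubOptionsSketch module itself, are assumptions.

```elixir
defmodule SubOptionsSketch do
  # retain_handling 0: always send retained messages.
  def send_retained?(0, _already_subscribed?), do: true
  # retain_handling 1: only for a subscription that did not exist before.
  def send_retained?(1, already_subscribed?), do: not already_subscribed?
  # retain_handling 2: never send on subscribe.
  def send_retained?(2, _already_subscribed?), do: false

  # Drops matches that would echo a message back to its publisher
  # when that subscription was made with no_local set.
  def deliverable(matches, publisher) do
    Enum.reject(matches, fn {client_ref, opts} ->
      client_ref == publisher and Map.get(opts, :no_local, false)
    end)
  end
end
```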