Guidance for AI coding assistants integrating MqttX into a project. Read this before suggesting code that uses this library — it captures the mental model and the mistakes agents most often make.
Modifying MqttX itself? See
CONTRIBUTING.mdfor repo layout, test commands, and deferred work.
What MqttX is
A single hex package ({:mqttx, "~> 0.10"}) that ships three independent
pieces — choose only what you need:
| Piece | Module | Use when |
|---|---|---|
| Wire codec | MqttX.Packet.Codec | You have your own transport and just need encode/decode |
| Client | MqttX.Client | Your app connects to an MQTT broker (AWS IoT, EMQX, HiveMQ, Mosquitto, …) |
| Broker | MqttX.Server | You are running an MQTT broker (e.g. an IoT backend that owns its devices) |
Most apps want only the client. Build a broker only when you need to own the message routing — for talking to a third-party broker, the client is sufficient on its own.
Picking a transport
The codec is dep-free; transports are optional packages:
| Transport | Add to deps |
|---|---|
TCP / TLS client (tcp / ssl) | nothing extra |
WebSocket client (ws / wss) | nothing extra (RFC 6455 client is built-in) |
| TCP server | {:thousand_island, "~> 1.4"} (preferred) or {:ranch, "~> 2.2"} |
| WebSocket server | {:bandit, "~> 1.6"} + {:websock_adapter, "~> 0.5"} |
If MqttX.Transport.ThousandIsland (or Ranch, or Bandit) fails at server
startup with an undefined-module / undefined-function error, the
corresponding optional dep is missing from mix.exs — that's the single most
common setup mistake.
Mental model — client side
your code ──MqttX.Client.subscribe──▶ broker
your code ──MqttX.Client.publish───▶ broker
│
MqttX.Client ◀─PUBLISH──────┘
│
▼
handler_module.handle_mqtt_event(:message, {topic, payload, packet}, state)- The client is a GenServer. You don't poll it — it pushes events to a handler module.
MqttX.Client.connect/1blocks until CONNACK arrives, so once it returns{:ok, pid}the session is live and you can immediately subscribe/publish.subscribe/3is synchronous and waits for SUBACK before returning{:ok, granted_qos_list}.publish/4returns:okas soon as the packet is written to the socket (it does not wait for PUBACK at QoS 1/2 — those acks are tracked in the background and surfaced via the handler module).- If the connection has dropped (and not yet reconnected),
subscribe,publish, andunsubscribereturn{:error, :not_connected}immediately — they do not queue. - The handler module implements
handle_mqtt_event/3, which receives:(:connected, %{properties: props, session_present: bool}, state)— after CONNACK success(:disconnected, reason, state)—reasonis:closed,:pingresp_timeout,{:error, posix}, or{:server_disconnect, code, props}(:message, {topic, payload, full_packet}, state)— for each PUBLISH
topic arrives as a list of segments (["sensors", "room1", "temp"]),
not the original string — use Enum.join(topic, "/") if you need to round-trip.
Mental model — broker side
use MqttX.Server defines a behaviour with one callback per MQTT verb:
device ──CONNECT──▶ handle_connect(client_id, creds, info, state)
device ──SUBSCRIBE─▶ handle_subscribe(topics, state) → grant per-topic QoS
device ──PUBLISH──▶ handle_publish(topic, payload, opts, state)
device ──DISCONNECT▶ handle_disconnect(reason, state)
your app ──send(broker_pid, msg)─▶ handle_info(msg, state)
└─▶ {:publish, topic, payload, state} (fan out to device)Servers are per-connection state machines — state is one device's state.
For app-wide state (subscriber registry, message bus), use Phoenix.PubSub or
:pg from inside the callbacks.
Idiomatic patterns
Receive messages on the client
defmodule MyApp.MqttHandler do
def handle_mqtt_event(:connected, _info, state), do: state
def handle_mqtt_event(:disconnected, _reason, state), do: state
def handle_mqtt_event(:message, {topic, payload, _packet}, state) do
Logger.info("got #{payload} on #{Enum.join(topic, "/")}")
state
end
end
{:ok, c} = MqttX.Client.connect(
host: "broker.example.com",
client_id: "my-app-#{node()}",
handler: MyApp.MqttHandler,
handler_state: %{}
)
{:ok, _granted} = MqttX.Client.subscribe(c, "sensors/#", qos: 1)Bridge MQTT broker ↔ Phoenix.PubSub (fan-out)
defmodule MyBroker do
use MqttX.Server
def init(_), do: %{}
def handle_connect(client_id, _creds, _info, state) do
Phoenix.PubSub.subscribe(MyApp.PubSub, "downlink:#{client_id}")
{:ok, Map.put(state, :client_id, client_id)}
end
def handle_publish(topic, payload, _opts, state) do
Phoenix.PubSub.broadcast(MyApp.PubSub, "uplink", {state.client_id, topic, payload})
{:ok, state}
end
def handle_info({:downlink, topic, payload}, state) do
{:publish, topic, payload, %{qos: 1, retain: false}, state}
end
def handle_subscribe(topics, s), do: {:ok, Enum.map(topics, & &1.qos), s}
def handle_disconnect(_r, _s), do: :ok
end
# elsewhere in your app:
Phoenix.PubSub.broadcast(MyApp.PubSub, "downlink:device-123",
{:downlink, "device-123/cmd", "reboot"})MQTT 5 persistent sessions (resume QoS 1/2 across reconnects)
MqttX.Client.connect(
host: "broker.example.com",
client_id: "stable-id-not-uuid", # MUST be stable across reconnects
protocol_version: 5, # required for properties
clean_session: false,
connect_properties: %{session_expiry_interval: 3600},
session_store: MqttX.Session.ETSStore # client-side persistence
)Custom auth (reject CONNECT)
def handle_connect(client_id, %{username: u, password: p}, _info, state) do
case MyApp.Auth.verify(u, p) do
{:ok, _} -> {:ok, state}
:error -> {:error, 0x86, state} # 0x86 = Bad User Name or Password
end
endReason codes worth knowing: 0x80 Unspecified, 0x86 Bad credentials,
0x87 Not authorized, 0x95 Packet too large, 0x97 Quota exceeded.
Full list in MQTT 5.0 §2.4.
Common mistakes (do not do these)
- Wildcards in PUBLISH.
+and#are subscribe-only — publishing them is a Protocol Error and the broker will disconnect. Validate withMqttX.Topic.validate_publish/1for any topic that mixes user input. - Using
handle_publish/4on the client. That's a server callback. The client receives PUBLISHes viahandle_mqtt_event(:message, …). They are not the same callback — agents confuse this constantly. clean_session: falsewithout:session_store. The flag tells the broker to keep state. For the client to remember in-flight QoS 1/2 across reconnects, also pass:session_store.- Random
client_idper connect. Sessions, retained messages, and shared subscriptions are all keyed byclient_id.UUID.uuid4()per connect silently breaks all three. - Picking QoS 2 by default. QoS 2 is a 4-packet handshake (PUBLISH → PUBREC → PUBREL → PUBCOMP) — use it only when duplicate delivery would cause real harm (financial transactions). For telemetry use QoS 0; for commands use QoS 1.
- Expecting
#to match$SYS/.... Per MQTT §4.7.2,$-prefixed topics require explicit subscription.subscribe(c, "#")does not receive$SYS/broker/uptime. - Treating
MqttX.Server.Routeras a public pubsub. It is the broker's internal subscription index. To send messages between processes, use Phoenix.PubSub or:pg, then bridge via the broker callback. - Setting
keepalivehigher than the cloud-proxy idle timeout. Fly.io, AWS NLB, Azure Front Door all idle TCP at ~60s. Use ≤ 30s for cellular IoT or setserver_keep_alive: 30intransport_optsto enforce it server-side for v5 clients. - Assuming retained = "all past messages". Retain stores the last message per topic only — it's a "current state" mechanism, not a history.
- Ignoring CONNACK reason codes.
MqttX.Client.connect/1returns{:ok, pid}only on success; on broker rejection it returns{:error, {:connack_error, reason_code, %{server_reference: ref_or_nil}}}(e.g.0x84for unsupported version,0x86for bad credentials,0x9Cto use the included server reference for redirect).
Decision helpers
- Topic structure: prefer hierarchy that matches your subscription
patterns.
tenant/{id}/device/{id}/telemetry/{metric}letstenant/+/device/+/telemetry/#fan out cleanly. Avoid encoding multiple dimensions into one segment. - Payload format: Protobuf for cellular IoT (5-10× smaller than JSON); JSON for backend interop where payload size doesn't dominate.
max_inflighton the client: default 100. Raise only if your broker advertises a highreceive_maximumand you're seeing throughput limits; otherwise increasing it just delays backpressure.- Shared subscriptions (
$share/group/topic): use to load-balance consumers, not to broadcast. Each message goes to exactly one subscriber in the group. Subscribing to the same topic from N clients without$sharedelivers N copies.
Where to find authoritative answers
- Public API: hexdocs.pm/mqttx —
@docstrings on every public function - Worked examples:
README.md("Common Patterns") and the integration tests attest/mqttx/integration_test.exsandtest/mqttx/interop_emqx_test.exs - Recent behavior changes:
CHANGELOG.md— the[0.10.0]entry documents the v0.10.0 spec sweep, which tightened many edge cases that older examples on the internet may not reflect - MQTT spec: OASIS 3.1.1 /
5.0 — section references in
this codebase (e.g.
§3.3.1.2) point here