Guidance for AI coding assistants integrating MqttX into a project. Read this before suggesting code that uses this library — it captures the mental model and the mistakes agents most often make.

Modifying MqttX itself? See CONTRIBUTING.md for repo layout, test commands, and deferred work.

What MqttX is

A single hex package ({:mqttx, "~> 0.10"}) that ships three independent pieces — choose only what you need:

PieceModuleUse when
Wire codecMqttX.Packet.CodecYou have your own transport and just need encode/decode
ClientMqttX.ClientYour app connects to an MQTT broker (AWS IoT, EMQX, HiveMQ, Mosquitto, …)
BrokerMqttX.ServerYou are running an MQTT broker (e.g. an IoT backend that owns its devices)

Most apps want only the client. Build a broker only when you need to own the message routing — for talking to a third-party broker, the client is sufficient on its own.

Picking a transport

The codec is dep-free; transports are optional packages:

TransportAdd to deps
TCP / TLS client (tcp / ssl)nothing extra
WebSocket client (ws / wss)nothing extra (RFC 6455 client is built-in)
TCP server{:thousand_island, "~> 1.4"} (preferred) or {:ranch, "~> 2.2"}
WebSocket server{:bandit, "~> 1.6"} + {:websock_adapter, "~> 0.5"}

If MqttX.Transport.ThousandIsland (or Ranch, or Bandit) fails at server startup with an undefined-module / undefined-function error, the corresponding optional dep is missing from mix.exs — that's the single most common setup mistake.

Mental model — client side

your code  MqttX.Client.subscribe  broker
your code  MqttX.Client.publish  broker
                                          
              MqttX.Client PUBLISH
                   
                   
       handler_module.handle_mqtt_event(:message, {topic, payload, packet}, state)
  • The client is a GenServer. You don't poll it — it pushes events to a handler module.
  • MqttX.Client.connect/1 blocks until CONNACK arrives, so once it returns {:ok, pid} the session is live and you can immediately subscribe/publish.
  • subscribe/3 is synchronous and waits for SUBACK before returning {:ok, granted_qos_list}. publish/4 returns :ok as soon as the packet is written to the socket (it does not wait for PUBACK at QoS 1/2 — those acks are tracked in the background and surfaced via the handler module).
  • If the connection has dropped (and not yet reconnected), subscribe, publish, and unsubscribe return {:error, :not_connected} immediately — they do not queue.
  • The handler module implements handle_mqtt_event/3, which receives:
    • (:connected, %{properties: props, session_present: bool}, state) — after CONNACK success
    • (:disconnected, reason, state)reason is :closed, :pingresp_timeout, {:error, posix}, or {:server_disconnect, code, props}
    • (:message, {topic, payload, full_packet}, state) — for each PUBLISH

topic arrives as a list of segments (["sensors", "room1", "temp"]), not the original string — use Enum.join(topic, "/") if you need to round-trip.

Mental model — broker side

use MqttX.Server defines a behaviour with one callback per MQTT verb:

device  CONNECT  handle_connect(client_id, creds, info, state)
device  SUBSCRIBE  handle_subscribe(topics, state)          grant per-topic QoS
device  PUBLISH  handle_publish(topic, payload, opts, state)
device  DISCONNECT  handle_disconnect(reason, state)

your app send(broker_pid, msg) handle_info(msg, state)
                                        {:publish, topic, payload, state}  (fan out to device)

Servers are per-connection state machines — state is one device's state. For app-wide state (subscriber registry, message bus), use Phoenix.PubSub or :pg from inside the callbacks.

Idiomatic patterns

Receive messages on the client

defmodule MyApp.MqttHandler do
  def handle_mqtt_event(:connected, _info, state), do: state
  def handle_mqtt_event(:disconnected, _reason, state), do: state

  def handle_mqtt_event(:message, {topic, payload, _packet}, state) do
    Logger.info("got #{payload} on #{Enum.join(topic, "/")}")
    state
  end
end

{:ok, c} = MqttX.Client.connect(
  host: "broker.example.com",
  client_id: "my-app-#{node()}",
  handler: MyApp.MqttHandler,
  handler_state: %{}
)

{:ok, _granted} = MqttX.Client.subscribe(c, "sensors/#", qos: 1)

Bridge MQTT broker ↔ Phoenix.PubSub (fan-out)

defmodule MyBroker do
  use MqttX.Server

  def init(_), do: %{}

  def handle_connect(client_id, _creds, _info, state) do
    Phoenix.PubSub.subscribe(MyApp.PubSub, "downlink:#{client_id}")
    {:ok, Map.put(state, :client_id, client_id)}
  end

  def handle_publish(topic, payload, _opts, state) do
    Phoenix.PubSub.broadcast(MyApp.PubSub, "uplink", {state.client_id, topic, payload})
    {:ok, state}
  end

  def handle_info({:downlink, topic, payload}, state) do
    {:publish, topic, payload, %{qos: 1, retain: false}, state}
  end

  def handle_subscribe(topics, s), do: {:ok, Enum.map(topics, & &1.qos), s}
  def handle_disconnect(_r, _s), do: :ok
end

# elsewhere in your app:
Phoenix.PubSub.broadcast(MyApp.PubSub, "downlink:device-123",
  {:downlink, "device-123/cmd", "reboot"})

MQTT 5 persistent sessions (resume QoS 1/2 across reconnects)

MqttX.Client.connect(
  host: "broker.example.com",
  client_id: "stable-id-not-uuid",                    # MUST be stable across reconnects
  protocol_version: 5,                                # required for properties
  clean_session: false,
  connect_properties: %{session_expiry_interval: 3600},
  session_store: MqttX.Session.ETSStore               # client-side persistence
)

Custom auth (reject CONNECT)

def handle_connect(client_id, %{username: u, password: p}, _info, state) do
  case MyApp.Auth.verify(u, p) do
    {:ok, _} -> {:ok, state}
    :error   -> {:error, 0x86, state}    # 0x86 = Bad User Name or Password
  end
end

Reason codes worth knowing: 0x80 Unspecified, 0x86 Bad credentials, 0x87 Not authorized, 0x95 Packet too large, 0x97 Quota exceeded. Full list in MQTT 5.0 §2.4.

Common mistakes (do not do these)

  • Wildcards in PUBLISH. + and # are subscribe-only — publishing them is a Protocol Error and the broker will disconnect. Validate with MqttX.Topic.validate_publish/1 for any topic that mixes user input.
  • Using handle_publish/4 on the client. That's a server callback. The client receives PUBLISHes via handle_mqtt_event(:message, …). They are not the same callback — agents confuse this constantly.
  • clean_session: false without :session_store. The flag tells the broker to keep state. For the client to remember in-flight QoS 1/2 across reconnects, also pass :session_store.
  • Random client_id per connect. Sessions, retained messages, and shared subscriptions are all keyed by client_id. UUID.uuid4() per connect silently breaks all three.
  • Picking QoS 2 by default. QoS 2 is a 4-packet handshake (PUBLISH → PUBREC → PUBREL → PUBCOMP) — use it only when duplicate delivery would cause real harm (financial transactions). For telemetry use QoS 0; for commands use QoS 1.
  • Expecting # to match $SYS/.... Per MQTT §4.7.2, $-prefixed topics require explicit subscription. subscribe(c, "#") does not receive $SYS/broker/uptime.
  • Treating MqttX.Server.Router as a public pubsub. It is the broker's internal subscription index. To send messages between processes, use Phoenix.PubSub or :pg, then bridge via the broker callback.
  • Setting keepalive higher than the cloud-proxy idle timeout. Fly.io, AWS NLB, Azure Front Door all idle TCP at ~60s. Use ≤ 30s for cellular IoT or set server_keep_alive: 30 in transport_opts to enforce it server-side for v5 clients.
  • Assuming retained = "all past messages". Retain stores the last message per topic only — it's a "current state" mechanism, not a history.
  • Ignoring CONNACK reason codes. MqttX.Client.connect/1 returns {:ok, pid} only on success; on broker rejection it returns {:error, {:connack_error, reason_code, %{server_reference: ref_or_nil}}} (e.g. 0x84 for unsupported version, 0x86 for bad credentials, 0x9C to use the included server reference for redirect).

Decision helpers

  • Topic structure: prefer hierarchy that matches your subscription patterns. tenant/{id}/device/{id}/telemetry/{metric} lets tenant/+/device/+/telemetry/# fan out cleanly. Avoid encoding multiple dimensions into one segment.
  • Payload format: Protobuf for cellular IoT (5-10× smaller than JSON); JSON for backend interop where payload size doesn't dominate.
  • max_inflight on the client: default 100. Raise only if your broker advertises a high receive_maximum and you're seeing throughput limits; otherwise increasing it just delays backpressure.
  • Shared subscriptions ($share/group/topic): use to load-balance consumers, not to broadcast. Each message goes to exactly one subscriber in the group. Subscribing to the same topic from N clients without $share delivers N copies.

Where to find authoritative answers

  • Public API: hexdocs.pm/mqttx@doc strings on every public function
  • Worked examples: README.md ("Common Patterns") and the integration tests at test/mqttx/integration_test.exs and test/mqttx/interop_emqx_test.exs
  • Recent behavior changes: CHANGELOG.md — the [0.10.0] entry documents the v0.10.0 spec sweep, which tightened many edge cases that older examples on the internet may not reflect
  • MQTT spec: OASIS 3.1.1 / 5.0 — section references in this codebase (e.g. §3.3.1.2) point here