MqttX

Hex.pm Docs CI

MqttX

Fast, pure Elixir MQTT 5.0 — client, server, and codec in one package.

  • 🚀 High-performance packet codec
  • 🖥️ Transport-agnostic server/broker
  • 📡 Modern client with automatic reconnection
  • 🔌 Pluggable transports (ThousandIsland, Ranch)
  • 📦 Optional payload codecs (JSON, Protobuf)

MQTT for Cellular IoT

For IoT devices on cellular networks (LTE-M, NB-IoT), every byte matters. Data transmission costs money, drains batteries, and increases latency. MQTT combined with Protobuf dramatically outperforms WebSocket with JSON:

Protocol Overhead Comparison

MetricWebSocket + JSONMQTT + ProtobufSavings
Connection handshake~300-500 bytes~30-50 bytes90%
Per-message overhead6-14 bytes2-4 bytes70%
Keep-alive (ping)~6 bytes2 bytes67%

Real-World Payload Example

Sending a sensor reading {temperature: 25.5, humidity: 60, battery: 85}:

FormatSizeNotes
JSON52 bytes{"temperature":25.5,"humidity":60,"battery":85}
Protobuf9 bytesBinary: 0x08 0xCC 0x01 0x10 0x3C 0x18 0x55
Reduction83%5.8x smaller

Monthly Data Usage (1 device, 1 msg/min)

ProtocolPayloadMonthly Data
WebSocket + JSON52 bytes~2.2 MB
MQTT + Protobuf9 bytes~0.4 MB
Savings1.8 MB/device

For fleets of thousands of devices, this translates to significant cost savings on cellular data plans and extended battery life from reduced radio-on time.

MQTT vs WebSocket (Same JSON Payload)

Even when using JSON for both protocols, MQTT still provides significant overhead savings:

MetricWebSocket + JSONMQTT + JSONSavings
Connection handshake~300-500 bytes~30-50 bytes90%
Per-message overhead6-14 bytes2-4 bytes70%
Keep-alive (ping)~6 bytes2 bytes67%
52-byte JSON message58-66 bytes total54-56 bytes total15-18%

Key insight: MQTT's binary protocol has lower framing overhead than WebSocket's text-based frames. For high-frequency IoT messages, this adds up significantly.

Why MqttX?

Existing Elixir/Erlang MQTT libraries have limitations:

  • mqtt_packet_map: Erlang-only codec, no server/client, slower encoding
  • Tortoise/Tortoise311: Client-only, complex supervision, dated architecture
  • emqtt: Erlang-focused, heavy dependencies

MqttX provides a unified, pure Elixir solution with:

  • 2.9-4.2x faster encoding than mqtt_packet_map for common packets
  • Modern GenServer-based client with exponential backoff reconnection
  • Transport-agnostic server that works with ThousandIsland or Ranch
  • Clean, composable API designed for IoT and real-time applications
  • Zero external dependencies for the core codec

The codec has been tested for interoperability with:

  • Zephyr RTOS MQTT client (Nordic nRF9160, nRF52)
  • Eclipse Paho clients (C, Python, JavaScript)
  • Mosquitto broker
  • Standard MQTT test suites

Installation

Add mqttx to your dependencies:

def deps do
  [
    {:mqttx, "~> 0.7.0"},
    # Optional: Pick a transport
    {:thousand_island, "~> 1.4"},  # or {:ranch, "~> 2.2"}
    # Optional: WebSocket transport
    {:bandit, "~> 1.6"},
    {:websock_adapter, "~> 0.5"},
    # Optional: Payload codecs
    {:protox, "~> 2.0"}
  ]
end

Quick Start

MQTT Server

Create a handler module:

defmodule MyApp.MqttHandler do
  use MqttX.Server

  @impl true
  def init(_opts) do
    %{subscriptions: %{}}
  end

  @impl true
  def handle_connect(client_id, credentials, state) do
    IO.puts("Client connected: #{client_id}")
    {:ok, state}
  end

  @impl true
  def handle_publish(topic, payload, opts, state) do
    IO.puts("Received on #{inspect(topic)}: #{payload}")
    {:ok, state}
  end

  @impl true
  def handle_subscribe(topics, state) do
    qos_list = Enum.map(topics, fn t -> t.qos end)
    {:ok, qos_list, state}
  end

  @impl true
  def handle_disconnect(reason, _state) do
    IO.puts("Client disconnected: #{inspect(reason)}")
    :ok
  end
end

Start the server:

{:ok, _pid} = MqttX.Server.start_link(
  MyApp.MqttHandler,
  [],
  transport: MqttX.Transport.ThousandIsland,
  port: 1883
)

MQTT Client

# Connect with TCP (default)
{:ok, client} = MqttX.Client.connect(
  host: "localhost",
  port: 1883,
  client_id: "my_client",
  username: "user",        # optional
  password: "secret"       # optional
)

# Subscribe
:ok = MqttX.Client.subscribe(client, "sensors/#", qos: 1)

# Publish
:ok = MqttX.Client.publish(client, "sensors/temp", "25.5")

# Disconnect
:ok = MqttX.Client.disconnect(client)

TLS/SSL Connection

# Connect with TLS
{:ok, client} = MqttX.Client.connect(
  host: "broker.example.com",
  port: 8883,                    # default SSL port
  client_id: "secure_client",
  transport: :ssl,
  ssl_opts: [
    verify: :verify_peer,
    cacerts: :public_key.cacerts_get(),
    server_name_indication: ~c"broker.example.com"
  ]
)

Session Persistence

# Enable session persistence for QoS 1/2 message reliability
{:ok, client} = MqttX.Client.connect(
  host: "localhost",
  client_id: "persistent_client",
  clean_session: false,          # maintain session across reconnects
  session_store: MqttX.Session.ETSStore  # built-in ETS store
)

Packet Codec (Standalone)

# Encode a packet
packet = %{
  type: :publish,
  topic: "test/topic",
  payload: "hello",
  qos: 0,
  retain: false
}
{:ok, binary} = MqttX.Packet.Codec.encode(4, packet)

# Decode a packet
{:ok, {decoded, rest}} = MqttX.Packet.Codec.decode(4, binary)

Transport Adapters

MqttX supports pluggable transports:

MqttX.Server.start_link(
  MyHandler,
  [],
  transport: MqttX.Transport.ThousandIsland,
  port: 1883
)

Ranch

MqttX.Server.start_link(
  MyHandler,
  [],
  transport: MqttX.Transport.Ranch,
  port: 1883
)

WebSocket

MqttX.Server.start_link(
  MyHandler,
  [],
  transport: MqttX.Transport.WebSocket,
  port: 8083
)

Payload Codecs

Built-in payload codecs for message encoding/decoding:

JSON (Erlang/OTP 27+)

Uses the built-in Erlang JSON module:

{:ok, json} = MqttX.Payload.JSON.encode(%{temp: 25.5})
{:ok, data} = MqttX.Payload.JSON.decode(json)

Protobuf

{:ok, binary} = MqttX.Payload.Protobuf.encode(my_proto_struct)
{:ok, struct} = MqttX.Payload.Protobuf.decode(binary, MyProto.Message)

Raw (Pass-through)

{:ok, binary} = MqttX.Payload.Raw.encode(<<1, 2, 3>>)
{:ok, binary} = MqttX.Payload.Raw.decode(<<1, 2, 3>>)

Topic Routing

The server includes a topic router with wildcard support:

alias MqttX.Server.Router

router = Router.new()
router = Router.subscribe(router, "sensors/+/temp", client_ref, qos: 1)
router = Router.subscribe(router, "alerts/#", client_ref, qos: 0)

# Find matching subscriptions
matches = Router.match(router, "sensors/room1/temp")
# => [{client_ref, %{qos: 1}}]

Protocol Support

  • MQTT 3.1 (protocol version 3)
  • MQTT 3.1.1 (protocol version 4)
  • MQTT 5.0 (protocol version 5)

All 15 packet types are supported:

  • CONNECT, CONNACK
  • PUBLISH, PUBACK, PUBREC, PUBREL, PUBCOMP
  • SUBSCRIBE, SUBACK
  • UNSUBSCRIBE, UNSUBACK
  • PINGREQ, PINGRESP
  • DISCONNECT
  • AUTH (MQTT 5.0)

Compliance

Fully compliant with MQTT 3.1, 3.1.1, and 5.0 specifications. The server advertises all capability properties in CONNACK, enforces protocol ordering, validates topic aliases, forwards MQTT 5.0 properties, and supports subscription options (no_local, retain_handling). Validated against Mosquitto clients across both TCP and WebSocket transports with 100+ automated protocol tests.

Performance

Architected to scale from tens of thousands to hundreds of thousands of concurrent devices on a single BEAM node, depending on hardware and workload. Each connection is a lightweight Erlang process (~20KB total with connection state and socket), and the hot paths are optimized for high message throughput:

  • Trie-based topic router: O(L+K) matching where L = topic depth, K = matching subscriptions — independent of total subscription count
  • iodata encoding: Socket sends use iodata directly, avoiding binary copies on every packet
  • Zero-copy binary references: Decoder returns sub-binaries for payload and topic
  • Empty-buffer fast path: Skips binary concatenation when the TCP buffer is empty (common case)
  • Cached callback dispatch: function_exported? computed once at connection init, not per message
  • Direct inflight counter: O(1) flow control check instead of scanning pending_acks
  • ETS-optimized retained delivery: O(1) lookup for exact topic subscriptions
MetricConservativeOptimistic
Concurrent connections50,000200,000
Messages/second (QoS 0)100,000500,000+
Messages/second (QoS 1)50,000200,000
Memory per connection~20 KB~20 KB

Codec benchmarks vs mqtt_packet_map (Apple M4 Pro):

OperationMqttXmqtt_packet_mapResult
PUBLISH encode5.05M ips1.72M ips2.9x faster
SUBSCRIBE encode3.42M ips0.82M ips4.2x faster
PUBLISH decode2.36M ips2.25M ips~same

See the Performance & Scaling guide for VM tuning, OS tuning, and deployment recommendations.

API Reference

MqttX.Client

FunctionDescription
connect(opts)Connect to an MQTT broker
connect_supervised(opts)Connect under MqttX.Client.Supervisor with crash recovery
list()List all registered client connections
whereis(client_id)Look up a connection by client_id
publish(client, topic, payload, opts \\ [])Publish a message. Options: :qos (0-2), :retain (boolean)
subscribe(client, topics, opts \\ [])Subscribe to topics. Options: :qos (0-2)
unsubscribe(client, topics)Unsubscribe from topics
disconnect(client)Disconnect from the broker
connected?(client)Check if client is connected

Connect Options:

OptionDescriptionDefault
:hostBroker hostname (required)-
:portBroker port1883 (TCP), 8883 (SSL)
:client_idClient identifier (required)-
:usernameAuthentication usernamenil
:passwordAuthentication passwordnil
:clean_sessionStart fresh sessiontrue
:keepaliveKeep-alive interval (seconds)60
:transport:tcp or :ssl:tcp
:ssl_optsSSL options for :ssl transport[]
:retry_intervalQoS retry interval (ms)5000
:session_storeSession store modulenil
:handlerCallback module for messagesnil
:handler_stateInitial handler statenil

MqttX.Server

FunctionDescription
start_link(handler, handler_opts, opts)Start an MQTT server. Options: :transport, :port, :name, :rate_limit

Callbacks:

CallbackDescription
init(opts)Initialize handler state
handle_connect(client_id, credentials, state)Handle client connection. Return {:ok, state} or {:error, reason_code, state}
handle_publish(topic, payload, opts, state)Handle incoming PUBLISH. Return {:ok, state}
handle_subscribe(topics, state)Handle SUBSCRIBE. Return {:ok, granted_qos_list, state}
handle_unsubscribe(topics, state)Handle UNSUBSCRIBE. Return {:ok, state}
handle_disconnect(reason, state)Handle client disconnection. Return :ok
handle_info(message, state)Handle custom messages. Return {:ok, state}, {:publish, topic, payload, state}, or {:stop, reason, state}

MqttX.Packet.Codec

FunctionDescription
encode(version, packet)Encode a packet to binary. Returns {:ok, binary}
decode(version, binary)Decode a packet from binary. Returns {:ok, {packet, rest}} or {:error, reason}
encode_iodata(version, packet)Encode to iodata (more efficient). Returns {:ok, iodata}

MqttX.Server.Router

FunctionDescription
new()Create a new empty router
subscribe(router, filter, client, opts)Add a subscription. Options: :qos
unsubscribe(router, filter, client)Remove a subscription
unsubscribe_all(router, client)Remove all subscriptions for a client
match(router, topic)Find matching subscriptions. Returns [{client, opts}]

MqttX.Topic

FunctionDescription
validate(topic)Validate and normalize a topic. Returns {:ok, normalized} or {:error, :invalid_topic}
validate_publish(topic)Validate topic for publishing (no wildcards)
matches?(filter, topic)Check if a filter matches a topic
normalize(topic)Normalize topic to list format
flatten(normalized)Convert normalized topic back to binary string
wildcard?(topic)Check if topic contains wildcards

Roadmap

FeatureStatusDescription
Full MQTT 5.0 ComplianceDonePre-CONNECT rejection, topic alias validation, property forwarding, CONNACK capabilities, retain_handling, no_local
WebSocket TransportDoneMQTT over WebSocket via Bandit (ws:// and wss://)
Mosquitto ValidationDone104 automated protocol tests across TCP and WebSocket
ClusteringPlannedDistributed router across Erlang nodes via pg
Property-based TestsPlannedStreamData for fuzzing the packet codec
End-to-end Load TestsPlannedBenchee-based throughput validation under realistic workloads

License

Apache-2.0