Hex.pm Docs CI

A pure Elixir MQTT 3.1.1/5.0 library featuring:

  • 🚀 High-performance packet codec
  • 🖥️ Transport-agnostic server/broker
  • 📡 Modern client with automatic reconnection
  • 🔌 Pluggable transports (ThousandIsland, Ranch)
  • 📦 Optional payload codecs (JSON, Protobuf)

MQTT for Cellular IoT

For IoT devices on cellular networks (LTE-M, NB-IoT), every byte matters. Data transmission costs money, drains batteries, and increases latency. MQTT combined with Protobuf dramatically outperforms WebSocket with JSON:

Protocol Overhead Comparison

MetricWebSocket + JSONMQTT + ProtobufSavings
Connection handshake~300-500 bytes~30-50 bytes90%
Per-message overhead6-14 bytes2-4 bytes70%
Keep-alive (ping)~6 bytes2 bytes67%

Real-World Payload Example

Sending a sensor reading {temperature: 25.5, humidity: 60, battery: 85}:

FormatSizeNotes
JSON52 bytes{"temperature":25.5,"humidity":60,"battery":85}
Protobuf9 bytesBinary: 0x08 0xCC 0x01 0x10 0x3C 0x18 0x55
Reduction83%5.8x smaller

Monthly Data Usage (1 device, 1 msg/min)

ProtocolPayloadMonthly Data
WebSocket + JSON52 bytes~2.2 MB
MQTT + Protobuf9 bytes~0.4 MB
Savings1.8 MB/device

For fleets of thousands of devices, this translates to significant cost savings on cellular data plans and extended battery life from reduced radio-on time.

Why MqttX?

Existing Elixir/Erlang MQTT libraries have limitations:

  • mqtt_packet_map: Erlang-only codec, no server/client, slower encoding
  • Tortoise/Tortoise311: Client-only, complex supervision, dated architecture
  • emqtt: Erlang-focused, heavy dependencies

MqttX provides a unified, pure Elixir solution with:

  • 2.9-4.2x faster encoding than mqtt_packet_map for common packets
  • Modern GenServer-based client with exponential backoff reconnection
  • Transport-agnostic server that works with ThousandIsland or Ranch
  • Clean, composable API designed for IoT and real-time applications
  • Zero external dependencies for the core codec

The codec has been tested for interoperability with:

  • Zephyr RTOS MQTT client (Nordic nRF9160, nRF52)
  • Eclipse Paho clients (C, Python, JavaScript)
  • Mosquitto broker
  • Standard MQTT test suites

Installation

Add mqttx to your dependencies:

def deps do
  [
    {:mqttx, "~> 0.1.0"},
    # Optional: Pick a transport
    {:thousand_island, "~> 1.0"},  # or {:ranch, "~> 2.1"}
    # Optional: Payload codecs
    {:protox, "~> 1.7"}
  ]
end

Quick Start

MQTT Server

Create a handler module:

defmodule MyApp.MqttHandler do
  use MqttX.Server

  @impl true
  def init(_opts) do
    %{subscriptions: %{}}
  end

  @impl true
  def handle_connect(client_id, credentials, state) do
    IO.puts("Client connected: #{client_id}")
    {:ok, state}
  end

  @impl true
  def handle_publish(topic, payload, opts, state) do
    IO.puts("Received on #{inspect(topic)}: #{payload}")
    {:ok, state}
  end

  @impl true
  def handle_subscribe(topics, state) do
    qos_list = Enum.map(topics, fn t -> t.qos end)
    {:ok, qos_list, state}
  end

  @impl true
  def handle_disconnect(reason, _state) do
    IO.puts("Client disconnected: #{inspect(reason)}")
    :ok
  end
end

Start the server:

{:ok, _pid} = MqttX.Server.start_link(
  MyApp.MqttHandler,
  [],
  transport: MqttX.Transport.ThousandIsland,
  port: 1883
)

MQTT Client

# Connect
{:ok, client} = MqttX.Client.connect(
  host: "localhost",
  port: 1883,
  client_id: "my_client",
  username: "user",        # optional
  password: "secret"       # optional
)

# Subscribe
:ok = MqttX.Client.subscribe(client, "sensors/#", qos: 1)

# Publish
:ok = MqttX.Client.publish(client, "sensors/temp", "25.5")

# Disconnect
:ok = MqttX.Client.disconnect(client)

Packet Codec (Standalone)

# Encode a packet
packet = %{
  type: :publish,
  topic: "test/topic",
  payload: "hello",
  qos: 0,
  retain: false
}
{:ok, binary} = MqttX.Packet.Codec.encode(4, packet)

# Decode a packet
{:ok, {decoded, rest}} = MqttX.Packet.Codec.decode(4, binary)

Transport Adapters

MqttX supports pluggable transports:

MqttX.Server.start_link(
  MyHandler,
  [],
  transport: MqttX.Transport.ThousandIsland,
  port: 1883
)

Ranch

MqttX.Server.start_link(
  MyHandler,
  [],
  transport: MqttX.Transport.Ranch,
  port: 1883
)

Payload Codecs

Built-in payload codecs for message encoding/decoding:

JSON (Erlang/OTP 27+)

Uses the built-in Erlang JSON module:

{:ok, json} = MqttX.Payload.JSON.encode(%{temp: 25.5})
{:ok, data} = MqttX.Payload.JSON.decode(json)

Protobuf

{:ok, binary} = MqttX.Payload.Protobuf.encode(my_proto_struct)
{:ok, struct} = MqttX.Payload.Protobuf.decode(binary, MyProto.Message)

Raw (Pass-through)

{:ok, binary} = MqttX.Payload.Raw.encode(<<1, 2, 3>>)
{:ok, binary} = MqttX.Payload.Raw.decode(<<1, 2, 3>>)

Topic Routing

The server includes a topic router with wildcard support:

alias MqttX.Server.Router

router = Router.new()
router = Router.subscribe(router, "sensors/+/temp", client_ref, qos: 1)
router = Router.subscribe(router, "alerts/#", client_ref, qos: 0)

# Find matching subscriptions
matches = Router.match(router, "sensors/room1/temp")
# => [{client_ref, %{qos: 1}}]

Protocol Support

  • MQTT 3.1 (protocol version 3)
  • MQTT 3.1.1 (protocol version 4)
  • MQTT 5.0 (protocol version 5)

All 15 packet types are supported:

  • CONNECT, CONNACK
  • PUBLISH, PUBACK, PUBREC, PUBREL, PUBCOMP
  • SUBSCRIBE, SUBACK
  • UNSUBSCRIBE, UNSUBACK
  • PINGREQ, PINGRESP
  • DISCONNECT
  • AUTH (MQTT 5.0)

Performance

The packet codec is optimized for:

  • Zero-copy binary references (sub-binaries)
  • Unrolled remaining length decode for common cases
  • Returns iodata for encoding (avoids concatenation)
  • Inline functions for hot paths

Benchmarks vs mqtt_packet_map (Apple M4 Pro):

OperationMqttXmqtt_packet_mapResult
PUBLISH encode5.05M ips1.72M ips2.9x faster
SUBSCRIBE encode3.42M ips0.82M ips4.2x faster
PUBLISH decode2.36M ips2.25M ips~same

Roadmap

v0.3.0 - Core Functionality

FeatureDescriptionPriority
TLS/SSL Client SupportClient only supports TCP, no :ssl optionHigh
QoS 2 Complete FlowClient doesn't implement PUBREC/PUBREL/PUBCOMP exchangeHigh
Message Inflight TrackingNo retry mechanism for unacknowledged QoS 1/2 messagesHigh
Session Persistenceclean_session=false sessions aren't stored/restoredMedium
Retained MessagesServer doesn't store or deliver retained messagesMedium
Will Message DeliveryServer receives will but doesn't publish on ungraceful disconnectMedium

v0.4.0 - MQTT 5.0 Advanced Features

FeatureDescription
Shared Subscriptions$share/group/topic pattern for load balancing
Topic AliasReduce bandwidth with topic aliases
Message ExpiryRespect message_expiry_interval property
Flow ControlEnforce receive_maximum for backpressure
Enhanced AuthComplete AUTH packet exchange flow
Request/ResponseResponse topic and correlation data handling

v0.5.0 - Production Readiness

FeatureDescription
Telemetry:telemetry events for metrics/observability
WebSocket TransportFor browser-based MQTT clients
ClusteringDistributed router across Erlang nodes
Connection SupervisionDynamicSupervisor for client connections
Rate LimitingConnection and message rate limits

Future Improvements

ItemDescription
Trie-based RouterO(topic_depth) matching instead of O(n) list scan
Property-based TestsStreamData for fuzzing packet codec
Integration TestsTest against Mosquitto, EMQX, HiveMQ

License

Apache-2.0