MqttX.Client provides a GenServer-based MQTT client with automatic reconnection and exponential backoff.

Basic Connection

{:ok, client} = MqttX.Client.connect(
  host: "localhost",
  port: 1883,
  client_id: "my_client",
  username: "user",        # optional
  password: "secret"       # optional
)

:ok = MqttX.Client.subscribe(client, "sensors/#", qos: 1)
:ok = MqttX.Client.publish(client, "sensors/temp", "25.5")
:ok = MqttX.Client.disconnect(client)

Supervised Connections

Use connect_supervised/1 to start connections under MqttX.Client.Supervisor. Supervised connections are automatically restarted on crash and registered in MqttX.ClientRegistry for lookup.

# Start a supervised connection
{:ok, client} = MqttX.Client.connect_supervised(
  host: "localhost",
  port: 1883,
  client_id: "my_client"
)

# List all registered connections
MqttX.Client.list()
#=> [{"my_client", #PID<0.123.0>, %{host: "localhost", port: 1883}}]

# Look up by client_id
{pid, _meta} = MqttX.Client.whereis("my_client")

The supervisor uses a :one_for_one strategy — each connection is independent. If a connection process crashes, only that connection is restarted. The unsupervised connect/1 function remains available for cases where you manage the lifecycle yourself.

TLS/SSL

{:ok, client} = MqttX.Client.connect(
  host: "broker.example.com",
  port: 8883,
  client_id: "secure_client",
  transport: :ssl,
  ssl_opts: [
    verify: :verify_peer,
    cacerts: :public_key.cacerts_get(),
    server_name_indication: ~c"broker.example.com"
  ]
)

When :transport is :ssl, the default port changes to 8883.

WebSocket Transport

Connect to brokers that expose MQTT over WebSocket:

{:ok, client} = MqttX.Client.connect(
  host: "broker.example.com",
  port: 8083,
  client_id: "ws_client",
  transport: :ws,
  ws_path: "/mqtt"
)

For secure WebSocket (WSS):

{:ok, client} = MqttX.Client.connect(
  host: "broker.example.com",
  port: 8084,
  client_id: "wss_client",
  transport: :wss,
  ws_path: "/mqtt",
  ssl_opts: [verify: :verify_peer, cacerts: :public_key.cacerts_get()]
)

Default ports: 8083 for :ws, 8084 for :wss. The :ws_path defaults to "/mqtt".

Session Persistence

For QoS 1/2 reliability across reconnects, disable clean sessions and provide a session store:

{:ok, client} = MqttX.Client.connect(
  host: "localhost",
  client_id: "persistent_client",
  clean_session: false,
  session_store: MqttX.Session.ETSStore
)

The built-in MqttX.Session.ETSStore persists for the lifetime of the BEAM VM. Implement the MqttX.Session.Store behaviour for custom backends (Redis, database, etc.).

Receiving Messages

Pass a :handler module that implements handle_mqtt_event/3 to process incoming messages and lifecycle events:

defmodule MyHandler do
  def handle_mqtt_event(:message, {topic, payload, _packet}, state) do
    IO.puts("Received on #{inspect(topic)}: #{payload}")
    state
  end

  def handle_mqtt_event(:connected, _data, state) do
    IO.puts("Connected!")
    state
  end

  def handle_mqtt_event(:disconnected, reason, state) do
    IO.puts("Disconnected: #{inspect(reason)}")
    state
  end
end

{:ok, client} = MqttX.Client.connect(
  host: "localhost",
  client_id: "my_client",
  handler: MyHandler,
  handler_state: %{}
)

The handler receives three event types:

EventDataDescription
:message{topic, payload, packet}Incoming PUBLISH message
:connected%{properties: props}Connection established (props contains CONNACK properties)
:disconnectedreasonConnection lost (may be {:server_disconnect, code, %{server_reference: ref}})

MQTT 5.0 Features

Request/Response

MqttX.Client.request/4 sets up the MQTT 5.0 request/response pattern by subscribing to the response topic and publishing with response_topic and correlation_data properties. It returns the generated correlation_data for matching responses in your handler:

{:ok, correlation_data} = MqttX.Client.request(client, "service/rpc", "ping",
  response_topic: "reply/my_client"
)

# Match the response in your handler:
def handle_mqtt_event(:message, {_topic, payload, packet}, state) do
  if packet.properties[:correlation_data] == state.pending_correlation do
    # This is the response
  end
  state
end

Enhanced Authentication

For brokers that require multi-step authentication (SASL-style), implement handle_auth/3 in your handler:

defmodule MyAuthHandler do
  def handle_mqtt_event(_event, _data, state), do: state

  def handle_auth(0x18, %{auth_method: "SCRAM-SHA-256", auth_data: challenge}, state) do
    response = compute_scram_response(challenge, state.credentials)
    {:continue, response, state}
  end

  def handle_auth(_reason_code, _props, state) do
    {:ok, state}
  end
end

Include auth_method in connect properties to initiate enhanced auth:

{:ok, client} = MqttX.Client.connect(
  host: "broker.example.com",
  client_id: "my_client",
  protocol_version: 5,
  connect_properties: %{auth_method: "SCRAM-SHA-256", auth_data: initial_data},
  handler: MyAuthHandler,
  handler_state: %{credentials: my_creds}
)

Server-Negotiated Settings

The client automatically applies MQTT 5.0 CONNACK properties from the broker:

PropertyBehavior
server_keep_aliveOverrides the client's keepalive timer
assigned_client_identifierReplaces the client's ID when connecting with empty client_id
maximum_packet_sizeEnforced on outgoing packets; oversized sends return {:error, :packet_too_large}
receive_maximumLimits concurrent in-flight QoS 1/2 publishes
server_referenceLogged on CONNACK rejection or server DISCONNECT (for redirect)

Publishing with Properties

MqttX.Client.publish(client, "events/alert", payload,
  qos: 1,
  properties: %{
    message_expiry_interval: 3600,
    content_type: "application/json"
  }
)

Connect Options

OptionDescriptionDefault
:hostBroker hostnamerequired
:portBroker port1883 / 8883 / 8083 / 8084
:client_idClient identifierrequired
:usernameAuthentication usernamenil
:passwordAuthentication passwordnil
:clean_sessionStart fresh sessiontrue
:keepaliveKeep-alive interval (seconds)60
:transport:tcp, :ssl, :ws, or :wss:tcp
:ssl_optsSSL options for :ssl or :wss transport[]
:ws_pathWebSocket path for :ws or :wss transport"/mqtt"
:retry_intervalQoS retry interval (ms)5000
:max_inflightMax pending QoS 1/2 messages100
:connect_propertiesMQTT 5.0 CONNECT properties map%{}
:session_storeSession store modulenil
:handlerCallback module for messagesnil
:handler_stateInitial handler statenil