Client Guide
View SourceMqttX.Client provides a GenServer-based MQTT client with automatic reconnection and exponential backoff.
Basic Connection
{:ok, client} = MqttX.Client.connect(
host: "localhost",
port: 1883,
client_id: "my_client",
username: "user", # optional
password: "secret" # optional
)
:ok = MqttX.Client.subscribe(client, "sensors/#", qos: 1)
:ok = MqttX.Client.publish(client, "sensors/temp", "25.5")
:ok = MqttX.Client.disconnect(client)Supervised Connections
Use connect_supervised/1 to start connections under MqttX.Client.Supervisor. Supervised connections are automatically restarted on crash and registered in MqttX.ClientRegistry for lookup.
# Start a supervised connection
{:ok, client} = MqttX.Client.connect_supervised(
host: "localhost",
port: 1883,
client_id: "my_client"
)
# List all registered connections
MqttX.Client.list()
#=> [{"my_client", #PID<0.123.0>, %{host: "localhost", port: 1883}}]
# Look up by client_id
{pid, _meta} = MqttX.Client.whereis("my_client")The supervisor uses a :one_for_one strategy — each connection is independent. If a connection process crashes, only that connection is restarted. The unsupervised connect/1 function remains available for cases where you manage the lifecycle yourself.
TLS/SSL
{:ok, client} = MqttX.Client.connect(
host: "broker.example.com",
port: 8883,
client_id: "secure_client",
transport: :ssl,
ssl_opts: [
verify: :verify_peer,
cacerts: :public_key.cacerts_get(),
server_name_indication: ~c"broker.example.com"
]
)When :transport is :ssl, the default port changes to 8883.
Session Persistence
For QoS 1/2 reliability across reconnects, disable clean sessions and provide a session store:
{:ok, client} = MqttX.Client.connect(
host: "localhost",
client_id: "persistent_client",
clean_session: false,
session_store: MqttX.Session.ETSStore
)The built-in MqttX.Session.ETSStore persists for the lifetime of the BEAM VM. Implement the MqttX.Session.Store behaviour for custom backends (Redis, database, etc.).
Receiving Messages
Pass a :handler module that implements handle_mqtt_event/3 to process incoming messages and lifecycle events:
defmodule MyHandler do
def handle_mqtt_event(:message, {topic, payload, _packet}, state) do
IO.puts("Received on #{inspect(topic)}: #{payload}")
state
end
def handle_mqtt_event(:connected, _data, state) do
IO.puts("Connected!")
state
end
def handle_mqtt_event(:disconnected, reason, state) do
IO.puts("Disconnected: #{inspect(reason)}")
state
end
end
{:ok, client} = MqttX.Client.connect(
host: "localhost",
client_id: "my_client",
handler: MyHandler,
handler_state: %{}
)The handler receives three event types:
| Event | Data | Description |
|---|---|---|
:message | {topic, payload, packet} | Incoming PUBLISH message |
:connected | nil | Connection established |
:disconnected | reason | Connection lost |
MQTT 5.0 Features
Request/Response
MqttX.Client.request/4 sets up the MQTT 5.0 request/response pattern by subscribing to the response topic and publishing with response_topic and correlation_data properties. It returns the generated correlation_data for matching responses in your handler:
{:ok, correlation_data} = MqttX.Client.request(client, "service/rpc", "ping",
response_topic: "reply/my_client"
)
# Match the response in your handler:
def handle_mqtt_event(:message, {_topic, payload, packet}, state) do
if packet.properties[:correlation_data] == state.pending_correlation do
# This is the response
end
state
endPublishing with Properties
MqttX.Client.publish(client, "events/alert", payload,
qos: 1,
properties: %{
message_expiry_interval: 3600,
content_type: "application/json"
}
)Connect Options
| Option | Description | Default |
|---|---|---|
:host | Broker hostname | required |
:port | Broker port | 1883 / 8883 (SSL) |
:client_id | Client identifier | required |
:username | Authentication username | nil |
:password | Authentication password | nil |
:clean_session | Start fresh session | true |
:keepalive | Keep-alive interval (seconds) | 60 |
:transport | :tcp or :ssl | :tcp |
:ssl_opts | SSL options for :ssl transport | [] |
:retry_interval | QoS retry interval (ms) | 5000 |
:session_store | Session store module | nil |
:handler | Callback module for messages | nil |
:handler_state | Initial handler state | nil |