MqttX.Client (MqttX v0.7.0)
View SourceMQTT Client API.
Provides a simple interface for connecting to MQTT brokers.
Example
# Connect
{:ok, client} = MqttX.Client.connect(
host: "localhost",
port: 1883,
client_id: "my_app"
)
# Subscribe
:ok = MqttX.Client.subscribe(client, "sensors/#", qos: 1)
# Publish
:ok = MqttX.Client.publish(client, "sensors/temp", "25.5")
# Disconnect
:ok = MqttX.Client.disconnect(client)Receiving Messages
To receive messages, provide a handler module:
defmodule MyHandler do
def handle_mqtt_event(:message, {topic, payload, _opts}, state) do
IO.puts("Received: " <> inspect({topic, payload}))
state
end
def handle_mqtt_event(:connected, _data, state) do
IO.puts("Connected!")
state
end
def handle_mqtt_event(:disconnected, reason, state) do
IO.puts("Disconnected: " <> inspect(reason))
state
end
end
{:ok, client} = MqttX.Client.connect(
host: "localhost",
client_id: "my_app",
handler: MyHandler,
handler_state: %{}
)
Summary
Functions
Connect to an MQTT broker.
Connect to an MQTT broker with supervision.
Check if the client is connected.
Disconnect from the broker.
List all registered client connections.
Publish a message to a topic.
Set up an MQTT 5.0 Request/Response exchange.
Subscribe to one or more topics.
Unsubscribe from one or more topics.
Look up a client connection by its client_id.
Functions
Connect to an MQTT broker.
Options
:host- Broker hostname (required):port- Broker port (default: 1883):client_id- Client identifier (required):username- Optional username:password- Optional password:clean_session- Clean session flag (default: true):keepalive- Keepalive interval in seconds (default: 60):handler- Module to receive callbacks:handler_state- Initial state for handler:name- Optional name for the client process
Returns
{:ok, pid} on success, {:error, reason} on failure.
Connect to an MQTT broker with supervision.
Starts the connection under MqttX.Client.Supervisor, providing
automatic restart on crash. The connection is registered in
MqttX.ClientRegistry for lookup by client_id.
Accepts the same options as connect/1.
Example
{:ok, pid} = MqttX.Client.connect_supervised(
host: "localhost",
port: 1883,
client_id: "my_client"
)
Check if the client is connected.
@spec disconnect(pid()) :: :ok
Disconnect from the broker.
List all registered client connections.
Returns a list of {client_id, pid, metadata} tuples for all connections
registered in MqttX.ClientRegistry.
Example
MqttX.Client.list()
#=> [{"my_client", #PID<0.123.0>, %{host: "localhost", port: 1883}}]
Publish a message to a topic.
Options
:qos- QoS level 0, 1, or 2 (default: 0):retain- Retain flag (default: false)
Set up an MQTT 5.0 Request/Response exchange.
Subscribes to the response_topic, then publishes a message with
response_topic and correlation_data properties set. Returns the
generated correlation_data so the caller can match incoming responses
in their handler.
This is a setup helper, not a blocking RPC call. Use handle_mqtt_event/3
in your handler to match responses by correlation_data.
Options
:response_topic- Topic to receive the response on (required):qos- QoS level for both request and subscription (default: 0)
Returns
{:ok, correlation_data}- Request sent, use this to match the response{:error, reason}- Subscribe or publish failed
Example
{:ok, correlation_data} = MqttX.Client.request(
client,
"api/users/get",
Jason.encode!(%{id: 123}),
response_topic: "api/responses/" <> client_id
)
# In your handler:
def handle_mqtt_event(:message, {_topic, payload, packet}, state) do
if packet.properties[:correlation_data] == state.pending_correlation do
# This is the response
end
state
end
Subscribe to one or more topics.
Options
:qos- QoS level 0, 1, or 2 (default: 0)
Unsubscribe from one or more topics.
Look up a client connection by its client_id.
Returns {pid, metadata} if found, or nil if not registered.
Example
MqttX.Client.whereis("my_client")
#=> {#PID<0.123.0>, %{host: "localhost", port: 1883}}