MqttX.Client (MqttX v0.7.0)

View Source

MQTT Client API.

Provides a simple interface for connecting to MQTT brokers.

Example

# Connect
{:ok, client} = MqttX.Client.connect(
  host: "localhost",
  port: 1883,
  client_id: "my_app"
)

# Subscribe
:ok = MqttX.Client.subscribe(client, "sensors/#", qos: 1)

# Publish
:ok = MqttX.Client.publish(client, "sensors/temp", "25.5")

# Disconnect
:ok = MqttX.Client.disconnect(client)

Receiving Messages

To receive messages, provide a handler module:

defmodule MyHandler do
  def handle_mqtt_event(:message, {topic, payload, _opts}, state) do
    IO.puts("Received: " <> inspect({topic, payload}))
    state
  end

  def handle_mqtt_event(:connected, _data, state) do
    IO.puts("Connected!")
    state
  end

  def handle_mqtt_event(:disconnected, reason, state) do
    IO.puts("Disconnected: " <> inspect(reason))
    state
  end
end

{:ok, client} = MqttX.Client.connect(
  host: "localhost",
  client_id: "my_app",
  handler: MyHandler,
  handler_state: %{}
)

Summary

Functions

Connect to an MQTT broker.

Connect to an MQTT broker with supervision.

Check if the client is connected.

Disconnect from the broker.

List all registered client connections.

Publish a message to a topic.

Set up an MQTT 5.0 Request/Response exchange.

Subscribe to one or more topics.

Unsubscribe from one or more topics.

Look up a client connection by its client_id.

Functions

connect(opts)

@spec connect(keyword()) :: {:ok, pid()} | {:error, term()}

Connect to an MQTT broker.

Options

  • :host - Broker hostname (required)
  • :port - Broker port (default: 1883)
  • :client_id - Client identifier (required)
  • :username - Optional username
  • :password - Optional password
  • :clean_session - Clean session flag (default: true)
  • :keepalive - Keepalive interval in seconds (default: 60)
  • :handler - Module to receive callbacks
  • :handler_state - Initial state for handler
  • :name - Optional name for the client process

Returns

{:ok, pid} on success, {:error, reason} on failure.

connect_supervised(opts)

@spec connect_supervised(keyword()) :: {:ok, pid()} | {:error, term()}

Connect to an MQTT broker with supervision.

Starts the connection under MqttX.Client.Supervisor, providing automatic restart on crash. The connection is registered in MqttX.ClientRegistry for lookup by client_id.

Accepts the same options as connect/1.

Example

{:ok, pid} = MqttX.Client.connect_supervised(
  host: "localhost",
  port: 1883,
  client_id: "my_client"
)

connected?(client)

@spec connected?(pid()) :: boolean()

Check if the client is connected.

disconnect(client)

@spec disconnect(pid()) :: :ok

Disconnect from the broker.

list()

@spec list() :: [{binary(), pid(), map()}]

List all registered client connections.

Returns a list of {client_id, pid, metadata} tuples for all connections registered in MqttX.ClientRegistry.

Example

MqttX.Client.list()
#=> [{"my_client", #PID<0.123.0>, %{host: "localhost", port: 1883}}]

publish(client, topic, payload, opts \\ [])

@spec publish(pid(), binary(), binary(), keyword()) :: :ok | {:error, term()}

Publish a message to a topic.

Options

  • :qos - QoS level 0, 1, or 2 (default: 0)
  • :retain - Retain flag (default: false)

request(client, topic, payload, opts)

@spec request(pid(), binary(), binary(), keyword()) ::
  {:ok, binary()} | {:error, term()}

Set up an MQTT 5.0 Request/Response exchange.

Subscribes to the response_topic, then publishes a message with response_topic and correlation_data properties set. Returns the generated correlation_data so the caller can match incoming responses in their handler.

This is a setup helper, not a blocking RPC call. Use handle_mqtt_event/3 in your handler to match responses by correlation_data.

Options

  • :response_topic - Topic to receive the response on (required)
  • :qos - QoS level for both request and subscription (default: 0)

Returns

  • {:ok, correlation_data} - Request sent, use this to match the response
  • {:error, reason} - Subscribe or publish failed

Example

{:ok, correlation_data} = MqttX.Client.request(
  client,
  "api/users/get",
  Jason.encode!(%{id: 123}),
  response_topic: "api/responses/" <> client_id
)

# In your handler:
def handle_mqtt_event(:message, {_topic, payload, packet}, state) do
  if packet.properties[:correlation_data] == state.pending_correlation do
    # This is the response
  end
  state
end

subscribe(client, topics, opts \\ [])

@spec subscribe(pid(), binary() | [binary()], keyword()) :: :ok | {:error, term()}

Subscribe to one or more topics.

Options

  • :qos - QoS level 0, 1, or 2 (default: 0)

unsubscribe(client, topics)

@spec unsubscribe(pid(), binary() | [binary()]) :: :ok | {:error, term()}

Unsubscribe from one or more topics.

whereis(client_id)

@spec whereis(binary()) :: {pid(), map()} | nil

Look up a client connection by its client_id.

Returns {pid, metadata} if found, or nil if not registered.

Example

MqttX.Client.whereis("my_client")
#=> {#PID<0.123.0>, %{host: "localhost", port: 1883}}