MqttX.Client.Connection (MqttX v0.10.0)

Copy Markdown View Source

MQTT client connection GenServer.

Manages a connection to an MQTT broker with automatic reconnection.

Usage

{:ok, pid} = MqttX.Client.Connection.start_link(
  host: "localhost",
  port: 1883,
  client_id: "my_client",
  handler: MyHandler,
  handler_state: %{}
)

:ok = MqttX.Client.Connection.subscribe(pid, "test/#", qos: 1)
:ok = MqttX.Client.Connection.publish(pid, "test/topic", "hello", qos: 0)

TLS/SSL Support

To connect using TLS/SSL:

{:ok, pid} = MqttX.Client.Connection.start_link(
  host: "broker.example.com",
  port: 8883,
  client_id: "my_client",
  transport: :ssl,
  ssl_opts: [verify: :verify_peer, cacerts: :public_key.cacerts_get()]
)

The :transport option defaults to :tcp for backward compatibility.

Summary

Functions

Returns a specification to start this module under a supervisor.

Check if connected.

Disconnect from the broker.

Start a client connection.

Subscribe to topics.

Unsubscribe from topics.

Types

t()

@type t() :: %MqttX.Client.Connection{
  alias_to_topic: term(),
  backoff: term(),
  buffer: term(),
  clean_session: term(),
  client_id: term(),
  connect_properties: term(),
  connected: term(),
  handler: term(),
  handler_has_handle_mqtt_event: term(),
  handler_state: term(),
  host: term(),
  inflight_tx_count: term(),
  keepalive: term(),
  keepalive_timer: term(),
  max_inflight: term(),
  next_alias: term(),
  packet_id: term(),
  password: term(),
  pending_acks: term(),
  pending_subs: term(),
  pingresp_timer: term(),
  port: term(),
  protocol_version: term(),
  receive_maximum: term(),
  reconnect_timer: term(),
  retry_interval: term(),
  retry_timer: term(),
  server_maximum_packet_size: term(),
  server_topic_alias_maximum: term(),
  session_store: term(),
  session_store_state: term(),
  socket: term(),
  ssl_opts: term(),
  subscriptions: term(),
  topic_alias_maximum: term(),
  topic_to_alias: term(),
  transport: term(),
  username: term(),
  will: term(),
  ws_buffer: term(),
  ws_frag: term(),
  ws_path: term()
}

Functions

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

connected?(pid)

@spec connected?(GenServer.server()) :: boolean()

Check if connected.

disconnect(pid, opts \\ [])

@spec disconnect(
  GenServer.server(),
  keyword()
) :: :ok

Disconnect from the broker.

Options (MQTT 5.0)

  • :reason_code - Disconnect reason code (default: 0x00 Normal)
  • :properties - Disconnect properties map, e.g. %{session_expiry_interval: 0}

publish(pid, topic, payload, opts \\ [])

@spec publish(GenServer.server(), binary(), binary(), keyword()) ::
  :ok | {:error, term()}

Publish a message.

Options

  • :qos - QoS level 0, 1, or 2 (default: 0)
  • :retain - Retain flag (default: false)

start_link(opts)

@spec start_link(keyword()) :: GenServer.on_start()

Start a client connection.

Options

  • :host - Broker hostname (required)
  • :port - Broker port (default: 1883 for TCP, 8883 for SSL)
  • :client_id - Client identifier (required)
  • :username - Optional username
  • :password - Optional password
  • :clean_session - Clean session flag (default: true)
  • :keepalive - Keepalive interval in seconds (default: 60)
  • :handler - Module to receive callbacks
  • :handler_state - Initial state for handler
  • :transport - Transport type: :tcp, :ssl, :ws, or :wss (default: :tcp)
  • :ssl_opts - SSL options when transport is :ssl or :wss (e.g., [verify: :verify_peer])
  • :ws_path - WebSocket path when transport is :ws or :wss (default: "/mqtt")
  • :retry_interval - Retry interval for unacknowledged QoS 1/2 messages in ms (default: 5000)
  • :max_inflight - Maximum pending QoS 1/2 messages before backpressure (default: 100)
  • :will_topic - Will message topic (enables Last Will & Testament)
  • :will_payload - Will message payload (default: "")
  • :will_qos - Will message QoS: 0, 1, or 2 (default: 0)
  • :will_retain - Will message retain flag (default: false)
  • :will_properties - Will message properties map (MQTT 5.0, default: %{})
  • :connect_properties - CONNECT packet properties (MQTT 5.0), e.g. %{session_expiry_interval: 3600}
  • :session_store - Session store module or {module, opts} for session persistence

subscribe(pid, topics, opts \\ [])

@spec subscribe(GenServer.server(), binary() | [binary()], keyword()) ::
  {:ok, [integer()]} | {:error, term()}

Subscribe to topics.

Options

  • :qos - QoS level 0, 1, or 2 (default: 0)
  • :no_local - Don't receive own publishes (MQTT 5.0, default: false)
  • :retain_as_published - Keep original retain flag (MQTT 5.0, default: false)
  • :retain_handling - Retained message behavior: 0=send on subscribe, 1=send if new, 2=don't send (MQTT 5.0, default: 0)
  • :properties - SUBSCRIBE packet properties map (MQTT 5.0), e.g. %{subscription_identifier: 1}

unsubscribe(pid, topics)

@spec unsubscribe(GenServer.server(), binary() | [binary()]) :: :ok | {:error, term()}

Unsubscribe from topics.