MQTT client connection GenServer.
Manages a connection to an MQTT broker with automatic reconnection.
Usage
{:ok, pid} = MqttX.Client.Connection.start_link(
host: "localhost",
port: 1883,
client_id: "my_client",
handler: MyHandler,
handler_state: %{}
)
:ok = MqttX.Client.Connection.subscribe(pid, "test/#", qos: 1)
:ok = MqttX.Client.Connection.publish(pid, "test/topic", "hello", qos: 0)TLS/SSL Support
To connect using TLS/SSL:
{:ok, pid} = MqttX.Client.Connection.start_link(
host: "broker.example.com",
port: 8883,
client_id: "my_client",
transport: :ssl,
ssl_opts: [verify: :verify_peer, cacerts: :public_key.cacerts_get()]
)The :transport option defaults to :tcp for backward compatibility.
Summary
Functions
Returns a specification to start this module under a supervisor.
Check if connected.
Disconnect from the broker.
Publish a message.
Start a client connection.
Subscribe to topics.
Unsubscribe from topics.
Types
@type t() :: %MqttX.Client.Connection{ alias_to_topic: term(), backoff: term(), buffer: term(), clean_session: term(), client_id: term(), connect_properties: term(), connected: term(), handler: term(), handler_has_handle_mqtt_event: term(), handler_state: term(), host: term(), inflight_tx_count: term(), keepalive: term(), keepalive_timer: term(), max_inflight: term(), next_alias: term(), packet_id: term(), password: term(), pending_acks: term(), pending_subs: term(), pingresp_timer: term(), port: term(), protocol_version: term(), receive_maximum: term(), reconnect_timer: term(), retry_interval: term(), retry_timer: term(), server_maximum_packet_size: term(), server_topic_alias_maximum: term(), session_store: term(), session_store_state: term(), socket: term(), ssl_opts: term(), subscriptions: term(), topic_alias_maximum: term(), topic_to_alias: term(), transport: term(), username: term(), will: term(), ws_buffer: term(), ws_frag: term(), ws_path: term() }
Functions
Returns a specification to start this module under a supervisor.
See Supervisor.
@spec connected?(GenServer.server()) :: boolean()
Check if connected.
@spec disconnect( GenServer.server(), keyword() ) :: :ok
Disconnect from the broker.
Options (MQTT 5.0)
:reason_code- Disconnect reason code (default: 0x00 Normal):properties- Disconnect properties map, e.g.%{session_expiry_interval: 0}
@spec publish(GenServer.server(), binary(), binary(), keyword()) :: :ok | {:error, term()}
Publish a message.
Options
:qos- QoS level 0, 1, or 2 (default: 0):retain- Retain flag (default: false)
@spec start_link(keyword()) :: GenServer.on_start()
Start a client connection.
Options
:host- Broker hostname (required):port- Broker port (default: 1883 for TCP, 8883 for SSL):client_id- Client identifier (required):username- Optional username:password- Optional password:clean_session- Clean session flag (default: true):keepalive- Keepalive interval in seconds (default: 60):handler- Module to receive callbacks:handler_state- Initial state for handler:transport- Transport type::tcp,:ssl,:ws, or:wss(default::tcp):ssl_opts- SSL options when transport is:sslor:wss(e.g.,[verify: :verify_peer]):ws_path- WebSocket path when transport is:wsor:wss(default: "/mqtt"):retry_interval- Retry interval for unacknowledged QoS 1/2 messages in ms (default: 5000):max_inflight- Maximum pending QoS 1/2 messages before backpressure (default: 100):will_topic- Will message topic (enables Last Will & Testament):will_payload- Will message payload (default:""):will_qos- Will message QoS: 0, 1, or 2 (default: 0):will_retain- Will message retain flag (default: false):will_properties- Will message properties map (MQTT 5.0, default:%{}):connect_properties- CONNECT packet properties (MQTT 5.0), e.g.%{session_expiry_interval: 3600}:session_store- Session store module or{module, opts}for session persistence
@spec subscribe(GenServer.server(), binary() | [binary()], keyword()) :: {:ok, [integer()]} | {:error, term()}
Subscribe to topics.
Options
:qos- QoS level 0, 1, or 2 (default: 0):no_local- Don't receive own publishes (MQTT 5.0, default: false):retain_as_published- Keep original retain flag (MQTT 5.0, default: false):retain_handling- Retained message behavior: 0=send on subscribe, 1=send if new, 2=don't send (MQTT 5.0, default: 0):properties- SUBSCRIBE packet properties map (MQTT 5.0), e.g.%{subscription_identifier: 1}
@spec unsubscribe(GenServer.server(), binary() | [binary()]) :: :ok | {:error, term()}
Unsubscribe from topics.