gemqtt

gemqtt provides an MQTT client for Gleam projects running on the BEAM. It does so by wrapping the emqtt library written in Erlang.

The gemqtt module contains functions for configuring and controlling the client process. See the publisher and subscriber modules for working with MQTT messages.

Example client setup:

let assert Ok(client) =
  gemqtt.new("test.mosquitto.org")
  |> gemqtt.set_auth("user", "password")
  |> gemqtt.start_link

let assert Ok(_) = gemqtt.connect(client)

Types

Client holds the process maintaining the MQTT connection, and is required to publish and subscribe to messages.

pub opaque type Client

Errors that can occur when working with MQTT connections.

pub type Error {
  AlreadyStarted(process.Pid)
  BadProperty(Option(Atom))
  Noproc
  Closed
  Timeout
  Eaddrinuse
  Eaddrnotavail
  Eafnosupport
  Ealready
  Econnaborted
  Econnrefused
  Econnreset
  Edestaddrreq
  Ehostdown
  Ehostunreach
  Einprogress
  Eisconn
  Emsgsize
  Enetdown
  Enetunreach
  Enopkg
  Enoprotoopt
  Enotconn
  Enotty
  Enotsock
  Eproto
  Eprotonosupport
  Eprototype
  Esocktnosupport
  Etimedout
  Ewouldblock
  Exbadport
  Exbadseq
  Nxdomain
  AccessDenied
  BadCertificate
  BadCertificateHashValue
  BadCertificateStatusResponse
  BadRecordMac
  CertificateExpired
  CertificateRevoked
  CertificateUnknown
  CertificateUnobtainable
  CloseNotify
  DecodeError
  DecryptError
  ExportRestriction
  HandshakeFailure
  IllegalParameter
  InappropriateFallback
  InsufficientSecurity
  InternalError
  NoApplicationProtocol
  NoRenegotiation
  ProtocolVersion
  RecordOverflow
  UnexpectedMessage
  UnknownCa
  UnknownPskIdentity
  UnrecognizedName
  UnsupportedCertificate
  UnsupportedExtension
  UserCanceled
}

Constructors

  • AlreadyStarted(process.Pid)
  • BadProperty(Option(Atom))
  • Noproc
  • Closed
  • Timeout
  • Eaddrinuse
  • Eaddrnotavail
  • Eafnosupport
  • Ealready
  • Econnaborted
  • Econnrefused
  • Econnreset
  • Edestaddrreq
  • Ehostdown
  • Ehostunreach
  • Einprogress
  • Eisconn
  • Emsgsize
  • Enetdown
  • Enetunreach
  • Enopkg
  • Enoprotoopt
  • Enotconn
  • Enotty
  • Enotsock
  • Eproto
  • Eprotonosupport
  • Eprototype
  • Esocktnosupport
  • Etimedout
  • Ewouldblock
  • Exbadport
  • Exbadseq
  • Nxdomain
  • AccessDenied
  • BadCertificate
  • BadCertificateHashValue
  • BadCertificateStatusResponse
  • BadRecordMac
  • CertificateExpired
  • CertificateRevoked
  • CertificateUnknown
  • CertificateUnobtainable
  • CloseNotify
  • DecodeError
  • DecryptError
  • ExportRestriction
  • HandshakeFailure
  • IllegalParameter
  • InappropriateFallback
  • InsufficientSecurity
  • InternalError
  • NoApplicationProtocol
  • NoRenegotiation
  • ProtocolVersion
  • RecordOverflow
  • UnexpectedMessage
  • UnknownCa
  • UnknownPskIdentity
  • UnrecognizedName
  • UnsupportedCertificate
  • UnsupportedExtension
  • UserCanceled
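
Because Error is an ordinary custom type, callers can pattern match on the constructors they care about and fall through on the rest. The following is a minimal sketch, not part of gemqtt: the describe function and its message strings are hypothetical, and only a handful of constructors are handled explicitly.

```gleam
import gemqtt

// Hypothetical helper (not provided by gemqtt): turns a few common
// errors into a short description suitable for logging.
pub fn describe(error: gemqtt.Error) -> String {
  case error {
    gemqtt.Econnrefused -> "connection refused"
    gemqtt.Nxdomain -> "host name could not be resolved"
    gemqtt.Timeout | gemqtt.Etimedout -> "timed out"
    gemqtt.CertificateExpired -> "TLS certificate expired"
    // Every other constructor falls through to a generic message.
    _ -> "other connection error"
  }
}
```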

MQTT connection option holder. Create it with new, configure it with the set functions in this module, then pass it to start_link to create a Client.

pub opaque type Options

emqtt properties holder. Properties can control advanced MQTT protocol features; see the documentation on set_property in each of the gemqtt, publisher, and subscriber modules.

See the emqtt properties docs for more information.

pub type Properties {
  Properties(Dict(Atom, Dynamic))
}

Constructors

  • Properties(Dict(Atom, Dynamic))

MQTT Quality of Service level, controlling how many times a message may be delivered.

pub type Qos {
  AtMostOnce
  AtLeastOnce
  ExactlyOnce
}

Constructors

  • AtMostOnce
  • AtLeastOnce
  • ExactlyOnce
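
In the MQTT protocol these three levels correspond to QoS 0, 1, and 2. As an illustration only, the sketch below maps each constructor to that number; qos_level is a hypothetical helper and is not part of gemqtt.

```gleam
import gemqtt

// Hypothetical helper (not provided by gemqtt): returns the numeric
// QoS level that the MQTT protocol uses for each constructor.
pub fn qos_level(qos: gemqtt.Qos) -> Int {
  case qos {
    gemqtt.AtMostOnce -> 0
    gemqtt.AtLeastOnce -> 1
    gemqtt.ExactlyOnce -> 2
  }
}
```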

Functions

pub fn connect(client: Client) -> Result(Nil, Error)

Connects to the configured MQTT server.

pub fn disconnect(client: Client) -> Result(Nil, Error)

Sends a DISCONNECT packet to the server, and closes the connection. The Client process will then exit.
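
A typical client lifecycle chains start_link, connect, and disconnect. The sketch below uses result.try from the Gleam standard library to stop at the first Error; the function name connect_and_disconnect is hypothetical, and the placeholder comment marks where publishing or subscribing would go.

```gleam
import gemqtt
import gleam/result

// Hypothetical example function: starts a client, connects, and then
// disconnects again, returning the first Error encountered.
pub fn connect_and_disconnect(host: String) -> Result(Nil, gemqtt.Error) {
  use client <- result.try(gemqtt.new(host) |> gemqtt.start_link)
  use _ <- result.try(gemqtt.connect(client))
  // Publish or subscribe here, while the connection is open.
  gemqtt.disconnect(client)
}
```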

pub fn new(host: String) -> Options

Creates a new Options holder for the specified MQTT server. Do not include a port number in the host string; use set_port instead.

pub fn pid_of(client: Client) -> Pid

Returns the Erlang Pid of the provided Client.
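
One possible use for the Pid is checking whether the client process is still running, for example after calling disconnect. This is a sketch that assumes the gleam_erlang package is available; client_is_alive is a hypothetical name.

```gleam
import gemqtt
import gleam/erlang/process

// Hypothetical helper: reports whether the client process still exists.
pub fn client_is_alive(client: gemqtt.Client) -> Bool {
  client
  |> gemqtt.pid_of
  |> process.is_alive
}
```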

pub fn set_auth(
  opts: Options,
  username: String,
  password: String,
) -> Options

Specifies authentication credentials to use with the MQTT server.

pub fn set_auto_ack(opts: Options, ack: Bool) -> Options

If true (the default), the client process will automatically send an acknowledgement (i.e. PUBACK) when it receives a packet from the server.

pub fn set_clean_start(opts: Options, clean: Bool) -> Options

Controls whether the server should discard any existing session and start a new one; this can prevent receiving old retained messages. Defaults to true.

pub fn set_client_id(opts: Options, id: String) -> Options

Specifies the client identifier. If not given, the client identifier will be assigned by the server in MQTT v5, or automatically generated by emqtt in MQTT v3.1/v3.1.1.
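
set_client_id and set_clean_start are often used together: in MQTT, a server can only resume an earlier session if the client reconnects with the same identifier and does not request a clean start. A minimal sketch, where the function name and the identifier "sensor-1" are placeholders:

```gleam
import gemqtt

// Hypothetical example: options asking the server to keep and resume
// the session for a fixed client identifier.
pub fn resumable_options(host: String) -> gemqtt.Options {
  gemqtt.new(host)
  |> gemqtt.set_client_id("sensor-1")
  |> gemqtt.set_clean_start(False)
}
```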

pub fn set_connect_timeout(
  opts: Options,
  seconds timeout: Int,
) -> Options

Sets the maximum time to wait for the connection to be established and for the server to return a CONNACK. Defaults to 60 seconds.
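
The timeout argument carries the label seconds, so it can be passed by label when piping. A short sketch, with a hypothetical function name and an arbitrary 10 second value:

```gleam
import gemqtt

// Hypothetical example: give up if no CONNACK arrives within 10 seconds.
pub fn impatient_options(host: String) -> gemqtt.Options {
  gemqtt.new(host)
  |> gemqtt.set_connect_timeout(seconds: 10)
}
```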

pub fn set_name(opts: Options, name: String) -> Options

Sets the Erlang server name for the client process.

pub fn set_owner(opts: Options, pid: Pid) -> Options

Specifies the owner, to which the client process will send messages like {disconnected, ReasonCode, Properties}.
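
To have those messages delivered to the calling process, its own Pid can be passed as the owner. This sketch assumes the gleam_erlang package, whose process.self function returns the current process's Pid; owned_options is a hypothetical name.

```gleam
import gemqtt
import gleam/erlang/process

// Hypothetical example: make the calling process the owner of the client.
pub fn owned_options(host: String) -> gemqtt.Options {
  gemqtt.new(host)
  |> gemqtt.set_owner(process.self())
}
```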

pub fn set_port(opts: Options, port: Int) -> Options

Specifies the port of the MQTT server to connect to. If not given, the default of 1883 for MQTT or 8883 for MQTT over TLS will be used.
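
As a sketch, the following sets the port explicitly alongside set_tls_enabled. With TLS enabled, 8883 is already the default, so the set_port call is only there to show where a non-standard port would go; tls_options is a hypothetical name.

```gleam
import gemqtt

// Hypothetical example: TLS connection with the port spelled out.
pub fn tls_options(host: String) -> gemqtt.Options {
  gemqtt.new(host)
  |> gemqtt.set_tls_enabled(True)
  |> gemqtt.set_port(8883)
}
```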

pub fn set_property(
  opts: Options,
  name: String,
  value: a,
) -> Options

Sets an MQTT CONNECT packet property. Please note that properties are validated when start_link is called; if emqtt detects errors it will cause your process to fail rather than return an Error.

Available connect properties:

  • Session-Expiry-Interval: Four byte integer
  • Receive-Maximum: Two byte integer
  • Maximum-Packet-Size: Four byte integer
  • Topic-Alias-Maximum: Two byte integer
  • Request-Response-Information: Byte
  • Request-Problem-Information: Byte
  • User-Property: UTF-8 string pair
  • Authentication-Method: UTF-8 encoded string
  • Authentication-Data: Binary data

Example usage:

gemqtt.new("localhost")
|> gemqtt.set_property("Maximum-Packet-Size", 300)
|> gemqtt.set_property("User-Property", #("prop-name", "prop-value"))

pub fn set_tls_enabled(opts: Options, tls: Bool) -> Options

Enables SSL/TLS transport encryption. Defaults to false.

pub fn set_tls_opt(
  opts: Options,
  name: String,
  value: Dynamic,
) -> Options

Primitive function for setting TLS-related options. Needs improvement.

Example usage:

import gemqtt
import gleam/dynamic.{type Dynamic}
import gleam/erlang/atom

fn main() {
  gemqtt.new("mqtt.example.com")
  |> gemqtt.set_tls_enabled(True)
  |> gemqtt.set_tls_opt(
    "verify",
    dynamic.from(atom.create_from_string("verify_peer")),
  )
  |> gemqtt.set_tls_opt(
    "server_name_indication",
    dynamic.from(atom.create_from_string("disable")),
  )
  |> gemqtt.set_tls_opt("cacerts", cacerts_get())
}

// Returns the OS cacerts.
@external(erlang, "public_key", "cacerts_get")
fn cacerts_get() -> Dynamic

pub fn start_link(opts: Options) -> Result(Client, Error)

Configures a client process and links it to ours. Does not attempt to connect to the MQTT server.

pub fn stop(client: Client) -> Result(Nil, Error)

Stops the client process. Prefer disconnect over this.

pub fn subscriptions(
  client: Client,
) -> List(#(String, Dict(Atom, Dynamic)))

Lists this client's active MQTT subscriptions.
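
Each entry is a tuple, so the first element can be extracted with tuple indexing. The sketch below assumes the String in each tuple is the subscribed topic, which the signature alone does not confirm; subscribed_topics is a hypothetical helper.

```gleam
import gemqtt
import gleam/list

// Hypothetical helper: keeps only the String from each subscription
// tuple (assumed here to be the topic) and drops the options Dict.
pub fn subscribed_topics(client: gemqtt.Client) -> List(String) {
  gemqtt.subscriptions(client)
  |> list.map(fn(subscription) { subscription.0 })
}
```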
