gemqtt/subscriber

Module for subscribing to messages from an MQTT server.

Example usage:

client
|> subscriber.new
|> subscriber.set_qos(gemqtt.AtLestOnce)
|> subscriber.add(["my/topic", "another/topic"])

let assert Ok(got_msg) =
  process.new_selector()
  |> subscriber.selecting_mqtt_messages(Ok)
  |> process.select_forever

Types

Representation of a message received from an MQTT topic.

pub type Message {
  Message(
    client: Client,
    duplicate: Bool,
    packet_id: Option(Int),
    payload: BitArray,
    qos: gemqtt.Qos,
    retain: Bool,
    topic: String,
  )
}

Constructors

  • Message(
      client: Client,
      duplicate: Bool,
      packet_id: Option(Int),
      payload: BitArray,
      qos: gemqtt.Qos,
      retain: Bool,
      topic: String,
    )

Packet sent by MQTT server to acknowledge a published message.

pub type PubAck {
  PubAck(packet_id: Int, reason_code: Int)
}

Constructors

  • PubAck(packet_id: Int, reason_code: Int)

Represents how retained messages on a topic are handled when a subscription to it is added.

pub type RetainHandling {
  SentAlways
  SentOnNewSubscription
  SentNever
}

Constructors

  • SentAlways

    Retained messages are sent whenever a subscription is established.

  • SentOnNewSubscription

    Retained messages are sent only when establishing a new subscription, not a repeated one.

  • SentNever

    No retained messages are sent when a subscription is established.

Subscriber holds the configuration for subscribing to one or more topics. Create it with the new function.

pub opaque type Subscriber

Functions

pub fn add(
  sub: Subscriber,
  topics topics: List(String),
) -> Result(#(Option(Properties), List(Int)), Nil)
pub fn message_from_dynamic(
  input: Dynamic,
) -> Result(Message, List(DecodeError))

Decodes a message received by emqtt. Prefer selecting_mqtt_messages over using this directly.

pub fn new(client: Client) -> Subscriber

Creates a new subscriber with the default options. Defaults are documented on each set function.

pub fn puback_from_dynamic(
  input: Dynamic,
) -> Result(PubAck, List(DecodeError))

Decodes a pub-ack received by emqtt. Prefer selecting_mqtt_pubacks over using this directly.

pub fn remove(
  client: Client,
  topics topics: List(String),
) -> Result(#(Option(Properties), List(Int)), Nil)
pub fn selecting_mqtt_messages(
  selector: Selector(a),
  mapper: fn(Message) -> a,
) -> Selector(a)

Configures a selector to receive messages from MQTT clients.

Please note this will receive messages from all MQTT clients that the process controls, rather than any specific one. If you wish to only handle messages from one client then use one process per client.

pub fn selecting_mqtt_pubacks(
  selector: Selector(a),
  mapper: fn(PubAck) -> a,
) -> Selector(a)

Configures a selector to receive pub-acks from MQTT clients.

Please note this will receive pub-acks from all MQTT clients that the process controls, rather than any specific one.

pub fn set_local_echo(
  subscriber: Subscriber,
  value: Bool,
) -> Subscriber

Controls whether a subscriber will see messages published by the same client ID as itself; defaults to true.

Warning: doesn’t seem to work correctly, possibly an emqtt or mosquitto issue.

pub fn set_property(
  sub: Subscriber,
  name: String,
  value: a,
) -> Subscriber

Sets an MQTT SUBSCRIBE packet property. Please note that properties are not validated prior to being sent to the server.

Available properties:

  • Subscription-Identifier: 1 to 268,435,455
  • User-Property: UTF-8 string pair

Example usage:

client
|> subscriber.new
|> subscriber.set_property("Subscription-Identifier", 42)
|> subscriber.set_property("User-Property", #("prop-name", "prop-value"))
pub fn set_qos(subscriber: Subscriber, value: Qos) -> Subscriber

Specifies the highest QoS level for topics on this subscriber.

(low) AtMostOnce < AtLeastOnce < ExactlyOnce (high).

Defaults to AtMostOnce.

pub fn set_retain_as_published(
  subscriber: Subscriber,
  value: Bool,
) -> Subscriber

Controls whether the subscriptions will have the “retain as published” option set. When true, the retain flag on an incoming message will be exactly as set by the publishing client, rather than indicating whether the message is fresh/stale.

Defaults to false.

pub fn set_retain_handling(
  subscriber: Subscriber,
  value: RetainHandling,
) -> Subscriber

Controls how retained messages are handled when a subscription is added. See the documentaton on the RetainHandling type for more information.

Defaults to SentAlways.

Search Document