gemqtt/subscriber
Module for subscribing to messages from an MQTT server.
Example usage:
client
|> subscriber.new
|> subscriber.set_qos(gemqtt.AtLeastOnce)
|> subscriber.add(["my/topic", "another/topic"])

let assert Ok(got_msg) =
  process.new_selector()
  |> subscriber.selecting_mqtt_messages(Ok)
  |> process.select_forever
Types
Representation of a message received from an MQTT topic.
pub type Message {
  Message(
    client: Client,
    duplicate: Bool,
    packet_id: Option(Int),
    payload: BitArray,
    qos: gemqtt.Qos,
    retain: Bool,
    topic: String,
  )
}
Packet sent by MQTT server to acknowledge a published message.
pub type PubAck {
  PubAck(packet_id: Int, reason_code: Int)
}
Represents how retained messages on a topic are handled when a subscription to it is added.
pub type RetainHandling {
  SentAlways
  SentOnNewSubscription
  SentNever
}
Constructors
- SentAlways: Retained messages are sent whenever a subscription is established.
- SentOnNewSubscription: Retained messages are sent only when establishing a new subscription, not a repeated one.
- SentNever: No retained messages are sent when a subscription is established.
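For orientation, the three constructors line up with the Retain Handling subscription option in the MQTT 5 specification, which uses the values 0, 1 and 2. The sketch below shows that correspondence as an exhaustive pattern match. The function name retain_handling_value is hypothetical, and this page does not say how gemqtt itself encodes the option, so treat the mapping as an illustration of the specification rather than of the library's internals.

```gleam
import gemqtt/subscriber.{
  type RetainHandling, SentAlways, SentNever, SentOnNewSubscription,
}

/// Hypothetical helper: the MQTT 5 Retain Handling value that corresponds
/// to each constructor (numbering taken from the MQTT 5 specification,
/// not from gemqtt's source).
pub fn retain_handling_value(handling: RetainHandling) -> Int {
  case handling {
    // 0: send retained messages at the time of every subscribe
    SentAlways -> 0
    // 1: send retained messages only if the subscription did not exist yet
    SentOnNewSubscription -> 1
    // 2: do not send retained messages at subscribe time
    SentNever -> 2
  }
}
```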
A Subscriber holds the configuration for subscribing to one or more topics.
Create one with the new function.
pub opaque type Subscriber
Functions
pub fn add(
  sub: Subscriber,
  topics topics: List(String),
) -> Result(#(Option(Properties), List(Int)), Nil)
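The return value of add is not described on this page, so the following is a sketch under stated assumptions: the List(Int) is taken to hold one MQTT SUBACK reason code per requested topic, and Client is taken to be exported by the gemqtt module. In MQTT 5, SUBACK reason codes of 0x80 and above indicate that a subscription was refused. The helpers all_granted and subscribe are hypothetical names.

```gleam
import gemqtt.{type Client}
import gemqtt/subscriber
import gleam/list

/// Hypothetical helper: True when every reason code is below 0x80,
/// the threshold at which MQTT 5 SUBACK codes indicate failure.
pub fn all_granted(codes: List(Int)) -> Bool {
  list.all(codes, fn(code) { code < 0x80 })
}

/// Subscribes to two topics and succeeds only if the server granted both.
/// Assumes the List(Int) returned by `add` contains SUBACK reason codes.
pub fn subscribe(client: Client) -> Result(Nil, Nil) {
  let result =
    client
    |> subscriber.new
    |> subscriber.add(topics: ["my/topic", "another/topic"])

  case result {
    Ok(#(_properties, codes)) ->
      case all_granted(codes) {
        True -> Ok(Nil)
        False -> Error(Nil)
      }
    Error(Nil) -> Error(Nil)
  }
}
```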
pub fn message_from_dynamic(
  input: Dynamic,
) -> Result(Message, List(DecodeError))
Decodes a message received by emqtt. Prefer selecting_mqtt_messages over using this directly.
pub fn new(client: Client) -> Subscriber
Creates a new subscriber with the default options. Defaults are documented on each set function.
pub fn puback_from_dynamic(
  input: Dynamic,
) -> Result(PubAck, List(DecodeError))
Decodes a pub-ack received by emqtt. Prefer selecting_mqtt_pubacks over using this directly.
pub fn remove(
  client: Client,
  topics topics: List(String),
) -> Result(#(Option(Properties), List(Int)), Nil)
pub fn selecting_mqtt_messages(
  selector: Selector(a),
  mapper: fn(Message) -> a,
) -> Selector(a)
Configures a selector to receive messages from MQTT clients.
Note that the selector receives messages from all MQTT clients that the process controls, not from any specific one. To handle messages from a single client only, use one process per client.
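The mapper argument lets the selector produce an application-specific type instead of a raw Message. A minimal sketch follows; the Event type and the to_event and next_event functions are hypothetical, and process.select is assumed to be available in the same gleam_erlang version that provides the process.select_forever function used in the module example.

```gleam
import gemqtt/subscriber.{type Message}
import gleam/bit_array
import gleam/erlang/process

/// Hypothetical application-level event type.
pub type Event {
  TextReceived(topic: String, text: String)
  BinaryReceived(topic: String, size: Int)
}

/// Classifies a payload as UTF-8 text or opaque binary data.
pub fn to_event(topic: String, payload: BitArray) -> Event {
  case bit_array.to_string(payload) {
    Ok(text) -> TextReceived(topic, text)
    Error(Nil) -> BinaryReceived(topic, bit_array.byte_size(payload))
  }
}

/// Waits up to `timeout_ms` milliseconds for the next MQTT message
/// from any client controlled by the calling process.
pub fn next_event(timeout_ms: Int) -> Result(Event, Nil) {
  process.new_selector()
  |> subscriber.selecting_mqtt_messages(fn(msg: Message) {
    to_event(msg.topic, msg.payload)
  })
  |> process.select(timeout_ms)
}
```

Keeping to_event independent of Message means it can be exercised without a broker connection.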
pub fn selecting_mqtt_pubacks(
  selector: Selector(a),
  mapper: fn(PubAck) -> a,
) -> Selector(a)
Configures a selector to receive pub-acks from MQTT clients.
Note that the selector receives pub-acks from all MQTT clients that the process controls, not from any specific one.
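Because both selecting functions return a Selector(a), they can be chained on one selector as long as the two mappers produce the same type. The sketch below wraps messages and pub-acks in a hypothetical Incoming type. The ack_succeeded helper is also hypothetical and relies on the MQTT 5 convention that PUBACK reason codes of 0x80 and above indicate failure.

```gleam
import gemqtt/subscriber.{type Message, type PubAck}
import gleam/erlang/process

/// Hypothetical wrapper so one selector can yield either kind of packet.
pub type Incoming {
  Received(Message)
  Acked(PubAck)
}

/// Blocks until the calling process receives a message or a pub-ack.
pub fn next_incoming() -> Incoming {
  process.new_selector()
  |> subscriber.selecting_mqtt_messages(Received)
  |> subscriber.selecting_mqtt_pubacks(Acked)
  |> process.select_forever
}

/// Hypothetical helper: True when the pub-ack's reason code is below 0x80.
pub fn ack_succeeded(ack: PubAck) -> Bool {
  ack.reason_code < 0x80
}
```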
pub fn set_local_echo(
  subscriber: Subscriber,
  value: Bool,
) -> Subscriber
Controls whether a subscriber will see messages published by the same client ID as itself. Defaults to true.
Warning: this option does not seem to work correctly, possibly because of an emqtt or mosquitto issue.
pub fn set_property(
  sub: Subscriber,
  name: String,
  value: a,
) -> Subscriber
Sets an MQTT SUBSCRIBE packet property. Please note that properties are not validated prior to being sent to the server.
Available properties:
- Subscription-Identifier: 1 to 268,435,455
- User-Property: UTF-8 string pair
Example usage:
client
|> subscriber.new
|> subscriber.set_property("Subscription-Identifier", 42)
|> subscriber.set_property("User-Property", #("prop-name", "prop-value"))
pub fn set_qos(subscriber: Subscriber, value: Qos) -> Subscriber
Specifies the highest QoS level for topics on this subscriber: (low) AtMostOnce < AtLeastOnce < ExactlyOnce (high).
Defaults to AtMostOnce.
pub fn set_retain_as_published(
  subscriber: Subscriber,
  value: Bool,
) -> Subscriber
Controls whether the subscriptions will have the “retain as published” option set. When true, the retain flag on an incoming message will be exactly as set by the publishing client, rather than indicating whether the message is fresh/stale.
Defaults to false.
pub fn set_retain_handling(
  subscriber: Subscriber,
  value: RetainHandling,
) -> Subscriber
Controls how retained messages are handled when a subscription is added.
See the documentation on the RetainHandling type for more information.
Defaults to SentAlways.
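The set functions each return a Subscriber, so several options can be configured in one pipeline before calling add. A minimal sketch follows, assuming Client is exported by the gemqtt module. The function name and the topic "sensors/temperature" are hypothetical.

```gleam
import gemqtt.{type Client}
import gemqtt/subscriber

/// Subscribes with QoS 1, preserving the publisher's retain flag, and
/// asking for retained messages only when the subscription is new.
pub fn subscribe_to_sensor(client: Client) {
  client
  |> subscriber.new
  |> subscriber.set_qos(gemqtt.AtLeastOnce)
  |> subscriber.set_retain_as_published(True)
  |> subscriber.set_retain_handling(subscriber.SentOnNewSubscription)
  |> subscriber.add(topics: ["sensors/temperature"])
}
```

Any option left out of the pipeline keeps the default documented on its set function.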