spoke/mqtt_actor
Types
If a session needs to be persisted across actor restarts (either in supervision, or across node restarts), the session needs to be stored in some persistent storage.
pub opaque type PersistentStorage
Provides the abstraction over a transport channel, e.g. TCP or WebSocket.
pub type TransportChannel {
TransportChannel(
events: process.Selector(core.TransportEvent),
send: fn(bytes_tree.BytesTree) -> Result(Nil, String),
close: fn() -> Result(Nil, String),
)
}
Constructors
-
TransportChannel( events: process.Selector(core.TransportEvent), send: fn(bytes_tree.BytesTree) -> Result(Nil, String), close: fn() -> Result(Nil, String), )
pub type TransportChannelConnector =
fn() -> Result(TransportChannel, String)
A handle for update subscriptions
pub opaque type UpdateSubscription
Values
pub fn build(
options: mqtt.ConnectOptions(
fn() -> Result(TransportChannel, String),
),
) -> Builder
Starts building a new MQTT client
pub fn connect(
client: Client,
clean_session: Bool,
will: option.Option(mqtt.PublishData),
) -> Nil
Starts connecting to the MQTT server.
The connection state will be published as an update.
If a connection is already established or being established,
this will be a no-op.
Note that switching between clean_session
values
while already connecting is currently not well handled.
pub fn delete_in_memory_storage(
storage: PersistentStorage,
) -> Nil
Frees the in-memory part of the persistent storage. Possible stored files need to be deleted separately.
pub fn disconnect(client: Client) -> Nil
Disconnects from the MQTT server. The connection state change will also be published as an update. If a connection is not established or being established, this will be a no-op. Returns the serialized session state to be potentially restored later.
pub fn load_ets_session_from_file(
filename: String,
) -> Result(PersistentStorage, String)
Attempts to load a session that was previously stored in ETS and saved to a file.
pub fn named(
builder: Builder,
name: process.Name(core.Input),
) -> #(Builder, Client)
Use a named process and subject with the actor.
This allows using supervision, without invalidating previous instances of
Client
.
Returns a named instance of the client and the builder which can be used to
restart the actor multiple times.
Note that the named instance is NOT started, and you must use start
with the returned builder!
pub fn pending_publishes(client: Client) -> Int
Returns the number of QoS > 0 publishes that haven’t yet been completely published.
Also see wait_for_publishes_to_finish
.
pub fn persist_to_ets(filename: String) -> PersistentStorage
Creates a new empty session that will be persisted to ETS in memory, and can also be stored to a file.
pub fn publish(client: Client, data: mqtt.PublishData) -> Nil
Publishes a new message, which will be sent to the sever. If not connected, Qos 0 messages will be dropped, and higher QoS level messages will be sent once connected.
pub fn start(
builder: Builder,
timeout: Int,
) -> Result(actor.Started(Client), actor.StartError)
Starts the actor with the given timeout (including the optional extra init).
Will not automatically connect to the server (see connect
).
pub fn store_to_file(
storage: PersistentStorage,
) -> Result(Nil, String)
Stores the persisted session to a file. Note that if the session is active and connected, the result might not be valid. However, this function is provided without checks in order to enable storing to a file even if the actor is dead.
pub fn subscribe(
client: Client,
requests: List(mqtt.SubscribeRequest),
) -> Result(List(mqtt.Subscription), mqtt.OperationError)
Subscribes to the given topics. Will block until we get a response from the server, returning the result of the operation.
pub fn subscribe_to_updates(
client: Client,
updates: process.Subject(mqtt.Update),
) -> UpdateSubscription
Will start publishing client updates to the given subject.
pub fn subscribe_to_updates_selecting(
client: Client,
updates: process.Subject(a),
selecting: fn(mqtt.Update) -> option.Option(a),
) -> UpdateSubscription
Will start publishing client updates to the given subject,
after first applying the given selector function to the update.
If the function returns None
, the update will not be published.
pub fn supervised(
builder: Builder,
timeout: Int,
) -> supervision.ChildSpecification(Client)
Builds a worker child specification from a builder.
pub fn unsubscribe(
client: Client,
topics: List(String),
) -> Result(Nil, mqtt.OperationError)
Unsubscribes from the given topics. Will block until we get a response from the server, returning the result of the operation.
pub fn unsubscribe_from_updates(
client: Client,
subscription: UpdateSubscription,
) -> Nil
Stops publishing client updates associated to the subscription.
pub fn using_storage(
builder: Builder,
storage: PersistentStorage,
) -> Builder
Configures the client to use persistent storage. When starting the session, the state will be loaded from the storage. Note that if clean session is specified, the storage will be also cleared.
pub fn wait_for_publishes_to_finish(
client: Client,
timeout: Int,
) -> Result(Nil, mqtt.OperationError)
Wait for all pending QoS > 0 publishes to complete. Returns an error if the operation times out, or panics if the client is killed while waiting.
pub fn with_extra_init(
builder: Builder,
init: fn(Client) -> Nil,
) -> Builder
Run extra initialization after creating the actor. Note that this runs in the actor process and contributes to the start timeout. Sending a message to another process can be used to run the initialization asynchronously.