spoke/mqtt_actor

Types

pub opaque type Builder
pub opaque type Client

To persist a session across actor restarts (whether under supervision or across node restarts), the session must be stored in persistent storage.

pub opaque type PersistentStorage

Provides the abstraction over a transport channel, e.g. TCP or WebSocket.

pub type TransportChannel {
  TransportChannel(
    events: process.Selector(core.TransportEvent),
    send: fn(bytes_tree.BytesTree) -> Result(Nil, String),
    close: fn() -> Result(Nil, String),
  )
}


pub type TransportChannelConnector =
  fn() -> Result(TransportChannel, String)
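
Because a connector is just a function returning a TransportChannel, an existing connector can be decorated without touching the transport itself. The following is a minimal sketch, not part of the library: with_logging is a hypothetical helper name, and it only relies on the record fields shown above.

```gleam
import gleam/io
import spoke/mqtt_actor.{type TransportChannelConnector, TransportChannel}

// Hypothetical helper (not provided by the library): wraps any connector
// so that connection attempts and outgoing packets are logged.
pub fn with_logging(
  connector: TransportChannelConnector,
) -> TransportChannelConnector {
  fn() {
    case connector() {
      Ok(channel) -> {
        io.println("transport connected")
        // Rebuild the channel, replacing only the send function.
        Ok(TransportChannel(
          events: channel.events,
          send: fn(bytes) {
            io.println("sending packet")
            channel.send(bytes)
          },
          close: channel.close,
        ))
      }
      Error(reason) -> {
        io.println("transport failed: " <> reason)
        Error(reason)
      }
    }
  }
}
```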

A handle for an update subscription, returned by subscribe_to_updates and consumed by unsubscribe_from_updates.

pub opaque type UpdateSubscription

Values

pub fn build(
  options: mqtt.ConnectOptions(
    fn() -> Result(TransportChannel, String),
  ),
) -> Builder

Starts building a new MQTT client from the given connect options.

pub fn connect(
  client: Client,
  clean_session: Bool,
  will: option.Option(mqtt.PublishData),
) -> Nil

Starts connecting to the MQTT server. The connection state will be published as an update. If a connection is already established or being established, this will be a no-op. Note that switching between clean_session values while already connecting is currently not well handled.
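
Since connect returns Nil and reports progress only through updates, a caller that wants to observe the outcome needs an update subscription in place first. The sketch below assumes the mqtt module lives at spoke/mqtt and that timeouts are in milliseconds; connect_and_wait is a hypothetical name. It returns whatever update arrives first, which is expected (but not guaranteed here) to be the connection state change.

```gleam
import gleam/erlang/process
import gleam/option
import spoke/mqtt
import spoke/mqtt_actor

// Hypothetical helper: connects with a clean session and no will,
// then waits for the first update published by the client.
pub fn connect_and_wait(
  client: mqtt_actor.Client,
  timeout: Int,
) -> Result(mqtt.Update, Nil) {
  // The subject must be created by the process that will receive on it.
  let updates = process.new_subject()
  let subscription = mqtt_actor.subscribe_to_updates(client, updates)

  mqtt_actor.connect(client, True, option.None)

  // Error(Nil) means no update arrived within the timeout.
  let first_update = process.receive(updates, timeout)
  mqtt_actor.unsubscribe_from_updates(client, subscription)
  first_update
}
```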

pub fn delete_in_memory_storage(
  storage: PersistentStorage,
) -> Nil

Frees the in-memory part of the persistent storage. Possible stored files need to be deleted separately.

pub fn disconnect(client: Client) -> Nil

Disconnects from the MQTT server. The connection state change will also be published as an update. If no connection is established or being established, this will be a no-op.

pub fn load_ets_session_from_file(
  filename: String,
) -> Result(PersistentStorage, String)

Attempts to load a session that was previously stored in ETS and saved to a file.

pub fn named(
  builder: Builder,
  name: process.Name(core.Input),
) -> #(Builder, Client)

Use a named process and subject with the actor. This allows using supervision, without invalidating previous instances of Client. Returns a named instance of the client and the builder which can be used to restart the actor multiple times. Note that the named instance is NOT started, and you must use start with the returned builder!

pub fn pending_publishes(client: Client) -> Int

Returns the number of QoS > 0 publishes that haven’t yet been completely published. Also see wait_for_publishes_to_finish.

pub fn persist_to_ets(filename: String) -> PersistentStorage

Creates a new empty session that will be persisted to ETS in memory, and can also be stored to a file.
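
A common pattern is to restore a previously saved session when one exists and fall back to a fresh one otherwise. This is a sketch under the assumption that a failed load should simply be ignored; open_storage is a hypothetical name.

```gleam
import spoke/mqtt_actor

// Hypothetical helper: reuse a saved session if it can be loaded,
// otherwise start with an empty session backed by the same file name.
pub fn open_storage(filename: String) -> mqtt_actor.PersistentStorage {
  case mqtt_actor.load_ets_session_from_file(filename) {
    Ok(storage) -> storage
    // The error string is discarded here; real code may want to log it.
    Error(_) -> mqtt_actor.persist_to_ets(filename)
  }
}
```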

pub fn publish(client: Client, data: mqtt.PublishData) -> Nil

Publishes a new message, which will be sent to the server. If not connected, QoS 0 messages will be dropped, and messages with a higher QoS level will be sent once connected.

pub fn start(
  builder: Builder,
  timeout: Int,
) -> Result(actor.Started(Client), actor.StartError)

Starts the actor with the given timeout (including the optional extra init). Will not automatically connect to the server (see connect).
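
Putting the builder functions together, a start-up routine might look like the sketch below. It assumes the mqtt module is imported from spoke/mqtt, that the connect options are constructed elsewhere, and that the timeout is in milliseconds; start_client is a hypothetical name.

```gleam
import gleam/otp/actor
import spoke/mqtt
import spoke/mqtt_actor

// Hypothetical helper: builds a client with persistent storage and
// starts the actor. It does not connect; call mqtt_actor.connect later.
pub fn start_client(
  options: mqtt.ConnectOptions(mqtt_actor.TransportChannelConnector),
  storage: mqtt_actor.PersistentStorage,
) -> Result(mqtt_actor.Client, actor.StartError) {
  let started =
    mqtt_actor.build(options)
    |> mqtt_actor.using_storage(storage)
    |> mqtt_actor.start(5000)

  case started {
    // actor.Started carries the client handle in its data field.
    Ok(started) -> Ok(started.data)
    Error(error) -> Error(error)
  }
}
```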

pub fn store_to_file(
  storage: PersistentStorage,
) -> Result(Nil, String)

Stores the persisted session to a file. Note that if the session is active and connected, the result might not be valid. However, this function is provided without checks in order to enable storing to a file even if the actor is dead.
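
As a sketch of a shutdown step, the session can be written to disk and the in-memory part released afterwards. This assumes the caller has already disconnected the client (or the actor is dead), so the stored state is consistent; persist_and_free is a hypothetical name.

```gleam
import spoke/mqtt_actor

// Hypothetical helper: saves the session to its file, then frees the
// in-memory storage regardless of whether saving succeeded.
pub fn persist_and_free(
  storage: mqtt_actor.PersistentStorage,
) -> Result(Nil, String) {
  let stored = mqtt_actor.store_to_file(storage)
  mqtt_actor.delete_in_memory_storage(storage)
  stored
}
```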

pub fn subscribe(
  client: Client,
  requests: List(mqtt.SubscribeRequest),
) -> Result(List(mqtt.Subscription), mqtt.OperationError)

Subscribes to the given topics. Will block until we get a response from the server, returning the result of the operation.

pub fn subscribe_to_updates(
  client: Client,
  updates: process.Subject(mqtt.Update),
) -> UpdateSubscription

Will start publishing client updates to the given subject.

pub fn supervised(
  builder: Builder,
  timeout: Int,
) -> supervision.ChildSpecification(Client)

Builds a worker child specification from a builder.
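
The sketch below combines named and supervised so that the returned Client stays valid across restarts. It assumes gleam_otp's static_supervisor module, a timeout in milliseconds, and an arbitrary name prefix; start_supervised is a hypothetical name.

```gleam
import gleam/erlang/process
import gleam/otp/actor
import gleam/otp/static_supervisor as supervisor
import spoke/mqtt_actor

// Hypothetical helper: runs the client under a one-for-one supervisor
// and returns the named client handle.
pub fn start_supervised(
  builder: mqtt_actor.Builder,
) -> Result(mqtt_actor.Client, actor.StartError) {
  // Names should be created once at start-up, not dynamically in a loop.
  let name = process.new_name("mqtt_client")
  let #(builder, client) = mqtt_actor.named(builder, name)

  let started =
    supervisor.new(supervisor.OneForOne)
    |> supervisor.add(mqtt_actor.supervised(builder, 5000))
    |> supervisor.start

  case started {
    // The named client refers to whichever actor instance is running.
    Ok(_) -> Ok(client)
    Error(error) -> Error(error)
  }
}
```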

pub fn unsubscribe(
  client: Client,
  topics: List(String),
) -> Result(Nil, mqtt.OperationError)

Unsubscribes from the given topics. Will block until we get a response from the server, returning the result of the operation.
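
Because both subscribe and unsubscribe block and return a Result, they chain naturally. The following sketch assumes the mqtt module is imported from spoke/mqtt and that the caller supplies the subscribe requests and the matching topic strings; subscribe_temporarily is a hypothetical name.

```gleam
import gleam/result
import spoke/mqtt
import spoke/mqtt_actor

// Hypothetical helper: subscribes, runs a callback while subscribed,
// then unsubscribes. Stops early if subscribing fails.
pub fn subscribe_temporarily(
  client: mqtt_actor.Client,
  requests: List(mqtt.SubscribeRequest),
  topics: List(String),
  while_subscribed: fn(List(mqtt.Subscription)) -> Nil,
) -> Result(Nil, mqtt.OperationError) {
  use subscriptions <- result.try(mqtt_actor.subscribe(client, requests))
  while_subscribed(subscriptions)
  mqtt_actor.unsubscribe(client, topics)
}
```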

pub fn unsubscribe_from_updates(
  client: Client,
  subscription: UpdateSubscription,
) -> Nil

Stops publishing client updates associated to the subscription.

pub fn using_storage(
  builder: Builder,
  storage: PersistentStorage,
) -> Builder

Configures the client to use persistent storage. When starting the session, the state will be loaded from the storage. Note that if a clean session is specified when connecting, the storage will also be cleared.

pub fn wait_for_publishes_to_finish(
  client: Client,
  timeout: Int,
) -> Result(Nil, mqtt.OperationError)

Waits for all pending QoS > 0 publishes to complete. Returns an error if the operation times out, or panics if the client is killed while waiting.
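
A typical use is flushing outstanding messages before disconnecting. The sketch below assumes the mqtt module is imported from spoke/mqtt, that the messages are constructed by the caller, and that the timeout is in milliseconds; publish_and_flush is a hypothetical name.

```gleam
import gleam/list
import spoke/mqtt
import spoke/mqtt_actor

// Hypothetical helper: publishes all messages, waits for the QoS > 0
// ones to complete, then disconnects whether or not the wait succeeded.
pub fn publish_and_flush(
  client: mqtt_actor.Client,
  messages: List(mqtt.PublishData),
  timeout: Int,
) -> Result(Nil, mqtt.OperationError) {
  list.each(messages, fn(message) { mqtt_actor.publish(client, message) })
  let result = mqtt_actor.wait_for_publishes_to_finish(client, timeout)
  mqtt_actor.disconnect(client)
  result
}
```

If the wait times out, pending_publishes can be checked before disconnecting to see how many messages are still outstanding.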

pub fn with_extra_init(
  builder: Builder,
  init: fn(Client) -> Nil,
) -> Builder

Runs extra initialization after creating the actor. Note that this runs in the actor process and counts toward the start timeout. To run the initialization asynchronously, send a message to another process from the init function.
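
One way to keep the init function cheap is to hand the client to another process and return immediately. This is a minimal sketch; notify_on_init and the ready subject are hypothetical names, and the receiving process is assumed to exist already.

```gleam
import gleam/erlang/process
import spoke/mqtt_actor

// Hypothetical helper: on every (re)start, the actor sends its client
// handle to the given subject, and the owner of that subject performs
// the slow initialization (connecting, subscribing) on its own time.
pub fn notify_on_init(
  builder: mqtt_actor.Builder,
  ready: process.Subject(mqtt_actor.Client),
) -> mqtt_actor.Builder {
  mqtt_actor.with_extra_init(builder, fn(client) {
    process.send(ready, client)
  })
}
```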
