Tortoise311 (tortoise311 v0.11.1) View Source

A MQTT client for Elixir.

Tortoise311 provides ways of publishing messages to, and receiving messages from one or many MQTT brokers via TCP or SSL. The design philosophy of Tortoise311 is to hide the protocol specific details from the user, and expose interfaces and a connection life cycle that should feel natural to Elixir, while not limiting the capability of what one can do with the MQTT protocol.

First off, connection to a broker happens through a connection specification. This results in a process that can be supervised, either by the application the connection should live and die with, or by being supervised by the Tortoise311 application itself. Once the connection is established the Tortoise311 application should do its best to keep that connection open, by automatically sending keep alive messages (as the protocol specifies), and eventually attempt to reconnect if the connection should drop.

Secondly, a connection is specified with a user defined callback module, following the Tortoise311.Handler-behaviour, which allow the user to hook into certain events happening in the life cycle of the connection. This way code can get executed when:

  • The connection is established
  • The client has been disconnected from the broker
  • A topic filter subscription has been accepted (or declined)
  • A topic filter has been successfully unsubscribed
  • A message is received on one of the subscribed topic filters

Besides this there are hooks for the usual life-cycle events one would expect, such as init/1 and terminate/2.

Thirdly, publishing is handled in such a way that the semantics of the levels of Quality of Service, specified by the MQTT protocol, is mapped to the Elixir message passing semantics. Tortoise311 expose an interface for publishing messages that hide the protocol details of message delivery (retrieval of acknowledge, release, complete messages) and instead provide Tortoise311.publish/4 which will deliver the message to the broker and receive a response in the process mailbox when a message with a QoS>0 has been handed to the server. This allow the user to keep track of the messages that has been delivered, or simply by using the Tortoise311.publish_sync/4 form that will block the calling process until the message has been safely handed to the broker. Messages with QoS1 or QoS2 are stored in a process until they are delivered, so once they are published the client should retry delivery to make sure they reach their destination.

An alternative way of posting messages is implemented in Tortoise311.Pipe, which provide a data structure that among other things keep a reference to the connection socket. This allow for an efficient way of posting messages because the data can get shot directly onto the wire without having to copy the message between processes (unless the message has a QoS of 1 or 2, in which case they will end up in a process to ensure they will get delivered). The pipe will automatically renew its connection socket if the connection has been dropped, so ideally this message sending approach should be fast and efficient.

Link to this section Summary

Types

An identifier used to identify the client on the server.

A 16-bit number identifying a message in a message exchange.

An optional message payload.

What Quality of Service (QoS) mode should be used.

A topic for a message.

A topic filter for a subscription.

Functions

The default timeout value

Publish a message to the MQTT broker.

Synchronously send a message to the MQTT broker.

Link to this section Types

Specs

client_id() :: atom() | String.t()

An identifier used to identify the client on the server.

Most servers accept a maximum of 23 UTF-8 encode bytes for a client id, and only the characters:

  • "0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ"

Tortoise311 accept atoms as client ids but they it will be converted to a string before going on the wire. Be careful with atoms such as Example because they are expanded to the atom :"Elixir.Example", it is really easy to hit the maximum byte limit. Solving this is easy, just add a : before the client id such as :Example.

Specs

package_identifier() :: 1..65535 | nil

A 16-bit number identifying a message in a message exchange.

Some MQTT packages are part of a message exchange and need an identifier so the server and client can distinct between multiple in-flight messages.

Tortoise311 will assign package identifier to packages that need them, so outside of tests (where it is beneficial to assert on the identifier of a package) it should be set by tortoise itself; so just leave it as nil.

Specs

payload() :: binary() | nil

An optional message payload.

A message can optionally have a payload. The payload is a series of bytes and for MQTT 3.1.1 the payload has no defined structure; any series of bytes will do, and the client has to make sense of it.

The payload will be nil if there is no payload. This is done to distinct between a zero byte binary and an empty payload.

Specs

qos() :: 0..2

What Quality of Service (QoS) mode should be used.

Quality of Service is one of 0, 1, and 2 denoting the following:

  • 0 no quality of service. The message is a fire and forget.

  • 1 at least once delivery. The receiver will respond with an acknowledge message, so the sender will be certain that the message has reached the destination. It is possible that a message will be delivered twice though, as the package identifier for a publish will be relinquished when the message has been acknowledged, so a package with the same identifier will be treated as a new message though it might be a re-transmission.

  • 2 exactly once delivery. The receiver will only receive the message once. This happens by having a more elaborate message exchange than the QoS=1 variant.

There are a difference in the semantics of assigning a QoS to a publish and a subscription. When assigned to a publish the message will get delivered to the server with the requested QoS; that is if it accept that level of QoS for the given topic.

When used in the context of a subscription it should be read as the maximum QoS. When messages are published to the subscribed topic the message will get on-warded with the same topic as it was delivered with, or downgraded to the maximum QoS of the subscription for the given subscribing client. That is, if the client subscribe with a maximum QoS=2 and a message is published to said topic with a QoS=1, the message will get downgraded to QoS=1 when on-warded to the client.

Specs

topic() :: String.t()

A topic for a message.

According to the MQTT 3.1.1 specification a valid topic must be at least one character long. They are case sensitive and can include space characters.

MQTT topics consist of topic levels which are delimited with forward slashes /. A topic with a leading or trailing forward slash is allowed but they create distinct topics from the ones without; /sports/tennis/results are different from sports/tennis/results. While a topic level normally require at least one character the topic / (a single forward slash) is valid.

The server will drop the connection if it receive an invalid topic.

Specs

topic_filter() :: String.t()

A topic filter for a subscription.

The topic filter is different from a topic because it is allowed to contain wildcard characters:

  • + is a single level wildcard which is allowed to stand on any position in the topic filter. For instance: sport/+/results will match sport/tennis/results, sport/soccer/results, etc.

  • # is a multi-level wildcard and is only allowed to be on the last position of the topic filter. For instance: sport/# will match sport/tennis/results, sport/tennis/announcements, etc.

The server will reject any invalid topic filter and close the connection.

Link to this section Functions

The default timeout value

Link to this function

publish(client_id, topic, payload \\ nil, opts \\ [])

View Source

Specs

publish(client_id(), topic(), payload, [options]) ::
  :ok | {:ok, reference()} | {:error, :unknown_connection} | {:error, :timeout}
when payload: binary() | nil,
     options:
       {:qos, qos()}
       | {:retain, boolean()}
       | {:identifier, package_identifier()}
       | {:timeout, non_neg_integer()}

Publish a message to the MQTT broker.

The publish function requires a client_id and a valid MQTT topic. If no payload is set an empty zero byte message will get send to the broker.

Optionally an options list can get passed to the publish, making it possible to specify if the message should be retained on the server, and with what quality of service the message should be published with.

  • retain indicates, when set to true, that the broker should retain the message for the topic. Retained messages are delivered to clients when they subscribe to the topic. Only one message at a time can be retained for a given topic, so sending a new one will overwrite the old. retain defaults to false.

  • qos set the quality of service, and integer of 0, 1, or 2. The qos defaults to 0.

Publishing a message with the payload hello to to topic foo/bar with a QoS1 could look like this:

Tortoise311.publish("client_id", "foo/bar", "hello", qos: 1)

Notice that if you want to send a message with an empty payload with options you will have to set to payload to nil like this:

Tortoise311.publish("client_id", "foo/bar", nil, retain: true)

Return Values

The specified Quality of Service for a given publish will alter the behaviour of the return value. When publishing a message with a QoS0 an :ok will simply get returned. This is because a QoS0 is a "fire and forget." There are no quality of service so no efforts are made to ensure that the message will reach its destination (though it very likely will).

:ok = Tortoise311.publish("client_id", "foo/bar", nil, qos: 0)

When a message is published using either a QoS1 or QoS2, Tortoise311 will ensure that the message is delivered. A unique reference will get returned and eventually a message will get delivered to the process mailbox, containing the result of the publish when it has been handed over:

{:ok, ref} = Tortoise311.publish("client_id", "foo/bar", nil, qos: 2)
receive do
  {{Tortoise311, "client_id"}, ^ref, result} ->
    IO.inspect({:result, result})
after
  5000 ->
    {:error, :timeout}
end

Be sure to implement a handle_info/2 in GenServer processes that publish messages using Tortoise311.publish/4. Notice that the returned message has a structure:

{{Tortoise311, "client_id"}, ^ref, result}

It is possible to send to multiple clients and blanket match on results designated for a given client id, and the message is tagged with Tortoise311 so it is easy to see where the message originated from.

Link to this function

publish_sync(client_id, topic, payload \\ nil, opts \\ [])

View Source

Specs

publish_sync(client_id(), topic(), payload, [options]) ::
  :ok | {:error, :unknown_connection} | {:error, :timeout}
when payload: binary() | nil,
     options:
       {:qos, qos()}
       | {:retain, boolean()}
       | {:identifier, package_identifier()}
       | {:timeout, timeout()}

Synchronously send a message to the MQTT broker.

This is very similar to Tortoise311.publish/4 with the difference that it will block the calling process until the message has been handed over to the server; the configuration options are the same with the addition of the timeout option which specifies how long we are willing to wait for a reply. Per default the timeout is set to Tortoise311.default_timeout(), it is advisable to set it to a reasonable amount in milliseconds as it otherwise could block forever.

msg = "Hello, from the World of Tomorrow !"
case Tortoise311.publish_sync("my_client_id", "foo/bar", msg, qos: 2, timeout: 200) do
  :ok ->
    :done

  {:error, :timeout} ->
    :timeout
end

Notice: It does not make sense to use publish_sync/4 on a publish that has a QoS=0, because that will return instantly anyways. It is made possible for consistency, and it is the default QoS.

See the documentation for Tortoise311.publish/4 for configuration.