Tortoise311.Handler behaviour (tortoise311 v0.11.1) View Source

User defined callback module for handling connection life cycle events.

Tortoise311.Handler defines a behaviour which can be given to a Tortoise311.Connection. This allow the user to implement functionality for events happening in the life cycle of the connection, the provided callbacks are:

  • init/1 and terminate/2 are called when the connection is started and stopped. Parts that are setup in init/1 can be torn down in terminate/2. Notice that the connection process is not terminated if the connection to the broker is lost; Tortoise311 will attempt to reconnect, and events that should happen when the connection goes offline should be set up using the connection/3 callback.

  • last_will/1 is called to get a last will message for a new connection. If none is provided (nil), the connection's default last will will be used.

  • connection/3 is called when the connection is :up or :down, allowing for functionality to be run when the connection state changes.

  • subscription/3 is called when a topic filter subscription changes status, so this callback can be used to control the life-cycle of a subscription, allowing us to implement custom behavior for when the subscription is accepted, declined, as well as unsubscribed.

  • handle_message/3 is run when the client receive a message on one of the subscribed topic filters.

Because the callback-module will run inside the connection controller process, which also handles the routing of protocol messages (such as publish acknowledge messages) it is important that the callbacks do not call functions that will block the process, especially for clients that subscribe to topics with heavy traffic.

Technically it would be possible to run the callback module in a different process than the controller process, but it has been decided to keep it on the controller as we otherwise would have to copy every message evaluated by the connection to another process. It is much better to let the end-user handle the dispatching to other parts of the system while we are evaluating what to do with the process anyways with the caveats:

  • The callbacks should not block the controller

  • it is not possible to call the (un)subscribe and publish functions from within a callback as they will block the controller.

While it is not possible to subscribe and unsubscribe in the handler process using the Tortoise311.subscribe/3 and Tortoise311.unsubscribe/3 it is possible to make changes to the subscription list via :gen_statem inspired next_actions.

Next actions

In some situations one would like to subscribe or unsubscribe a filter topic when a certain event happens on the client. The functions for interacting with the MQTT broker, defined on the Tortoise311-module are for the most part blocking operations, or require the user to peek into the process mailbox to fetch the result of the operation. To allow for changes in the subscriptions one can define a set of next actions that should happen as part of the return value to the handle_message/3, subscription/3, and connection/3 callbacks by returning a {:ok, state, next_actions} where next_actions is a list of commands of:

  • {:subscribe, topic_filter, qos: qos, timeout: 5000} where topic_filter is a binary containing a valid MQTT topic filter, and qos is the desired quality of service (0..2). The timeout is the amount of time in milliseconds we are willing to wait for a response to the request.

  • {:unsubscribe, topic_filter} where topic_filter is a binary containing the name of the subscription we want to unsubscribe from.

If we want to unsubscribe from the current topic when we receive a message on it we could write a handle_message/3 as follows:

def handle_message(topic, _payload, state) do
  topic = Enum.join(topic, "/")
  next_actions = [{:unsubscribe, topic}]
  {:ok, state, next_actions}
end

Note that the topic is received as a list of topic levels, and that the next actions has to be a list, even if there is only one next action; multiple actions can be given at once. Read more about this in the handle_message/3 documentation.

Link to this section Summary

Types

Action to perform before reentering the execution loop.

t()

Data structure describing the user defined callback handler

Callbacks

Invoked when the connection status changes.

Invoked when messages are published to subscribed topics.

Invoked when the connection is started.

Invoked when a new connection is being attempted to set the last will message to one provided by the handler. If the handler returns {{:ok, nil}, state}, the pre-set last will is used.

Invoked when the subscription of a topic filter changes status.

Invoked when the connection process is about to exit.

Link to this section Types

Specs

next_action() ::
  {:subscribe, Tortoise311.topic_filter(),
   [qos: Tortoise311.qos(), timeout: timeout()]}
  | {:unsubscribe, Tortoise311.topic_filter()}

Action to perform before reentering the execution loop.

The supported next actions are:

  • Tell the connection process to subscribe to a topic filter
  • Tell the connection process to unsubscribe from a topic filter

More next actions might be supported in the future.

Specs

t() :: %Tortoise311.Handler{
  initial_args: term(),
  module: module(),
  state: term()
}

Data structure describing the user defined callback handler

The data structure describe the current state as well as its initial arguments and the module driving the handler. This allow Tortoise311 to restart the handler if needed be.

Link to this section Callbacks

Link to this callback

connection(status, state)

View Source

Specs

connection(status, state :: term()) ::
  {:ok, new_state} | {:ok, new_state, [next_action()]}
when status: :up | :down, new_state: term()

Invoked when the connection status changes.

status is one of :up or :down, where up means we have an open connection to the MQTT broker, and down means the connection is temporary down. The connection process will attempt to reestablish the connection.

Returning {:ok, new_state} will set the state for later invocations.

Returning {:ok, new_state, next_actions}, where next_actions is a list of next actions such as {:unsubscribe, "foo/bar"} will result in the state being returned and the next actions performed.

Link to this callback

handle_message(topic_levels, payload, state)

View Source

Specs

handle_message(topic_levels, payload, state :: term()) ::
  {:ok, new_state} | {:ok, new_state, [next_action()]}
when new_state: term(),
     topic_levels: [String.t()],
     payload: Tortoise311.payload()

Invoked when messages are published to subscribed topics.

The topic comes in the form of a list of binaries, making it possible to pattern match on the topic levels of the retrieved message, store the individual topic levels as variables and use it in the function body.

Payload is a binary. MQTT 3.1.1 does not specify any format of the payload, so it has to be decoded and validated depending on the needs of the application.

In an example where we are already subscribed to the topic filter room/+/temp and want to dispatch the received messages to a Temperature application we could set up our handle_message as such:

def handle_message(["room", room, "temp"], payload, state) do
  :ok = Temperature.record(room, payload)
  {:ok, state}
end

Notice; the handle_message/3-callback run inside the connection controller process, so for handlers that are subscribing to topics with heavy traffic should do as little as possible in the callback handler and dispatch to other parts of the application using non-blocking calls.

Returning {:ok, new_state} will reenter the loop and set the state for later invocations.

Returning {:ok, new_state, next_actions}, where next_actions is a list of next actions such as {:unsubscribe, "foo/bar"} will reenter the loop and perform the listed actions.

Specs

init(args :: term()) :: {:ok, state} when state: any()

Invoked when the connection is started.

args is the argument passed in from the connection configuration.

Returning {:ok, state} will let the MQTT connection receive data from the MQTT broker, and the value contained in state will be used as the process state.

Link to this callback

last_will(state)

View Source (optional)

Specs

last_will(state) :: {{:ok, term() | nil}, state} when state: any()

Invoked when a new connection is being attempted to set the last will message to one provided by the handler. If the handler returns {{:ok, nil}, state}, the pre-set last will is used.

Link to this callback

subscription(status, topic_filter, state)

View Source

Specs

subscription(status, topic_filter, state :: term()) ::
  {:ok, new_state} | {:ok, new_state, [next_action()]}
when status:
       :up
       | :down
       | {:warn, [requested: Tortoise311.qos(), accepted: Tortoise311.qos()]}
       | {:error, term()},
     topic_filter: Tortoise311.topic_filter(),
     new_state: term()

Invoked when the subscription of a topic filter changes status.

The status of a subscription can be one of:

  • :up, triggered when the subscription has been accepted by the MQTT broker with the requested quality of service

  • {:warn, [requested: req_qos, accepted: qos]}, triggered when the subscription is accepted by the MQTT broker, but with a different quality of service qos than the one requested req_qos

  • {:error, reason}, triggered when the subscription is rejected with the reason reason such as :access_denied

  • :down, triggered when the subscription of the given topic filter has been successfully acknowledged as unsubscribed by the MQTT broker

The topic_filter is the topic filter in question, and the state is the internal state being passed through transitions.

Returning {:ok, new_state} will set the state for later invocations.

Returning {:ok, new_state, next_actions}, where next_actions is a list of next actions such as {:unsubscribe, "foo/bar"} will result in the state being returned and the next actions performed.

Link to this callback

terminate(reason, state)

View Source

Specs

terminate(reason, state :: term()) :: ignored
when reason: :normal | :shutdown | {:shutdown, term()}, ignored: term()

Invoked when the connection process is about to exit.

If anything is setup during the init/1 callback it should get cleaned up during the terminate/2 callback.