gen_mqtt v0.4.0 GenMQTT behaviour

A behaviour module for implementing MQTT client processes.

Example

This example assumes an MQTT server running on localhost on port 1883.

defmodule TemperatureLogger do
  use GenMQTT

  def start_link do
    GenMQTT.start_link(__MODULE__, nil)
  end

  def on_connect(state) do
    :ok = GenMQTT.subscribe(self(), "room/+/temp", 0)
    {:ok, state}
  end

  def on_publish(["room", location, "temp"], message, state) do
    IO.puts "It is #{message} degrees in #{location}"
    {:ok, state}
  end
end

This will log to the console every time a sensor posts a temperature to the broker.

Callbacks

GenMQTT defines 12 callbacks. All of them are defined automatically when you use GenMQTT in your module, so you only implement the ones you want to customize. Six of the callbacks mirror the ones you know from GenServer; consult the GenServer documentation for details on these. They are: init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, and code_change/3.

The remaining six are specific to GenMQTT and deal with various events in an MQTT life cycle:

  • on_connect/1 is run when the client connects or reconnects with the broker.

  • on_connect_error/2 is triggered if the connection fails for whatever reason.

  • on_disconnect/1 is run when the client disconnects from the MQTT broker.

  • on_subscribe/2 is run when the client subscribes to a topic.

  • on_unsubscribe/2 is run when the client stops subscribing to a topic.

  • on_publish/3 is triggered every time a message is published to a topic the client subscribes to.

All callbacks are optional. A macro defines a default implementation for every callback you leave undefined, so if on_publish/3 is all you need, that is the only one you have to implement.

Name Registration

A GenMQTT is bound to the same name registration rules as GenServers. Read more about it in the Elixir GenServer docs.
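
As a sketch of local name registration, the module below passes the :name option described under start_link/3. The module name NamedLogger is hypothetical, and the example assumes the gen_mqtt dependency is available and a broker is reachable on localhost:1883.

```elixir
# Sketch only: NamedLogger is a hypothetical module name.
defmodule NamedLogger do
  use GenMQTT

  # Register the process locally under the module name,
  # the same way a GenServer would be registered.
  def start_link do
    GenMQTT.start_link(__MODULE__, nil, name: __MODULE__)
  end
end
```

Once started, Process.whereis(NamedLogger) should return the pid of the client, as it would for a locally registered GenServer.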

Types

debug()
debug() :: [:trace | :log | :statistics | {:log_to_file, Path.t}]

Debug options supported by the start* functions

from()
from() :: {pid, tag :: term}

info_action()
info_action ::
  :connack_in |
  :connect_out |
  :puback_in |
  :puback_out |
  :pubcomp_in |
  :pubcomp_out |
  :publish_in |
  :publish_out |
  :pubrec_in |
  :pubrec_out |
  :pubrel_in |
  :pubrel_out |
  :reconnect |
  :suback |
  :subscribe_out |
  :unsuback |
  :unsubscribe_out

info_fun()
info_fun() :: {({info_action, message_id :: charlist}, state :: term -> new_state :: term), initial_state :: term}

name()
name() :: atom | {:global, term} | {:via, module, term}

The GenMQTT process name

on_start()
on_start ::
  {:ok, pid} |
  :ignore |
  {:error, {:already_started, pid} | term}

Return values of start* functions

option()
option ::
  {:debug, debug} |
  {:name, name} |
  {:timeout, timeout} |
  {:spawn_opt, Process.spawn_opt} |
  {:host, :inet.ip_address | binary} |
  {:port, :inet.port_number} |
  {:username, username :: binary | :undefined} |
  {:password, password :: binary | :undefined} |
  {:client, client_id :: binary} |
  {:clean_session, boolean} |
  {:last_will_topic, topic :: charlist | binary | :undefined} |
  {:last_will_msg, payload :: charlist | binary | :undefined} |
  {:last_will_qos, qos} |
  {:reconnect_timeout, pos_integer | :undefined} |
  {:keepalive_interval, pos_integer} |
  {:retry_interval, pos_integer} |
  {:proto_version, version :: pos_integer} |
  {:info_fun, info_fun} |
  {:transport, {:gen_tcp, config :: list} | {:ssl, config :: list}}

Option values used by the start* functions

options()
options() :: [option]

qos()
qos() :: 0 | 1 | 2

retain()
retain() :: boolean

topic()
topic() :: [binary] | binary

Functions

call(pid, request)
call(pid, request :: term) :: term

Make a call to the underlying state machine

cast(pid, request)
cast(pid, request :: term) :: :ok

Make a cast to the underlying state machine

disconnect(pid)
disconnect(pid) :: :ok

Disconnect from the MQTT broker and stop the process.

on_disconnect/1 will not be triggered. If something needs to be cleaned up, do it in the terminate/2 callback; the shutdown reason will be :normal.
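
The cleanup path described above could look like the following sketch. ShortLivedClient and its stop/1 helper are hypothetical names, and the example assumes the gen_mqtt dependency is available.

```elixir
# Sketch only: ShortLivedClient and stop/1 are hypothetical names.
defmodule ShortLivedClient do
  use GenMQTT

  def start_link, do: GenMQTT.start_link(__MODULE__, nil)

  # Disconnects from the broker and stops the process.
  # on_disconnect/1 is skipped on this path.
  def stop(pid), do: GenMQTT.disconnect(pid)

  # Cleanup after an explicit disconnect/1 arrives here with reason :normal.
  def terminate(:normal, _state) do
    IO.puts("client stopped after disconnect/1")
    :ok
  end

  # Any other shutdown reason needs no special handling in this sketch.
  def terminate(_reason, _state), do: :ok
end
```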

publish(pid, topic, payload, qos, retain \\ false)
publish(pid, topic, payload :: binary, qos, retain) :: :ok

Publish payload to topic with quality of service set to qos

If retain is set to true the published message will be retained on the topic, and delivered to new subscribers joining the topic. Only one message per topic can be retained at a time; sending a new retained message will overwrite the old one, regardless of the publisher. retain defaults to false.
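
A minimal sketch of publishing a retained message, assuming pid refers to a connected GenMQTT process. The module name StatusPublisher, the announce/2 helper, and the topic are made up for illustration.

```elixir
# Sketch only: StatusPublisher, announce/2 and the topic are hypothetical.
defmodule StatusPublisher do
  # Publishes `status` with QoS 1 and retain set to true, so a client that
  # subscribes later still receives the most recent status message.
  def announce(pid, status) when is_binary(status) do
    GenMQTT.publish(pid, "devices/demo/status", status, 1, true)
  end
end
```

Calling StatusPublisher.announce(pid, "online") and later StatusPublisher.announce(pid, "offline") leaves only "offline" retained on the topic, since a new retained message overwrites the old one.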

start(module, args, options \\ [])
start(module, any, options) :: on_start

Starts a GenMQTT process without links (outside of a supervision tree).

See start_link/3 for more information.

start_link(module, args, options \\ [])
start_link(module, any, options) :: on_start

Start a linked connection to an MQTT broker.

Options

  • :name the name given to the process.

  • :host the host name or IP address of the MQTT broker.

  • :port the port number the MQTT broker is running on, given as an integer. Defaults to 1883.

  • :username the name of the user on the MQTT broker. Defaults to :undefined and is not needed when connecting to a broker that allows anonymous connections.

  • :password the password for the user on the MQTT broker. This can be omitted if the broker accepts anonymous connections.

  • :client the client ID. A randomly generated client ID is used if this option is not supplied. Every connected client should have a unique client ID. If you generate your own, keep it to 23 characters or fewer unless the broker supports longer client IDs. The requirements for a client ID are described in the MQTT specification:

    • http://docs.oasis-open.org/mqtt/mqtt/v3.1.1/os/mqtt-v3.1.1-os.html#_Toc385349242

  • :clean_session boolean value, defaults to true.

  • :last_will_topic topic to send message to if the MQTT client disappears from the broker.

  • :last_will_msg the message that will get sent to last_will_topic if the client disappears from the broker.

  • :last_will_qos the quality of service the last will message should get sent with. This should be specified as an integer value between 0 and 2. It defaults to 0.

  • :reconnect_timeout the number of seconds the client will wait for a connection when attempting to reconnect to a broker.

  • :keepalive_interval the number of seconds between keep alives.

  • :retry_interval the number of seconds between reconnection attempts if the client disconnects from the broker.

  • :proto_version which MQTT protocol version to use, defaults to version 3.

  • :transport the network transport the client should use to communicate with the broker and its respective options. The default transport is {:gen_tcp, []}. For basic SSL support use {:ssl, ssl_options}, which can be configured according to the Erlang documentation on the :ssl module: http://erlang.org/documentation/doc-1/man/ssl.html

  • :info_fun a function that can be passed in for logging, benchmarks, debugging, etc. It should not be used in production. Please refer to the unit tests of this project for a simple example of usage.
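
Several of the options above can be combined in one call. The sketch below is illustrative only: the module name SecureClient, the broker address, the credentials, the client ID, and the last-will topic are all placeholders, and the empty :ssl option list would need real settings for an actual TLS connection.

```elixir
# Sketch only: every concrete value below is a placeholder.
defmodule SecureClient do
  use GenMQTT

  def start_link do
    GenMQTT.start_link(__MODULE__, nil,
      host: "broker.example.com",
      port: 8883,
      username: "sensor",
      password: "secret",
      # Kept well under the 23-character limit mentioned above.
      client: "sensor-01",
      # Message sent to this topic if the client disappears from the broker.
      last_will_topic: "sensors/sensor-01/status",
      last_will_msg: "offline",
      last_will_qos: 1,
      reconnect_timeout: 5,
      transport: {:ssl, []}
    )
  end
end
```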

subscribe(pid, topics)
subscribe(pid, [{topic :: binary, qos}]) :: :ok

Subscribe to one or multiple topics given a list of tuples containing the topic name and its quality of service [{"topic", 0}, ..]
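
As an illustration of the list form, the sketch below subscribes to two topic filters with different quality of service levels as soon as the client connects. MultiSubscriber and the humidity topic are hypothetical, and the example assumes the gen_mqtt dependency is available.

```elixir
# Sketch only: MultiSubscriber and the topic names are hypothetical.
defmodule MultiSubscriber do
  use GenMQTT

  def start_link, do: GenMQTT.start_link(__MODULE__, nil)

  def on_connect(state) do
    # One call, one {topic, qos} tuple per subscription.
    :ok = GenMQTT.subscribe(self(), [{"room/+/temp", 0}, {"room/+/humidity", 1}])
    {:ok, state}
  end
end
```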

subscribe(pid, topic, qos)
subscribe(pid, topic, qos) :: :ok

Subscribe to topic with quality of service set to qos

unsubscribe(pid, topic)
unsubscribe(pid, topic) :: :ok

Unsubscribe from one or more topics

Callbacks

code_change(old_vsn, state, extra)
code_change(old_vsn, state :: term, extra :: term) ::
  {:ok, new_state :: term} |
  {:error, reason :: term} when old_vsn: term | {:down, term}

handle_call(request, from, state)
handle_call(request :: term, from, state) ::
  {:reply, reply, new_state} |
  {:reply, reply, new_state, timeout | :hibernate} |
  {:noreply, new_state} |
  {:noreply, new_state, timeout | :hibernate} |
  {:stop, reason, reply, new_state} |
  {:stop, reason, new_state} when reply: term, state: term, new_state: term, reason: term

handle_cast(request, state)
handle_cast(request :: term, state) ::
  {:noreply, new_state} |
  {:noreply, new_state, timeout | :hibernate} |
  {:stop, reason :: term, new_state} when state: term, new_state: term

handle_info(msg, state)
handle_info(msg :: :timeout | term, state) ::
  {:noreply, new_state} |
  {:noreply, new_state, timeout | :hibernate} |
  {:stop, reason :: term, new_state} when state: term, new_state: term

init(state)
init(state) ::
  {:ok, state} |
  {:ok, state, timeout | :hibernate} |
  :ignore |
  {:stop, reason :: any} when state: any

Invoked when the server is started. start_link/3 and start/3 will block until it returns. state is the second term passed into either of the two start functions.

When this function returns {:ok, state} it will enter its loop and will start receiving messages from the broker, or send messages to it as soon as it has entered the connected state.

Returning {:stop, reason} will cause the start function to return {:error, reason}, and the process will exit with reason without entering its loop or calling terminate/2.
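
The two return values discussed above can be sketched as follows. ConfiguredClient and the :invalid_topic reason are hypothetical; the example assumes the gen_mqtt dependency is available.

```elixir
# Sketch only: ConfiguredClient and :invalid_topic are hypothetical names.
defmodule ConfiguredClient do
  use GenMQTT

  # `topic` is the second term passed to start_link/3 and arrives in init/1.
  def start_link(topic), do: GenMQTT.start_link(__MODULE__, topic)

  # A binary topic is accepted and stored in the process state.
  def init(topic) when is_binary(topic), do: {:ok, %{topic: topic}}

  # Anything else stops the process before it enters its loop.
  def init(_other), do: {:stop, :invalid_topic}

  def on_connect(%{topic: topic} = state) do
    :ok = GenMQTT.subscribe(self(), topic, 0)
    {:ok, state}
  end
end
```

Following the description above, starting this client with a non-binary argument would make the start function return {:error, :invalid_topic} without terminate/2 being called.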

on_connect(state)
on_connect(state) :: {:ok, state} when state: term

Triggered when the client successfully establishes a connection to the broker. It runs again when the client reconnects after losing the connection (for example, because the broker was temporarily unavailable), provided the start option :reconnect_timeout has been set to a number.

Examples

Subscribe to a topic as soon as a connection has been made to the broker:

def on_connect(state) do
  :ok = GenMQTT.subscribe(self(), "room/living-room/temp", 0)
  {:ok, state}
end

on_connect_error(reason, state)
on_connect_error(reason, state) :: {:ok, state} when state: term, reason: :server_not_found | :server_not_available | :wrong_protocol_version | :invalid_id | :invalid_credentials | :not_authorized

Callback triggered if there was a problem connecting to the broker. The reason is given as the first argument as an atom, making it possible to pattern match and react. The second argument is the process state.
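
Because the reason is an atom, separate function clauses can react to specific failures. The sketch below treats rejected credentials differently from every other reason; ResilientClient is a hypothetical module name and the messages are only illustrative.

```elixir
# Sketch only: ResilientClient is a hypothetical module name.
defmodule ResilientClient do
  use GenMQTT

  # Matches one specific reason from the list in the spec above.
  def on_connect_error(:invalid_credentials, state) do
    IO.puts("the broker rejected the username or password")
    {:ok, state}
  end

  # Catch-all clause for the remaining reasons.
  def on_connect_error(reason, state) do
    IO.puts("could not connect to the broker: #{inspect(reason)}")
    {:ok, state}
  end
end
```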

on_disconnect(state)
on_disconnect(state) :: {:ok, state} when state: term

Callback triggered when the client disconnects from the broker for whatever reason.

on_publish(topic, payload, state)
on_publish(topic, payload :: binary, state) :: {:ok, state} when state: term

Callback triggered when a message has been published to a topic the client subscribes to.

Examples

The following prints every message published to a topic matching room/+/temp.

def on_publish(["room", room, "temp"], temperature, state) do
  IO.puts "It is #{temperature} degrees in #{room}"
  {:ok, state}
end

on_subscribe(list, state)
on_subscribe([{topic, qos}], state) :: {:ok, state} when state: term

Callback triggered when the client successfully subscribes to one or more topics.

The subscriptions are given in tuples containing the topic name and its quality of service.
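
A small sketch of consuming those tuples: the callback below prints one line per confirmed subscription. SubscriptionAuditor is a hypothetical module name, and inspect/1 is used because the exact shape of the topic term is not assumed here.

```elixir
# Sketch only: SubscriptionAuditor is a hypothetical module name.
defmodule SubscriptionAuditor do
  use GenMQTT

  def on_subscribe(subscriptions, state) do
    # Each element is a {topic, qos} tuple.
    Enum.each(subscriptions, fn {topic, qos} ->
      IO.puts("subscribed to #{inspect(topic)} with QoS #{qos}")
    end)

    {:ok, state}
  end
end
```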

on_unsubscribe(topic, state)
on_unsubscribe(topic, state) :: {:ok, state} when state: term

Callback triggered when the client successfully unsubscribes from one or more subscriptions. It will receive the unsubscribed subscriptions as a list of binaries as the first argument, and the process state as the second.

terminate(reason, state)
terminate(reason, state) :: term when state: term, reason: :normal | :shutdown | {:shutdown, term} | term