Tortoise311.Connection (tortoise311 v0.12.1)

Establish a connection to an MQTT broker.

Todo.

Summary

Functions

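child_spec(init_arg)
Returns a specification to start this module under a supervisor.

disconnect(client_id)
Close the connection to the broker.

ping(client_id)
Ping the broker.

ping_sync(client_id, timeout \\ Tortoise311.default_timeout())
Ping the server and await the ping latency reply.

start_link(connection_opts, opts \\ [])
Start a connection process and link it to the current process.

subscribe(client_id, topics, opts \\ [])
Subscribe to one or more topics using topic filters on client_id.

subscribe_sync(client_id, topics, opts \\ [])
Subscribe to topics and block until the server acknowledges.

subscriptions(client_id)
Return the list of subscribed topics.

unsubscribe(client_id, topics, opts \\ [])
Unsubscribe from one or more topic filters. The topic filters are given as strings. Multiple topic filters can be given at once by passing in a list of strings.

unsubscribe_sync(client_id, topics, opts \\ [])
Unsubscribe from topics and block until the server acknowledges.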

Functions

child_spec(init_arg)

@spec child_spec(Keyword.t()) :: %{
  id: term(),
  start: {Tortoise311.Connection, :start_link, [Keyword.t()]},
  restart: :transient | :permanent | :temporary,
  type: :worker
}

Returns a specification to start this module under a supervisor.

See Supervisor.
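As a rough illustration of how child_spec/1 is typically used, the sketch below places a connection under a supervisor by listing {Tortoise311.Connection, opts} as a child. The supervisor module name MyApp.MqttSupervisor, the client id, and the option values are all hypothetical. The transport module Tortoise311.Transport.Tcp and the handler Tortoise311.Handler.Logger are assumed from the library's README and should be checked against the installed version.

```elixir
defmodule MyApp.MqttSupervisor do
  # Hypothetical supervisor; module name and option values are assumptions.
  use Supervisor

  def start_link(arg) do
    Supervisor.start_link(__MODULE__, arg, name: __MODULE__)
  end

  # Connection options, kept in their own function so they can be inspected.
  # Transport and handler module names are assumed, not taken from this page.
  def connection_opts do
    [
      client_id: "my_client",
      server: {Tortoise311.Transport.Tcp, host: ~c"localhost", port: 1883},
      handler: {Tortoise311.Handler.Logger, []}
    ]
  end

  @impl true
  def init(_arg) do
    # The supervisor calls Tortoise311.Connection.child_spec/1 with the options.
    children = [{Tortoise311.Connection, connection_opts()}]
    Supervisor.init(children, strategy: :one_for_one)
  end
end
```

Starting this supervisor requires the tortoise311 dependency and a reachable broker. The module itself only builds the child list, so the options can be reviewed without either.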

disconnect(client_id)

@spec disconnect(Tortoise311.client_id()) :: :ok

Close the connection to the broker.

Given the client_id of a running connection, this cancels the in-flight messages and sends the proper disconnect message to the broker. The session is then terminated on the server.

ping(client_id)

@spec ping(Tortoise311.client_id()) :: {:ok, reference()}

Ping the broker.

When the round-trip is complete, a message with the time taken in milliseconds will be sent to the process that invoked the ping command.

The connection will automatically ping the broker at the interval specified in the connection configuration, so there is no need to set up a recurring ping. This ping function is exposed for debugging purposes. To track ping latency over time, it is better to listen on :ping_response using the Tortoise311.Events PubSub.

ping_sync(client_id, timeout \\ Tortoise311.default_timeout())

@spec ping_sync(Tortoise311.client_id(), timeout()) ::
  {:ok, reference()} | {:error, :timeout}

Ping the server and await the ping latency reply.

Takes a client_id and an optional timeout.

Like ping/1, but blocks the calling process until a response is received from the server. The response will contain the ping latency in milliseconds.

start_link(connection_opts, opts \\ [])

@spec start_link(options, GenServer.options()) :: GenServer.on_start()
when option:
       {:client_id, Tortoise311.client_id()}
       | {:server, {atom(), term()}}
       | {:user_name, String.t()}
       | {:password, String.t()}
       | {:keep_alive, non_neg_integer()}
       | {:keep_alive_timeout, non_neg_integer()}
       | {:will, Tortoise311.Package.Publish.t()}
       | {:subscriptions,
          [{Tortoise311.topic_filter(), Tortoise311.qos()}]
          | Tortoise311.Package.Subscribe.t()}
       | {:clean_session, boolean()}
       | {:enable_telemetry, boolean()}
       | {:handler, {atom(), term()}}
       | {:first_connect_delay, non_neg_integer()},
     options: [option]

Start a connection process and link it to the current process.

Read the documentation on child_spec/1 if you want... (todo!)

subscribe(client_id, topics, opts \\ [])

@spec subscribe(Tortoise311.client_id(), topic | topics, [options]) ::
  {:ok, reference()}
when topics: [topic],
     topic: {Tortoise311.topic_filter(), Tortoise311.qos()},
     options:
       {:timeout, timeout()} | {:identifier, Tortoise311.package_identifier()}

Subscribe to one or more topics using topic filters on client_id.

Each topic should be a 2-tuple, {topic_filter, qos}, where topic_filter is a valid MQTT topic filter and qos is an integer from 0 through 2.

Multiple topics can be given as a list.

The subscribe function is asynchronous, so it will return {:ok, ref}. Eventually a response will get delivered to the process mailbox, tagged with the reference stored in ref. It will take the form of:

{{Tortoise311, ^client_id}, ^ref, result}

where result is either :ok or {:error, reason}.
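The reply described above can be collected with a selective receive that pins the client id and the reference. The following is a minimal sketch: the SubscribeReply module and its await/3 function are hypothetical helpers and not part of Tortoise311, and the 5 second default timeout is an arbitrary choice.

```elixir
defmodule SubscribeReply do
  # Hypothetical helper, not part of Tortoise311.
  # Waits for the reply tagged with `ref` for the given `client_id`.
  def await(client_id, ref, timeout \\ 5_000) do
    receive do
      # Only a message carrying this client_id and this ref matches.
      {{Tortoise311, ^client_id}, ^ref, result} -> result
    after
      timeout -> {:error, :timeout}
    end
  end
end

# Intended use with a running connection (needs a broker, so left as comments):
#
#   topics = [{"sensors/+/temperature", 0}, {"alerts/#", 1}]
#   {:ok, ref} = Tortoise311.Connection.subscribe("my_client", topics)
#   result = SubscribeReply.await("my_client", ref)
```

Pinning both values means unrelated messages stay in the mailbox for other receive clauses to handle.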

Read the documentation for Tortoise311.Connection.subscribe_sync/3 for a blocking version of this call.

subscribe_sync(client_id, topics, opts \\ [])

@spec subscribe_sync(Tortoise311.client_id(), topic | topics, [options]) ::
  :ok | {:error, :timeout}
when topics: [topic],
     topic: {Tortoise311.topic_filter(), Tortoise311.qos()},
     options:
       {:timeout, timeout()} | {:identifier, Tortoise311.package_identifier()}

Subscribe to topics and block until the server acknowledges.

This is a synchronous version of Tortoise311.Connection.subscribe/3. In fact, it calls into Tortoise311.Connection.subscribe/3 and handles the selective receive loop itself, making it much easier to work with. This function can also be used to block a process that cannot continue until it has a subscription to the given topics.

See Tortoise311.Connection.subscribe/3 for configuration options.
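A caller that must not proceed without a subscription might branch on the two documented return values. In the sketch below, MyApp.Subscriber, ensure_subscribed/3, the 5 second timeout, and the :subscription_timed_out reason are all hypothetical. The subscribe function is passed in as an argument (defaulting to subscribe_sync/3) purely so the control flow can be exercised without a broker.

```elixir
defmodule MyApp.Subscriber do
  # Hypothetical wrapper around subscribe_sync/3; names are assumptions.
  # `subscribe` is injectable so the branching can be tried with a stub.
  def ensure_subscribed(
        client_id,
        topics,
        subscribe \\ &Tortoise311.Connection.subscribe_sync/3
      ) do
    case subscribe.(client_id, topics, timeout: 5_000) do
      # The server acknowledged the subscription.
      :ok -> :ok
      # No acknowledgement arrived within the timeout.
      {:error, :timeout} -> {:error, :subscription_timed_out}
    end
  end
end
```

With a running connection, MyApp.Subscriber.ensure_subscribed("my_client", {"alerts/#", 1}) would call the real function. A stub such as fn _, _, _ -> :ok end can stand in for it during tests.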

subscriptions(client_id)

@spec subscriptions(Tortoise311.client_id()) :: Tortoise311.Package.Subscribe.t()

Return the list of subscribed topics.

Given the client_id of a running connection, return its current subscriptions. This is helpful when debugging.

unsubscribe(client_id, topics, opts \\ [])

@spec unsubscribe(Tortoise311.client_id(), topic | topics, [options]) ::
  {:ok, reference()}
when topics: [topic],
     topic: Tortoise311.topic_filter(),
     options:
       {:timeout, timeout()} | {:identifier, Tortoise311.package_identifier()}

Unsubscribe from one or more topic filters. The topic filters are given as strings. Multiple topic filters can be given at once by passing in a list of strings.

Tortoise311.Connection.unsubscribe(client_id, ["foo/bar", "quux"])

This operation is asynchronous. When the operation is done, a message will be delivered to the mailbox of the originating process.

unsubscribe_sync(client_id, topics, opts \\ [])

@spec unsubscribe_sync(Tortoise311.client_id(), topic | topics, [options]) ::
  :ok | {:error, :timeout}
when topics: [topic],
     topic: Tortoise311.topic_filter(),
     options:
       {:timeout, timeout()} | {:identifier, Tortoise311.package_identifier()}

Unsubscribe from topics and block until the server acknowledges.

This is a synchronous version of Tortoise311.Connection.unsubscribe/3. It will block until the server has sent the acknowledgement message.

See Tortoise311.Connection.unsubscribe/3 for configuration options.