MqttX.Server behaviour (MqttX v0.7.0)

View Source

MQTT Server behaviour.

Implement this behaviour to create a custom MQTT server/broker.

Example

defmodule MyApp.MqttHandler do
  @moduledoc """
  Example MQTT handler built on `MqttX.Server`.

  Logs connections, incoming publishes, and disconnections to stdout, and
  grants every subscription at the QoS the client requested.
  """

  use MqttX.Server

  # Build the initial handler state: an empty subscriptions map.
  @impl true
  def init(_opts) do
    %{subscriptions: %{}}
  end

  # Accept every connection. The credentials are not inspected in this
  # example, so the binding is underscore-prefixed to avoid an
  # unused-variable warning.
  @impl true
  def handle_connect(client_id, _credentials, state) do
    IO.puts("Client connected: " <> client_id)
    {:ok, state}
  end

  # Log the topic and payload of each incoming PUBLISH; the publish options
  # are ignored and the state is returned unchanged.
  @impl true
  def handle_publish(topic, payload, _opts, state) do
    IO.puts("Received: " <> inspect({topic, payload}))
    {:ok, state}
  end

  # Grant each subscription at exactly the QoS it asked for.
  @impl true
  def handle_subscribe(topics, state) do
    qos_list = Enum.map(topics, & &1.qos)
    {:ok, qos_list, state}
  end

  # Log why the client went away. The callback returns a bare :ok, so the
  # state is not needed here.
  @impl true
  def handle_disconnect(reason, _state) do
    IO.puts("Client disconnected: " <> inspect(reason))
    :ok
  end
end

# Start the server with MyApp.MqttHandler as the handler module and [] as the
# handler options. The :transport and :port values shown here are the same as
# the documented defaults; they are spelled out for clarity.
MqttX.Server.start_link(MyApp.MqttHandler, [],
  transport: MqttX.Transport.ThousandIsland,
  port: 1883
)

Callbacks

The following callbacks are required:

  • init/1 - Initialize handler state
  • handle_connect/3 - Handle client connection
  • handle_publish/4 - Handle incoming PUBLISH messages
  • handle_subscribe/2 - Handle SUBSCRIBE requests
  • handle_disconnect/2 - Handle client disconnection

Optional callbacks:

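  • handle_unsubscribe/2 - Handle UNSUBSCRIBE requests
  • handle_puback/2 - Handle PUBACK for QoS 1 messages
  • handle_auth/3 - Handle enhanced authentication (MQTT 5.0)
  • handle_info/2 - Handle custom messages (e.g., from PubSub)
  • handle_session_expired/2 - Handle session expiry (MQTT 5.0)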

Summary

Callbacks

Handle enhanced authentication (MQTT 5.0).

Handle a client connection.

Handle client disconnection.

Handle custom messages (e.g., from PubSub for outgoing MQTT publishes).

Handle a PUBACK for QoS 1 messages.

Handle an incoming PUBLISH message.

Handle session expiry (MQTT 5.0).

Handle a SUBSCRIBE request.

Handle an UNSUBSCRIBE request.

Initialize the handler state.

Functions

Use MqttX.Server to define default implementations.

Disconnect a client from the server with an MQTT 5.0 reason code.

Types

client_id()

@type client_id() :: binary()

credentials()

@type credentials() :: %{username: binary() | nil, password: binary() | nil}

payload()

@type payload() :: binary()

publish_opts()

@type publish_opts() :: %{
  qos: 0 | 1 | 2,
  retain: boolean(),
  dup: boolean(),
  packet_id: non_neg_integer() | nil,
  properties: map()
}

reason_code()

@type reason_code() :: non_neg_integer()

state()

@type state() :: term()

subscribe_topic()

@type subscribe_topic() :: %{
  topic: topic(),
  qos: 0 | 1 | 2,
  no_local: boolean(),
  retain_as_published: boolean(),
  retain_handling: 0 | 1 | 2
}

topic()

@type topic() :: MqttX.Topic.normalized_topic()

Callbacks

handle_auth(method, data, state)

(optional)
@callback handle_auth(method :: binary(), data :: binary() | nil, state()) ::
  {:ok, state()}
  | {:continue, binary(), state()}
  | {:error, reason_code(), state()}

Handle enhanced authentication (MQTT 5.0).

Called when the server receives an AUTH packet for SASL-style authentication.

Return values:

  • {:ok, state} - Authentication complete, send CONNACK success
  • {:continue, data, state} - Send AUTH continue with data, wait for response
  • {:error, reason_code, state} - Authentication failed

Example

# Enhanced-auth handler for the SCRAM-SHA-256 method. Translates the result of
# the application's own verify_scram/2 helper into the callback's return tuples.
def handle_auth("SCRAM-SHA-256", data, state) do
  outcome = verify_scram(data, state)

  case outcome do
    # Verification finished successfully.
    {:ok, _} -> {:ok, state}
    # Another round trip is needed: hand the challenge back to the client.
    {:continue, challenge} -> {:continue, challenge, state}
    # 0x87 is the MQTT 5.0 reason code "Not authorized".
    :error -> {:error, 0x87, state}
  end
end

handle_connect(client_id, credentials, state)

@callback handle_connect(client_id(), credentials(), state()) ::
  {:ok, state()} | {:error, reason_code(), state()}

Handle a client connection.

Called when a client sends a CONNECT packet.

Return {:ok, new_state} to accept the connection, or {:error, reason_code, new_state} to reject.

handle_disconnect(reason, state)

@callback handle_disconnect(reason :: term(), state()) :: :ok

Handle client disconnection.

Called when the client disconnects or the connection is closed.

handle_info(message, state)

(optional)
@callback handle_info(message :: term(), state()) ::
  {:ok, state()}
  | {:publish, binary(), binary(), state()}
  | {:publish, binary(), binary(), map(), state()}
  | {:disconnect, reason_code(), state()}
  | {:disconnect, reason_code(), map(), state()}
  | {:stop, term(), state()}

Handle custom messages (e.g., from PubSub for outgoing MQTT publishes).

Return values:

  • {:ok, state} - Continue with updated state
  • {:publish, topic, payload, state} - Send PUBLISH to client, then continue
  • {:publish, topic, payload, opts, state} - Send PUBLISH with QoS/retain options
  • {:disconnect, reason_code, state} - Send DISCONNECT and close connection
  • {:disconnect, reason_code, properties, state} - Send DISCONNECT with properties and close
  • {:stop, reason, state} - Close the connection

handle_puback(packet_id, state)

(optional)
@callback handle_puback(packet_id :: non_neg_integer(), state()) :: {:ok, state()}

Handle a PUBACK for QoS 1 messages.

handle_publish(topic, payload, publish_opts, state)

@callback handle_publish(topic(), payload(), publish_opts(), state()) ::
  {:ok, state()}
  | {:error, term(), state()}
  | {:disconnect, reason_code(), state()}
  | {:disconnect, reason_code(), map(), state()}

Handle an incoming PUBLISH message.

Called when a client publishes a message.

handle_session_expired(client_id, state)

(optional)
@callback handle_session_expired(client_id(), state()) :: :ok

Handle session expiry (MQTT 5.0).

Called when a client's session expires after session_expiry_interval seconds post-disconnect. Use this to clean up session state (subscriptions, queued messages, etc.).

handle_subscribe(list, state)

@callback handle_subscribe([subscribe_topic()], state()) ::
  {:ok, [0 | 1 | 2], state()}
  | {:disconnect, reason_code(), state()}
  | {:disconnect, reason_code(), map(), state()}

Handle a SUBSCRIBE request.

On success, return {:ok, granted_qos, state}, where granted_qos is the list of granted QoS values, one for each requested topic.

handle_unsubscribe(list, state)

(optional)
@callback handle_unsubscribe([topic()], state()) ::
  {:ok, state()}
  | {:disconnect, reason_code(), state()}
  | {:disconnect, reason_code(), map(), state()}

Handle an UNSUBSCRIBE request.

init(opts)

@callback init(opts :: term()) :: state()

Initialize the handler state.

Called when starting the server.

Functions

__using__(opts)

(macro)

Use MqttX.Server to define default implementations.

disconnect(pid, reason_code, properties \\ %{})

@spec disconnect(pid(), reason_code(), map()) :: :ok

Disconnect a client from the server with an MQTT 5.0 reason code.

Sends a DISCONNECT packet to the client and closes the connection. The pid is the transport process handling the client connection.

Example

MqttX.Server.disconnect(client_pid, 0x98)  # 0x98 = Administrative action
MqttX.Server.disconnect(client_pid, 0x8E, %{reason_string: "Session taken over"})  # 0x8E = Session taken over

start_link(handler, handler_opts, opts \\ [])

@spec start_link(module(), term(), keyword()) :: {:ok, pid()} | {:error, term()}

Start an MQTT server.

Options

  • :transport - Transport module (default: MqttX.Transport.ThousandIsland)
  • :port - Port to listen on (default: 1883)
  • :name - Optional name for the server process

All other options are passed to the transport adapter.