MqttX.Server behaviour (MqttX v0.7.0)
MQTT Server behaviour.
Implement this behaviour to create a custom MQTT server/broker.
Example
defmodule MyApp.MqttHandler do
  use MqttX.Server

  @impl true
  def init(_opts) do
    %{subscriptions: %{}}
  end

  @impl true
  def handle_connect(client_id, _credentials, state) do
    IO.puts("Client connected: " <> client_id)
    {:ok, state}
  end

  @impl true
  def handle_publish(topic, payload, _opts, state) do
    IO.puts("Received: " <> inspect({topic, payload}))
    {:ok, state}
  end

  @impl true
  def handle_subscribe(topics, state) do
    # Grant each topic the QoS the client asked for
    qos_list = Enum.map(topics, fn t -> t.qos end)
    {:ok, qos_list, state}
  end

  @impl true
  def handle_disconnect(reason, _state) do
    IO.puts("Client disconnected: " <> inspect(reason))
    :ok
  end
end

# Start the server
MqttX.Server.start_link(MyApp.MqttHandler, [],
  transport: MqttX.Transport.ThousandIsland,
  port: 1883
)

Callbacks
The following callbacks are required:
init/1 - Initialize handler state
handle_connect/3 - Handle client connection
handle_publish/4 - Handle incoming PUBLISH messages
handle_subscribe/2 - Handle SUBSCRIBE requests
handle_disconnect/2 - Handle client disconnection
Optional callbacks:
handle_unsubscribe/2 - Handle UNSUBSCRIBE requests
handle_puback/2 - Handle PUBACK for QoS 1 messages
Types
@type client_id() :: binary()
@type payload() :: binary()
@type publish_opts() :: %{
  qos: 0 | 1 | 2,
  retain: boolean(),
  dup: boolean(),
  packet_id: non_neg_integer() | nil,
  properties: map()
}
@type reason_code() :: non_neg_integer()
@type state() :: term()
@type topic() :: MqttX.Topic.normalized_topic()
Callbacks
@callback handle_auth(method :: binary(), data :: binary() | nil, state()) ::
  {:ok, state()}
  | {:continue, binary(), state()}
  | {:error, reason_code(), state()}
Handle enhanced authentication (MQTT 5.0).
Called when the server receives an AUTH packet for SASL-style authentication.
Return values:
{:ok, state} - Authentication complete, send CONNACK success
{:continue, data, state} - Send AUTH continue with data, wait for response
{:error, reason_code, state} - Authentication failed
Example
def handle_auth("SCRAM-SHA-256", data, state) do
  case verify_scram(data, state) do
    {:ok, _} -> {:ok, state}
    {:continue, challenge} -> {:continue, challenge, state}
    :error -> {:error, 0x87, state} # Not authorized
  end
end
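The three return values can also be seen in a two-step exchange. The sketch below is a toy challenge/response, not SCRAM and not secure; the method name "TOY-CHALLENGE", the module name AuthSketch, and the :auth and :secret state keys are all assumptions made for illustration. In a real handler the function would sit in a module that calls use MqttX.Server and be marked @impl true; both are omitted so the snippet runs on its own.

```elixir
defmodule AuthSketch do
  # MQTT 5.0 reason codes.
  @not_authorized 0x87
  @bad_auth_method 0x8C

  # First AUTH packet: remember a nonce and send it back as the challenge.
  # The nonce is unique per call but NOT cryptographically random.
  def handle_auth("TOY-CHALLENGE", _data, %{auth: nil} = state) do
    nonce = Integer.to_string(:erlang.unique_integer([:positive]))
    {:continue, nonce, %{state | auth: {:challenged, nonce}}}
  end

  # Second AUTH packet: the client must answer with nonce <> secret.
  def handle_auth("TOY-CHALLENGE", data, %{auth: {:challenged, nonce}} = state) do
    if data == nonce <> state.secret do
      {:ok, %{state | auth: :done}}
    else
      {:error, @not_authorized, state}
    end
  end

  # Any other method (or an unexpected state) is refused.
  def handle_auth(_method, _data, state) do
    {:error, @bad_auth_method, state}
  end
end
```

Keeping the exchange progress in the handler state is one way to let successive handle_auth/3 calls tell the first AUTH packet from the follow-up.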
@callback handle_connect(client_id(), credentials(), state()) ::
  {:ok, state()}
  | {:error, reason_code(), state()}
Handle a client connection.
Called when a client sends a CONNECT packet.
Return {:ok, new_state} to accept the connection,
or {:error, reason_code, new_state} to reject it with the given reason code.
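As a rough illustration of the accept/reject contract, the sketch below checks credentials against a fixed table. The shape of credentials is not specified on this page, so the map with :username and :password keys is an assumption, as are the module name ConnectSketch and the :users state key. The use MqttX.Server and @impl true lines a real handler would carry are left out so the snippet runs standalone.

```elixir
defmodule ConnectSketch do
  # MQTT 5.0 reason code: Bad User Name or Password.
  @bad_credentials 0x86

  # Assumption: credentials arrive as a map with :username and :password.
  def handle_connect(client_id, %{username: user, password: pass}, state) do
    case Map.fetch(state.users, user) do
      {:ok, ^pass} ->
        # Accept and remember which client this handler is serving.
        {:ok, Map.put(state, :client_id, client_id)}

      _ ->
        # Unknown user or wrong password: reject with a reason code.
        {:error, @bad_credentials, state}
    end
  end

  # Anything else (for example, no credentials at all) is rejected too.
  def handle_connect(_client_id, _credentials, state) do
    {:error, @bad_credentials, state}
  end
end
```

A production handler would compare password hashes rather than plain strings; the plain comparison only keeps the example short.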
Handle client disconnection.
Called when the client disconnects or the connection is closed.
@callback handle_info(message :: term(), state()) ::
  {:ok, state()}
  | {:publish, binary(), binary(), state()}
  | {:publish, binary(), binary(), map(), state()}
  | {:disconnect, reason_code(), state()}
  | {:disconnect, reason_code(), map(), state()}
  | {:stop, term(), state()}
Handle custom messages (e.g., from PubSub for outgoing MQTT publishes).
Return values:
{:ok, state} - Continue with updated state
{:publish, topic, payload, state} - Send PUBLISH to client, then continue
{:publish, topic, payload, opts, state} - Send PUBLISH with QoS/retain options
{:disconnect, reason_code, state} - Send DISCONNECT and close connection
{:disconnect, reason_code, properties, state} - Send DISCONNECT with properties and close
{:stop, reason, state} - Close the connection
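A minimal sketch of how these return values might map onto application messages follows. The message shapes ({:mqtt_out, ...} and :kick) and the module name InfoSketch are hypothetical, and the :qos and :retain keys in the options map are assumed to mirror publish_opts(). The use MqttX.Server and @impl true lines are omitted so the snippet runs standalone.

```elixir
defmodule InfoSketch do
  # MQTT 5.0 reason code: Administrative action.
  @administrative_action 0x98

  # Hypothetical message from an application PubSub: forward it
  # to the client with the default publish options.
  def handle_info({:mqtt_out, topic, payload}, state) do
    {:publish, topic, payload, state}
  end

  # Hypothetical variant that asks for an explicit QoS.
  def handle_info({:mqtt_out, topic, payload, qos}, state) when qos in [0, 1, 2] do
    {:publish, topic, payload, %{qos: qos, retain: false}, state}
  end

  # Hypothetical operator command: send DISCONNECT and close.
  def handle_info(:kick, state) do
    {:disconnect, @administrative_action, state}
  end

  # Ignore anything unrecognised and keep the connection open.
  def handle_info(_message, state) do
    {:ok, state}
  end
end
```

The catch-all clause matters in practice: without it, an unexpected message would raise a FunctionClauseError inside the handler.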
@callback handle_puback(packet_id :: non_neg_integer(), state()) :: {:ok, state()}
Handle a PUBACK for QoS 1 messages.
@callback handle_publish(topic(), payload(), publish_opts(), state()) ::
  {:ok, state()}
  | {:error, term(), state()}
  | {:disconnect, reason_code(), state()}
  | {:disconnect, reason_code(), map(), state()}
Handle an incoming PUBLISH message.
Called when a client publishes a message.
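The sketch below shows one way a handler might use the {:ok, state} and {:disconnect, reason_code, state} returns. The 1024-byte limit, the :retained state key, and the module name PublishSketch are assumptions. The topic is treated as an opaque map key, so nothing here depends on the exact shape of MqttX.Topic.normalized_topic(). The use MqttX.Server and @impl true lines are omitted so the snippet runs standalone.

```elixir
defmodule PublishSketch do
  # MQTT 5.0 reason code: Packet too large.
  @packet_too_large 0x95
  # Hypothetical application limit, in bytes.
  @max_payload 1024

  # Oversized payloads end the connection with a reason code.
  def handle_publish(_topic, payload, _opts, state)
      when byte_size(payload) > @max_payload do
    {:disconnect, @packet_too_large, state}
  end

  # An empty retained payload clears the stored message for the topic,
  # mirroring MQTT retained-message semantics.
  def handle_publish(topic, "", %{retain: true}, state) do
    {:ok, %{state | retained: Map.delete(state.retained, topic)}}
  end

  # Other retained messages are remembered per topic.
  def handle_publish(topic, payload, %{retain: true}, state) do
    {:ok, %{state | retained: Map.put(state.retained, topic, payload)}}
  end

  # Everything else is accepted without changing the state.
  def handle_publish(_topic, _payload, _opts, state) do
    {:ok, state}
  end
end
```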
Handle session expiry (MQTT 5.0).
Called when a client's session expires, that is, session_expiry_interval seconds
after the client disconnects. Use this to clean up session state (subscriptions, queued messages, etc.).
@callback handle_subscribe([subscribe_topic()], state()) ::
  {:ok, [0 | 1 | 2], state()}
  | {:disconnect, reason_code(), state()}
  | {:disconnect, reason_code(), map(), state()}
Handle a SUBSCRIBE request.
Return the list of granted QoS values, one per requested topic and in the same order as the request.
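A granted QoS may be lower than the one requested. The sketch below caps every grant at QoS 1 and records the result. It assumes each entry exposes :topic and :qos fields (the module example's t.qos suggests the latter; :topic is a guess), and the module name SubscribeSketch and the :subscriptions state key are illustrative. The use MqttX.Server and @impl true lines are omitted so the snippet runs standalone.

```elixir
defmodule SubscribeSketch do
  # Hypothetical policy: this handler never grants QoS 2.
  @max_qos 1

  def handle_subscribe(topics, state) do
    # One granted QoS per requested topic, in request order.
    granted = Enum.map(topics, fn t -> min(t.qos, @max_qos) end)

    # Record what was granted so later publishes can be routed.
    subscriptions =
      topics
      |> Enum.zip(granted)
      |> Enum.reduce(state.subscriptions, fn {t, qos}, acc ->
        Map.put(acc, t.topic, qos)
      end)

    {:ok, granted, %{state | subscriptions: subscriptions}}
  end
end
```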
@callback handle_unsubscribe([topic()], state()) ::
  {:ok, state()}
  | {:disconnect, reason_code(), state()}
  | {:disconnect, reason_code(), map(), state()}
Handle an UNSUBSCRIBE request.
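A minimal sketch, assuming the handler keeps a :subscriptions map keyed by topic in its state (the module name UnsubscribeSketch and that state layout are illustrative, not part of MqttX). The use MqttX.Server and @impl true lines are omitted so the snippet runs standalone.

```elixir
defmodule UnsubscribeSketch do
  # Assumption: state.subscriptions maps each topic to its granted QoS.
  def handle_unsubscribe(topics, state) do
    # Map.drop/2 silently ignores topics that were never subscribed.
    {:ok, %{state | subscriptions: Map.drop(state.subscriptions, topics)}}
  end
end
```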
Initialize the handler state.
Called when starting the server.
Functions
Use MqttX.Server to define default implementations.
@spec disconnect(pid(), reason_code(), map()) :: :ok
Disconnect a client from the server with an MQTT 5.0 reason code.
Sends a DISCONNECT packet to the client and closes the connection.
The pid is the transport process handling the client connection.
Example
MqttX.Server.disconnect(client_pid, 0x98) # Administrative action
MqttX.Server.disconnect(client_pid, 0x8E, %{reason_string: "Session taken over"})
Start an MQTT server.
Options
:transport - Transport module (default: MqttX.Transport.ThousandIsland)
:port - Port to listen on (default: 1883)
:name - Optional name for the server process
All other options are passed to the transport adapter.