Behaviour for MQTT session persistence.
Implement this behaviour to provide custom session storage for MQTT clients
and servers. Sessions store subscriptions, pending messages, and packet IDs
for clients with clean_session: false.
Built-in Implementations
MqttX.Session.ETSStore - In-memory ETS-based storage (default)
Session Data Structure
Sessions are stored as maps with the following structure:
%{
  subscriptions: [%{topic: "sensor/#", qos: 1}],
  pending_messages: [%{packet_id: 1, topic: "...", payload: "..."}],
  packet_id: 42
}
Usage
Configure session storage when starting a client or server:
# Using the built-in ETS store
{:ok, client} = MqttX.Client.connect(
  host: "localhost",
  client_id: "persistent_client",
  clean_session: false,
  session_store: MqttX.Session.ETSStore
)
# Using a custom store with options
{:ok, client} = MqttX.Client.connect(
  host: "localhost",
  client_id: "persistent_client",
  clean_session: false,
  session_store: {MyApp.EctoSessionStore, repo: MyApp.Repo}
)
Implementing a Custom Store
defmodule MyApp.EctoSessionStore do
  @behaviour MqttX.Session.Store

  @impl true
  def init(opts) do
    # Fail fast if the required :repo option is missing
    repo = Keyword.fetch!(opts, :repo)
    {:ok, %{repo: repo}}
  end

  @impl true
  def save(_client_id, _session, %{repo: _repo}) do
    # Save to database
    :ok
  end

  @impl true
  def load(_client_id, %{repo: _repo}) do
    # Load from database: return {:ok, session} when a session exists,
    # :not_found otherwise
    :not_found
  end

  @impl true
  def delete(_client_id, %{repo: _repo}) do
    # Delete from database
    :ok
  end
end
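The skeleton above leaves the storage calls as comments. As a complete, runnable illustration of the same four callbacks, here is a minimal sketch backed by an Agent. The module name MyApp.AgentSessionStore is hypothetical and not part of MqttX; the @behaviour and @impl attributes are left out so the snippet compiles without the library, and a real store would declare them as in the skeleton.

```elixir
defmodule MyApp.AgentSessionStore do
  # Hypothetical example store, not shipped with MqttX.
  # In a project that depends on MqttX, also declare
  # `@behaviour MqttX.Session.Store` and mark each callback with `@impl true`.

  # Start an Agent holding a map of client_id => session.
  def init(_opts) do
    case Agent.start_link(fn -> %{} end) do
      {:ok, pid} -> {:ok, %{agent: pid}}
      {:error, reason} -> {:error, reason}
    end
  end

  # Insert or overwrite the session for this client.
  def save(client_id, session, %{agent: agent}) do
    Agent.update(agent, &Map.put(&1, client_id, session))
  end

  # Translate Map.fetch/2's :error into the :not_found return value.
  def load(client_id, %{agent: agent}) do
    case Agent.get(agent, &Map.fetch(&1, client_id)) do
      {:ok, session} -> {:ok, session}
      :error -> :not_found
    end
  end

  # Deleting a missing session still returns :ok.
  def delete(client_id, %{agent: agent}) do
    Agent.update(agent, &Map.delete(&1, client_id))
  end
end
```

Because the Agent is linked to the process that calls init/1, the stored sessions are lost when that process exits, so this sketch is only suitable for tests or experiments.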
Summary
Types
@type client_id() :: binary()
@type session() :: %{
  subscriptions: [%{topic: binary(), qos: 0 | 1 | 2}],
  pending_messages: [map()],
  packet_id: non_neg_integer()
}
@type state() :: term()
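A custom store that deserializes sessions from an external source may want to check that a loaded value matches the session() type before returning it. The following is a minimal sketch of such a check; SessionShape is a hypothetical helper, not part of MqttX, and it accepts maps that carry extra keys.

```elixir
defmodule SessionShape do
  # Hypothetical helper, not part of MqttX: checks a term against the
  # documented session() shape.

  def valid?(%{subscriptions: subs, pending_messages: pending, packet_id: id})
      when is_list(subs) and is_list(pending) and is_integer(id) and id >= 0 do
    Enum.all?(subs, &valid_subscription?/1) and Enum.all?(pending, &is_map/1)
  end

  # Anything that is not a map with the three expected keys is rejected.
  def valid?(_other), do: false

  # A subscription needs a binary topic and a QoS of 0, 1 or 2.
  defp valid_subscription?(%{topic: topic, qos: qos})
       when is_binary(topic) and qos in [0, 1, 2],
       do: true

  defp valid_subscription?(_other), do: false
end
```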
Callbacks
delete(client_id, state)
Delete a session.
Called when a client connects with clean_session: true to remove
any previously saved session.
Parameters
client_id - The MQTT client identifier
state - Store state from init/1
Returns
:ok - Session deleted (or didn't exist)
{:error, reason} - Failed to delete session
init(opts)
Initialize the session store.
Called once when the client or server starts. Returns initial store state that will be passed to subsequent callbacks.
Parameters
opts - Options passed when configuring the session store
Returns
{:ok, state} - Store initialized successfully
{:error, reason} - Failed to initialize
load(client_id, state)
Load a session.
Called when a client connects with clean_session: false to restore
a previously saved session.
Parameters
client_id - The MQTT client identifier
state - Store state from init/1
Returns
{:ok, session} - Session found and loaded
:not_found - No session exists for this client
{:error, reason} - Failed to load session
save(client_id, session, state)
Save a session.
Called when a client that connected with clean_session: false disconnects,
to persist the session state for later restoration.
Parameters
client_id - The MQTT client identifier
session - Session data to persist
state - Store state from init/1
Returns
:ok - Session saved successfully
{:error, reason} - Failed to save session
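Taken together, the callback descriptions imply a simple lifecycle: delete on connect with clean_session: true, load on connect with clean_session: false, and save on disconnect with clean_session: false. The sketch below illustrates that ordering only. SessionLifecycle, DemoStore, and the :new_session and :resumed return values are hypothetical and do not reflect how MqttX drives a store internally.

```elixir
defmodule DemoStore do
  # Throwaway store for this sketch only: keeps sessions in the calling
  # process's dictionary and ignores the store state.
  def load(client_id, _state) do
    case Process.get({:session, client_id}) do
      nil -> :not_found
      session -> {:ok, session}
    end
  end

  def save(client_id, session, _state) do
    Process.put({:session, client_id}, session)
    :ok
  end

  def delete(client_id, _state) do
    Process.delete({:session, client_id})
    :ok
  end
end

defmodule SessionLifecycle do
  # Hypothetical driver, not part of MqttX. `store` is any module exporting
  # load/2, save/3 and delete/2 with the return values documented above.

  # clean_session: true -> discard any previously saved session.
  def on_connect(store, state, client_id, true) do
    case store.delete(client_id, state) do
      :ok -> {:ok, :new_session}
      {:error, reason} -> {:error, reason}
    end
  end

  # clean_session: false -> try to restore a previously saved session.
  def on_connect(store, state, client_id, false) do
    case store.load(client_id, state) do
      {:ok, session} -> {:ok, {:resumed, session}}
      :not_found -> {:ok, :new_session}
      {:error, reason} -> {:error, reason}
    end
  end

  # Nothing is persisted for clean sessions.
  def on_disconnect(_store, _state, _client_id, _session, true), do: :ok

  # Persistent sessions are saved for later restoration.
  def on_disconnect(store, state, client_id, session, false) do
    store.save(client_id, session, state)
  end
end
```

For example, calling SessionLifecycle.on_connect(DemoStore, nil, "c1", false) before anything has been saved falls through the :not_found branch and starts a new session.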