MqttX.Server.RateLimiter (MqttX v0.7.0)
Token bucket rate limiter using ETS for lock-free per-client counters.
Provides connection-rate and message-rate limiting for MQTT servers.
Counters are stored in ETS with atomic update_counter operations,
safe for concurrent access from multiple transport handler processes.
The periodic counter reset runs as a supervised GenServer, ensuring the timer is restarted if it crashes.
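The atomic-counter behaviour described above can be illustrated with plain ETS calls. This is a minimal sketch, not the MqttX source: the table name `:demo_counters` and the key `:connections` are hypothetical, chosen only for illustration. It shows that `:ets.update_counter/4` loses no increments when many processes hit the same key at once.

```elixir
# Hypothetical sketch: table name and key are illustrative, not MqttX internals.
# The table is :public so that processes other than the owner can write to it.
table = :ets.new(:demo_counters, [:set, :public, write_concurrency: true])

# 50 concurrent processes each bump the same counter 100 times.
1..50
|> Enum.map(fn _ ->
  Task.async(fn ->
    for _ <- 1..100 do
      # {2, 1} adds 1 to element 2 of the stored tuple. The default
      # {:connections, 0} is inserted first if the key does not exist yet.
      :ets.update_counter(table, :connections, {2, 1}, {:connections, 0})
    end
  end)
end)
|> Task.await_many()

[{:connections, total}] = :ets.lookup(table, :connections)
IO.inspect(total) # 5000: every increment is accounted for
```

Because each `update_counter` call is atomic, no lock or serializing process is needed around the counter.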
Usage
limiter = MqttX.Server.RateLimiter.new(
  max_connections: 100,  # new connections per interval
  max_messages: 1000,    # messages per client per interval
  interval: 1000         # window size in ms (default)
)

# :accept / :process / :reject are placeholders for the caller's own handling.
case MqttX.Server.RateLimiter.allow_connection?(limiter) do
  :ok -> :accept                      # accept connection
  {:error, :rate_limited} -> :reject  # reject with 0x9F
end

case MqttX.Server.RateLimiter.allow_message?(limiter, client_id) do
  :ok -> :process                     # process message
  {:error, :rate_limited} -> :reject  # reject with 0x96
end
Summary
Functions
allow_connection?/1 - Check and increment the connection counter.
allow_message?/2 - Check and increment the per-client message counter.
cleanup/1 - Clean up the rate limiter, deleting the ETS table and stopping the timer.
new/1 - Create a new rate limiter.
reset/1 - Reset all counters. Called automatically on each interval tick.
Types
@type t() :: %{
  table: :ets.table(),
  max_connections: pos_integer() | nil,
  max_messages: pos_integer() | nil,
  interval: pos_integer(),
  timer_pid: pid() | nil
}
Functions
@spec allow_connection?(t()) :: :ok | {:error, :rate_limited}
Check and increment the connection counter.
Returns :ok if the connection is allowed, or {:error, :rate_limited}
if the connection rate limit has been exceeded.
allow_message?/2
Check and increment the per-client message counter.
Returns :ok if the message is allowed, or {:error, :rate_limited}
if the per-client message rate limit has been exceeded.
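The check-and-increment behaviour described for these two functions could look roughly like the following. This is a sketch under stated assumptions, not the library's implementation: the module `FixedWindowSketch`, its functions, and the `{:messages, client_id}` key shape are all hypothetical. A `nil` limit is treated as unlimited, matching the documented defaults.

```elixir
defmodule FixedWindowSketch do
  # Hypothetical helper module, not part of MqttX.

  def new_table, do: :ets.new(:fixed_window_sketch, [:set, :public])

  # A nil limit means "unlimited": always allow and skip the counter.
  def allow?(_table, _key, nil), do: :ok

  def allow?(table, key, max) when is_integer(max) and max > 0 do
    # Increment first, then compare: the increment and the read of the new
    # value happen in one atomic ETS operation.
    count = :ets.update_counter(table, key, {2, 1}, {key, 0})
    if count <= max, do: :ok, else: {:error, :rate_limited}
  end
end

table = FixedWindowSketch.new_table()

# With a limit of 3, the fourth message in the same window is rejected.
results =
  for _ <- 1..4 do
    FixedWindowSketch.allow?(table, {:messages, "client-a"}, 3)
  end

IO.inspect(results)
# [:ok, :ok, :ok, {:error, :rate_limited}]

# Counters are keyed per client, so another client is unaffected.
other = FixedWindowSketch.allow?(table, {:messages, "client-b"}, 3)
IO.inspect(other)
# :ok
```

Incrementing before comparing keeps the whole check in a single ETS call; the counter may run past the limit within a window, which is harmless once the window is reset.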
@spec cleanup(t()) :: :ok
Clean up the rate limiter, deleting the ETS table and stopping the timer.
new/1
Create a new rate limiter.
Options
 - :max_connections - Maximum new connections per interval (default: nil, unlimited)
 - :max_messages - Maximum messages per client per interval (default: nil, unlimited)
 - :interval - Window size in milliseconds (default: 1000)
@spec reset(t()) :: :ok
Reset all counters. Called automatically on each interval tick.
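A periodic reset driven by a supervised GenServer might be sketched as below. The module `ResetTimerSketch`, the `:tick` message, and the use of `:ets.delete_all_objects/1` to clear counters are assumptions made for illustration; the actual MqttX timer process may work differently.

```elixir
defmodule ResetTimerSketch do
  # Hypothetical timer process, not the MqttX implementation.
  use GenServer

  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  @impl true
  def init(opts) do
    table = Keyword.fetch!(opts, :table)
    interval = Keyword.get(opts, :interval, 1000)
    schedule(interval)
    {:ok, %{table: table, interval: interval}}
  end

  @impl true
  def handle_info(:tick, state) do
    # Clearing every row starts a fresh window for all counters.
    :ets.delete_all_objects(state.table)
    schedule(state.interval)
    {:noreply, state}
  end

  defp schedule(interval), do: Process.send_after(self(), :tick, interval)
end

# The table is owned by the calling process and is :public so the timer can clear it.
table = :ets.new(:reset_sketch, [:set, :public])

# Running the timer under a supervisor means it is restarted if it crashes.
{:ok, _sup} =
  Supervisor.start_link(
    [{ResetTimerSketch, [table: table, interval: 50]}],
    strategy: :one_for_one
  )

:ets.update_counter(table, :connections, {2, 1}, {:connections, 0})
Process.sleep(120)

# After at least one tick the counter row has been removed.
remaining = :ets.lookup(table, :connections)
IO.inspect(remaining)
```

Keeping the ETS table outside the timer process means a timer crash and restart does not destroy the counters themselves.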