MqttX.Server.RateLimiter (MqttX v0.7.0)

Token bucket rate limiter using ETS for lock-free per-client counters.

Provides connection-rate and message-rate limiting for MQTT servers. Counters are stored in ETS and incremented with atomic :ets.update_counter operations, so they are safe for concurrent access from multiple transport handler processes.

The periodic counter reset runs as a supervised GenServer, ensuring the timer is restarted if it crashes.
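To illustrate why atomic ETS counters need no lock, here is a minimal sketch that is independent of MqttX. The module name, table name, and the :connections key are assumptions made for this example, not the library's internal layout. Fifty processes increment one shared counter, and no update is lost.

```elixir
defmodule ConcurrentCounterSketch do
  # Hypothetical demo module; it does not reflect MqttX internals.
  def run do
    # :public lets processes other than the owner write to the table.
    table = :ets.new(:sketch_counters, [:set, :public, write_concurrency: true])

    # 50 processes each increment the same counter 20 times.
    1..50
    |> Enum.map(fn _ ->
      Task.async(fn ->
        for _ <- 1..20 do
          # {2, 1} adds 1 to the element at tuple position 2.
          # {:connections, 0} is inserted first if the key is absent.
          :ets.update_counter(table, :connections, {2, 1}, {:connections, 0})
        end
      end)
    end)
    |> Task.await_many()

    [{:connections, total}] = :ets.lookup(table, :connections)
    total
  end
end
```

Because each :ets.update_counter call is a single atomic read-modify-write, ConcurrentCounterSketch.run() returns 50 * 20 = 1000 regardless of how the processes interleave.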

Usage

limiter = MqttX.Server.RateLimiter.new(
  max_connections: 100,    # new connections per interval
  max_messages: 1000,      # messages per client per interval
  interval: 1000           # window size in ms (default)
)

case MqttX.Server.RateLimiter.allow_connection?(limiter) do
  :ok ->
    # accept the connection
    :ok
  {:error, :rate_limited} ->
    # reject with reason code 0x9F (Connection rate exceeded)
    {:error, :rate_limited}
end

case MqttX.Server.RateLimiter.allow_message?(limiter, client_id) do
  :ok ->
    # process the message
    :ok
  {:error, :rate_limited} ->
    # reject with reason code 0x96 (Message rate too high)
    {:error, :rate_limited}
end

Summary

Functions

allow_connection?(map)

Check and increment the connection counter.

allow_message?(map, client_id)

Check and increment the per-client message counter.

cleanup(map)

Clean up the rate limiter, deleting the ETS table and stopping the timer.

new(opts \\ [])

Create a new rate limiter.

reset(map)

Reset all counters. Called automatically on each interval tick.

Types

t()

@type t() :: %{
  table: :ets.table(),
  max_connections: pos_integer() | nil,
  max_messages: pos_integer() | nil,
  interval: pos_integer(),
  timer_pid: pid() | nil
}

Functions

allow_connection?(map)

@spec allow_connection?(t()) :: :ok | {:error, :rate_limited}

Check and increment the connection counter.

Returns :ok if the connection is allowed, or {:error, :rate_limited} if the connection rate limit has been exceeded.

allow_message?(map, client_id)

@spec allow_message?(t(), binary()) :: :ok | {:error, :rate_limited}

Check and increment the per-client message counter.

Returns :ok if the message is allowed, or {:error, :rate_limited} if the per-client message rate limit has been exceeded.
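A "check and increment" of this kind can be sketched in a few lines. The following is an assumption about how such a check might look, not the MqttX source: the module name, the {:messages, client_id} key shape, and the choice to count rejected attempts as well are all hypothetical.

```elixir
defmodule MessageLimitSketch do
  # Hypothetical helper; the key layout is an assumption for illustration.
  def new_table, do: :ets.new(:message_limit_sketch, [:set, :public])

  # A nil limit means unlimited, matching the documented default.
  def allow_message?(_table, _client_id, nil), do: :ok

  def allow_message?(table, client_id, max_messages) do
    key = {:messages, client_id}

    # Increment first, then compare the new count against the limit.
    # The default tuple {key, 0} is inserted if the client has no counter yet.
    count = :ets.update_counter(table, key, {2, 1}, {key, 0})

    if count <= max_messages, do: :ok, else: {:error, :rate_limited}
  end
end
```

Since each client has its own key, one client exhausting its budget does not affect another client's counter.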

cleanup(map)

@spec cleanup(t()) :: :ok

Clean up the rate limiter, deleting the ETS table and stopping the timer.

new(opts \\ [])

@spec new(keyword()) :: t()

Create a new rate limiter.

Options

  • :max_connections - Maximum new connections per interval (default: nil, unlimited)
  • :max_messages - Maximum messages per client per interval (default: nil, unlimited)
  • :interval - Window size in milliseconds (default: 1000)

reset(map)

@spec reset(t()) :: :ok

Reset all counters. Called automatically on each interval tick.
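The interval tick can be pictured as a small GenServer that clears the table and reschedules itself. This is a sketch under stated assumptions: the module name, the :tick message, and the use of :ets.delete_all_objects are illustrative choices, and supervision is omitted for brevity.

```elixir
defmodule ResetTimerSketch do
  use GenServer

  # Hypothetical module; not the MqttX implementation.
  def start_link(table, interval_ms) do
    GenServer.start_link(__MODULE__, {table, interval_ms})
  end

  # Deleting every row drops all counters, so the next increment starts from zero.
  def reset(table) do
    :ets.delete_all_objects(table)
    :ok
  end

  @impl true
  def init({table, interval_ms}) do
    schedule(interval_ms)
    {:ok, {table, interval_ms}}
  end

  @impl true
  def handle_info(:tick, {table, interval_ms} = state) do
    reset(table)
    # Reschedule so the reset repeats once per interval.
    schedule(interval_ms)
    {:noreply, state}
  end

  defp schedule(interval_ms), do: Process.send_after(self(), :tick, interval_ms)
end
```

The table must be created as :public (or be owned by the timer process) for the reset to succeed from a separate process.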