ZenWebsocket.RateLimiter (ZenWebsocket v0.4.2)

Token bucket rate limiter for WebSocket API calls.

Prevents rate limit violations using configurable cost functions. A single token bucket algorithm supports credit-based (Deribit), weight-based (Binance), and simple rate limit (Coinbase) patterns.
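To make the idea concrete, here is a minimal, purely functional sketch of a token bucket with a pluggable cost function. It is an illustration only: the module name TokenBucketSketch, its function names, and the :weight field are hypothetical, and it is not ZenWebsocket's implementation (which, per this page, keeps state in ETS and queues requests).

```elixir
defmodule TokenBucketSketch do
  @moduledoc false
  # Hypothetical, simplified model of a token bucket. Not the library's code.

  # Start with a full bucket; `capacity` caps how many tokens can accumulate.
  def new(capacity, cost_fun)
      when is_integer(capacity) and capacity > 0 and is_function(cost_fun, 1) do
    %{tokens: capacity, capacity: capacity, cost_fun: cost_fun}
  end

  # Deduct the request's cost if the bucket can cover it.
  def consume(bucket, request) do
    cost = bucket.cost_fun.(request)

    if cost <= bucket.tokens do
      {:ok, %{bucket | tokens: bucket.tokens - cost}}
    else
      {:error, :rate_limited}
    end
  end

  # Add `amount` tokens without ever exceeding the capacity.
  def refill(bucket, amount) do
    %{bucket | tokens: min(bucket.capacity, bucket.tokens + amount)}
  end
end

# A weight-style cost function; the :weight key is an assumption for this demo.
weight_cost = fn request -> Map.get(request, :weight, 1) end

bucket = TokenBucketSketch.new(10, weight_cost)
{:ok, bucket} = TokenBucketSketch.consume(bucket, %{weight: 4})
IO.inspect(bucket.tokens, label: "tokens left")
```

Swapping the cost function (a constant 1, a per-request weight, or a per-method credit value) changes the pricing model while the bucket logic stays the same, which is the point of the single-algorithm design described above.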

Timer Ownership

The rate limiter schedules periodic refill timers using Process.send_after/3. The timer messages are delivered to the process that calls init/1, so that process must handle {:refill, name} messages by calling refill/1:

def handle_info({:refill, name}, state) do
  ZenWebsocket.RateLimiter.refill(name)
  {:noreply, state}
end

Memory Characteristics

Each rate limiter creates one named ETS table containing:

  • Configuration map (~200 bytes)
  • State with queue (grows with queued requests)
  • Token counter (8 bytes)
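The sketch below shows how a named ETS table holding these three kinds of entries could be created, measured, and deleted using the standard :ets API. The table name, keys, and value shapes are assumptions made for illustration; the library's actual table layout is not documented on this page.

```elixir
# Hypothetical layout: one named table with config, state, and a token counter.
table = :ets.new(:demo_limiter_sketch, [:named_table, :public, :set])

:ets.insert(table, {:config, %{tokens: 100, refill_rate: 10, refill_interval: 1_000}})
:ets.insert(table, {:state, %{queue: :queue.new(), pressure_level: :none}})
:ets.insert(table, {:tokens, 100})

# :ets.info/2 reports memory in machine words, so convert to bytes.
bytes = :ets.info(table, :memory) * :erlang.system_info(:wordsize)
IO.puts("approximate table size: #{bytes} bytes")

# Deleting the table frees its memory; afterwards the name no longer resolves.
true = :ets.delete(table)
:undefined = :ets.whereis(:demo_limiter_sketch)
```

The reported size includes ETS's own per-table overhead, so it will be larger than the sum of the entry sizes listed above.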

Cleanup: Call shutdown/1 when done to delete the ETS table. Tables are NOT automatically cleaned up on process termination.
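One way to satisfy both the timer-ownership and cleanup requirements is to let a single GenServer own the limiter. The following is a sketch, not a pattern prescribed by the library: the module name LimiterOwnerSketch and the :limiter option (which allows a stand-in module in tests) are hypothetical, and only init/2, refill/1, and shutdown/1 from this page are assumed.

```elixir
defmodule LimiterOwnerSketch do
  use GenServer

  # opts: :name (atom), :config (map), and optionally :limiter, a module
  # exposing init/2, refill/1 and shutdown/1 (defaults to the real library).
  def start_link(opts), do: GenServer.start_link(__MODULE__, opts)

  @impl true
  def init(opts) do
    limiter = Keyword.get(opts, :limiter, ZenWebsocket.RateLimiter)
    name = Keyword.fetch!(opts, :name)

    # Trap exits so terminate/2 also runs when a supervisor shuts us down.
    Process.flag(:trap_exit, true)

    # Calling init/2 here makes this process the target of refill messages.
    case limiter.init(name, Keyword.fetch!(opts, :config)) do
      {:ok, _} -> {:ok, %{limiter: limiter, name: name}}
      {:error, reason} -> {:stop, reason}
    end
  end

  @impl true
  def handle_info({:refill, name}, %{limiter: limiter} = state) do
    limiter.refill(name)
    {:noreply, state}
  end

  # Ignore unrelated messages, such as exit signals from linked processes.
  def handle_info(_other, state), do: {:noreply, state}

  @impl true
  def terminate(_reason, %{limiter: limiter, name: name}) do
    # Delete the ETS table explicitly, as the cleanup note above requires.
    limiter.shutdown(name)
    :ok
  end
end
```

Note that terminate/2 is not guaranteed to run if the process is killed brutally, so long-running systems may still want an additional cleanup path.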

API Functions

| Function | Arity | Description | Param Kinds |
| --- | --- | --- | --- |
| simple_cost | 1 | Fixed cost function returning 1 for every request. | request: value |
| binance_cost | 1 | Calculate token cost for a Binance API request using weight-based pricing. | request: value |
| deribit_cost | 1 | Calculate token cost for a Deribit API request using credit-based pricing. | request: value |
| shutdown | 1 | Clean up rate limiter resources by deleting the ETS table. | name: value |
| status | 1 | Get current rate limiter status with backpressure guidance. | name: value |
| refill | 1 | Refill tokens at the configured rate. | name: value |
| consume | 2 | Attempt to consume tokens for a request. | name: value, request: value |
| init | 2 | Initialize a token bucket rate limiter with ETS storage. | name: value, config: value |

Summary

Functions

binance_cost(map)

Binance weight-based cost function.

consume(name, request)

Attempts to consume tokens for a request.

deribit_cost(map)

Deribit credit-based cost function.

init(name, config)

Initializes rate limiter with configuration.

refill(name)

Refills tokens at configured rate.

shutdown(name)

Cleans up rate limiter resources.

simple_cost(request)

Simple cost function for fixed-rate exchanges.

status(name)

Returns current token count, queue size, pressure level, and suggested delay.

Types

config()

@type config() :: %{
  optional(:max_queue_size) => pos_integer(),
  tokens: pos_integer(),
  refill_rate: pos_integer(),
  refill_interval: pos_integer(),
  request_cost: (term() -> pos_integer())
}
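As an illustration, a map matching this type might look like the following. All values are arbitrary examples rather than recommended settings, and the inline cost function stands in for one of the module's cost functions. The comment on refill_interval assumes milliseconds, which is consistent with Process.send_after/3 and the suggested_delay_ms field but is not stated explicitly by the type.

```elixir
config = %{
  # Bucket capacity: how many tokens are available when full.
  tokens: 100,
  # Tokens added on each refill.
  refill_rate: 10,
  # Time between refills (assumed to be milliseconds).
  refill_interval: 1_000,
  # Cost of a request in tokens; here every request costs 1.
  request_cost: fn _request -> 1 end,
  # Optional: upper bound on queued requests.
  max_queue_size: 50
}
```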

pressure_level()

@type pressure_level() :: :none | :low | :medium | :high

state()

@type state() :: %{
  tokens: non_neg_integer(),
  last_refill: integer(),
  queue: :queue.queue(),
  pressure_level: pressure_level()
}

Functions

binance_cost(map)

@spec binance_cost(map()) :: pos_integer()

Binance weight-based cost function.

consume(name, request)

@spec consume(atom(), term()) :: :ok | {:error, :rate_limited | :queue_full}

Attempts to consume tokens for a request.

Returns :ok when enough tokens are available; otherwise the request is queued.
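Callers need to handle all three outcomes in the spec. The sketch below shows one possible way to branch on them. The module ConsumeResultSketch, the send_fun callback, and the :deferred and :dropped return values are hypothetical, and how an application should react to :rate_limited versus :queue_full is an assumption here, since this page does not spell out the exact conditions for each.

```elixir
defmodule ConsumeResultSketch do
  # `result` is the value returned by consume/2.
  # `send_fun` is a hypothetical zero-arity function that performs the send.
  def dispatch(result, send_fun) when is_function(send_fun, 0) do
    case result do
      # Tokens were available: perform the request now.
      :ok -> {:sent, send_fun.()}
      # Not enough tokens right now: skip sending and let the caller retry later.
      {:error, :rate_limited} -> :deferred
      # The queue is at capacity: shed load instead of retrying immediately.
      {:error, :queue_full} -> :dropped
    end
  end
end
```

In real use the first argument would be the result of calling consume(name, request) on the limiter.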

deribit_cost(map)

@spec deribit_cost(map()) :: pos_integer()

Deribit credit-based cost function.

init(name, config)

@spec init(atom(), config()) :: {:ok, atom()} | {:error, term()}

Initializes rate limiter with configuration.

Creates an ETS table for state storage and schedules the refill timer. Refill messages are delivered to the calling process.

refill(name)

@spec refill(atom()) :: :ok

Refills tokens at configured rate.

Call this from the process that receives the {:refill, name} timer message at each refill interval.

shutdown(name)

@spec shutdown(atom()) :: :ok

Cleans up rate limiter resources.

Deletes the ETS table. Should be called when the rate limiter is no longer needed.

simple_cost(request)

@spec simple_cost(term()) :: pos_integer()

Simple cost function for fixed-rate exchanges.

status(name)

@spec status(atom()) ::
  {:ok,
   %{
     tokens: non_neg_integer(),
     queue_size: non_neg_integer(),
     pressure_level: pressure_level(),
     suggested_delay_ms: non_neg_integer()
   }}

Returns current token count, queue size, pressure level, and suggested delay.

The suggested_delay_ms provides backpressure guidance:

  • :high pressure (75%+) → refill_interval * 4
  • :medium pressure (50%+) → refill_interval * 2
  • :low pressure (25%+) → refill_interval
  • :none → 0
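The mapping above can be written out as a small pure function. This is a sketch of the documented rule only: the module name SuggestedDelaySketch is hypothetical, and it takes the pressure level as given rather than deriving it from the percentages.

```elixir
defmodule SuggestedDelaySketch do
  # Mirrors the documented multipliers of refill_interval per pressure level.
  def suggested_delay_ms(:high, refill_interval), do: refill_interval * 4
  def suggested_delay_ms(:medium, refill_interval), do: refill_interval * 2
  def suggested_delay_ms(:low, refill_interval), do: refill_interval
  def suggested_delay_ms(:none, _refill_interval), do: 0
end

# With a refill_interval of 1_000, :medium pressure suggests a 2_000 ms delay.
IO.inspect(SuggestedDelaySketch.suggested_delay_ms(:medium, 1_000))
```

A caller could sleep or reschedule for the suggested_delay_ms value returned by status/1 before submitting more requests.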