Adaptive concurrency limiter that dynamically adjusts its limit based on observed latency.
Unlike ExResilience.Bulkhead, which uses a fixed concurrency limit, this
module adjusts the limit up or down in response to measured call duration.
Callers beyond the current limit are rejected immediately (no wait queue),
keeping latency measurements clean.
Two algorithms are supported:
AIMD (Additive Increase, Multiplicative Decrease)
A simple sawtooth pattern. On each completed call:
- If the call succeeded and latency is below the threshold, the limit
increases by :increase_by (additive).
- If the call failed or latency exceeds the threshold, the limit is
multiplied by :decrease_factor (multiplicative decrease).
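The AIMD rule can be sketched as a pure function. This is a minimal illustration, not the library's implementation: the module AIMDSketch and its next_limit function are hypothetical names, the defaults are copied from the options documented on this page, and both the rounding of the decreased limit and the treatment of a latency exactly equal to the threshold are assumptions.

```elixir
defmodule AIMDSketch do
  # Hypothetical helper, not part of ExResilience: computes one AIMD step.

  @defaults %{
    latency_threshold: 100,
    increase_by: 1,
    decrease_factor: 0.5,
    min_limit: 1,
    max_limit: 200
  }

  # outcome is :ok or :error; latency_ms is the measured call duration.
  def next_limit(limit, outcome, latency_ms, opts \\ %{}) do
    o = Map.merge(@defaults, opts)

    new_limit =
      if outcome == :ok and latency_ms < o.latency_threshold do
        # Fast success: additive increase.
        limit + o.increase_by
      else
        # Failure or slow call: multiplicative decrease.
        # Assumption: round down; a latency equal to the threshold counts as slow.
        floor(limit * o.decrease_factor)
      end

    # Keep the result between the configured floor and ceiling.
    new_limit |> max(o.min_limit) |> min(o.max_limit)
  end
end
```

With the defaults, a limit of 10 becomes 11 after a fast success and 5 after a slow or failed call, which produces the sawtooth shape.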
Vegas
Inspired by TCP Vegas. Tracks a sliding window of RTT samples and estimates queue depth:
queue = limit * (1 - min_rtt / sample_rtt)
- If queue < alpha, the limit increases by 1 (under-utilized).
- If queue > beta, the limit decreases by 1 (overloaded).
- Otherwise the limit holds steady.
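The Vegas rule can be sketched the same way. The module VegasSketch is a hypothetical name, and taking min_rtt as the minimum over the sample window is an assumption; the page does not say how the library derives it. Samples are assumed to be positive numbers in milliseconds, newest first.

```elixir
defmodule VegasSketch do
  # Hypothetical helper, not part of ExResilience: computes one Vegas step.

  # rtts is a non-empty window of RTT samples, newest first.
  def next_limit(limit, [sample_rtt | _] = rtts, opts \\ %{}) do
    o = Map.merge(%{alpha: 3, beta: 6, min_limit: 1, max_limit: 200}, opts)

    # Assumption: min_rtt is the smallest sample in the window.
    min_rtt = Enum.min(rtts)

    # Estimated queue depth, as given by the formula on this page.
    queue = limit * (1 - min_rtt / sample_rtt)

    new_limit =
      cond do
        queue < o.alpha -> limit + 1
        queue > o.beta -> limit - 1
        true -> limit
      end

    # Keep the result between the configured floor and ceiling.
    new_limit |> max(o.min_limit) |> min(o.max_limit)
  end
end
```

For a limit of 10 and a best RTT of 50 ms, a 200 ms sample gives queue = 7.5 (above beta, so the limit drops to 9), while a 100 ms sample gives queue = 5.0 (between alpha and beta, so the limit holds).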
Options
- :name -- required. Registered name for this instance.
- :algorithm -- :aimd or :vegas. Default :aimd.
- :initial_limit -- starting concurrency limit. Default 10.
- :min_limit -- floor for the limit. Default 1.
- :max_limit -- ceiling for the limit. Default 200.
AIMD-specific options
- :latency_threshold -- milliseconds above which a response counts as slow. Default 100.
- :increase_by -- additive increase amount on fast success. Default 1.
- :decrease_factor -- multiplicative factor applied on slow/failed calls. Default 0.5.
Vegas-specific options
- :alpha -- queue size threshold below which the limit increases. Default 3.
- :beta -- queue size threshold above which the limit decreases. Default 6.
- :window_size -- number of RTT samples kept in the sliding window. Default 100.
Examples
iex> {:ok, _} = ExResilience.AdaptiveConcurrency.start_link(name: :ac_doc, algorithm: :aimd, initial_limit: 5)
iex> ExResilience.AdaptiveConcurrency.call(:ac_doc, fn -> :hello end)
{:ok, :hello}
Types
@type algorithm() :: :aimd | :vegas
@type option() ::
  {:name, atom()}
  | {:algorithm, algorithm()}
  | {:initial_limit, pos_integer()}
  | {:min_limit, pos_integer()}
  | {:max_limit, pos_integer()}
  | {:latency_threshold, pos_integer()}
  | {:increase_by, pos_integer()}
  | {:decrease_factor, float()}
  | {:alpha, pos_integer()}
  | {:beta, pos_integer()}
  | {:window_size, pos_integer()}
Functions
Executes fun within the adaptive concurrency limit.
Returns {:ok, result} on success, {:error, :concurrency_limited} when the
current limit is reached, or {:error, reason} if the function returns an
error tuple.
There is no wait queue. Callers beyond the limit are rejected immediately.
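Because rejection is immediate, callers usually need to decide what to do with each return shape. The following is one possible caller-side sketch; CallResultSketch and classify are hypothetical names, not part of ExResilience, and the decisions (:done, :shed, :failed) are illustrative.

```elixir
defmodule CallResultSketch do
  # Hypothetical caller-side helper, not part of ExResilience.
  # Maps each documented return shape of the call to a caller decision.

  # The wrapped function ran and returned a non-error value.
  def classify({:ok, result}), do: {:done, result}

  # The limiter rejected the call; the wrapped function did not run.
  def classify({:error, :concurrency_limited}), do: :shed

  # The wrapped function ran and returned an error tuple.
  def classify({:error, reason}), do: {:failed, reason}
end
```

A result could be piped straight in, for example `ExResilience.AdaptiveConcurrency.call(:ac_call, fun) |> CallResultSketch.classify()`. Going by the return shapes described above, a wrapped function that itself returns {:error, :concurrency_limited} would look the same as a rejection, so it may be worth avoiding that reason in your own error tuples.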
Examples
iex> {:ok, _} = ExResilience.AdaptiveConcurrency.start_link(name: :ac_call, initial_limit: 5)
iex> ExResilience.AdaptiveConcurrency.call(:ac_call, fn -> 42 end)
{:ok, 42}
iex> {:ok, _} = ExResilience.AdaptiveConcurrency.start_link(name: :ac_call_err, initial_limit: 5)
iex> ExResilience.AdaptiveConcurrency.call(:ac_call_err, fn -> {:error, :boom} end)
{:error, :boom}
Returns a specification to start this module under a supervisor.
See Supervisor.
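A limiter would typically be started under a supervision tree as a {module, options} child. The sketch below assumes that the child specification accepts the same options as start_link/1, which this page does not state explicitly; the supervisor module, the instance name :orders_limiter, and the option values are all hypothetical.

```elixir
defmodule LimiterSupervisorSketch do
  # Hypothetical supervisor; names and option values are illustrative only.
  use Supervisor

  def start_link(arg \\ []) do
    Supervisor.start_link(__MODULE__, arg, name: __MODULE__)
  end

  # Child list as plain data. Each {module, options} tuple is resolved
  # through the module's child_spec/1 when the supervisor starts.
  def children do
    [
      {ExResilience.AdaptiveConcurrency,
       name: :orders_limiter, algorithm: :vegas, initial_limit: 10}
    ]
  end

  @impl true
  def init(_arg) do
    Supervisor.init(children(), strategy: :one_for_one)
  end
end
```

Starting this supervisor requires the library to be available as a dependency. Running several limiters under one supervisor would likely need a distinct child id per instance, for example via Supervisor.child_spec/2.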
@spec get_limit(atom()) :: pos_integer()
Returns the current concurrency limit.
Examples
iex> {:ok, _} = ExResilience.AdaptiveConcurrency.start_link(name: :ac_gl, initial_limit: 15)
iex> ExResilience.AdaptiveConcurrency.get_limit(:ac_gl)
15
@spec get_stats(atom()) :: %{
  limit: pos_integer(),
  min_rtt: number() | nil,
  active: non_neg_integer()
}
Returns stats about the current state of the limiter.
Returns a map with keys :limit, :min_rtt, and :active.
Examples
iex> {:ok, _} = ExResilience.AdaptiveConcurrency.start_link(name: :ac_gs, initial_limit: 8)
iex> stats = ExResilience.AdaptiveConcurrency.get_stats(:ac_gs)
iex> stats.limit
8
iex> stats.active
0
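The stats map lends itself to simple derived metrics. As a small illustration, the hypothetical helper below (not part of ExResilience) computes the fraction of the current limit that is in use, assuming :active counts the calls currently in flight.

```elixir
defmodule StatsSketch do
  # Hypothetical helper, not part of ExResilience.
  # Returns the share of the current limit occupied by active calls,
  # as a float (active divided by limit).
  def utilization(%{limit: limit, active: active}) when limit > 0 do
    active / limit
  end
end
```

It could be fed directly from the limiter, for example `ExResilience.AdaptiveConcurrency.get_stats(:ac_gs) |> StatsSketch.utilization()`.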
@spec start_link([option()]) :: GenServer.on_start()
Starts an adaptive concurrency limiter process.
See module docs for available options.
Examples
iex> {:ok, pid} = ExResilience.AdaptiveConcurrency.start_link(name: :ac_start, algorithm: :vegas, initial_limit: 10)
iex> is_pid(pid)
true