reckon_db_backpressure (reckon_db v1.2.7)
Backpressure management for reckon-db subscriptions
Provides queue-based backpressure handling to prevent memory explosion when subscribers are slower than event producers.
Features:
- Configurable queue size limits
- Multiple overflow strategies (drop_oldest, drop_newest, block, error)
- Pull mode for explicit demand
- Warning thresholds with telemetry
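To make the four overflow strategies concrete, here is a minimal stand-alone sketch built on OTP's `queue` module. It is not the reckon_db implementation: the module name `bp_strategy_sketch`, the return shapes `{blocked, Q}` and `{error, queue_full}`, and the exact behaviour of each strategy are assumptions chosen for illustration.

```erlang
-module(bp_strategy_sketch).
-export([enqueue/4, demo/0]).

%% Hypothetical model of a bounded queue, NOT the reckon_db code.
%% Q is an OTP queue, Max the size limit, Strategy one of the four atoms.
enqueue(Q, Max, Strategy, Item) ->
    case queue:len(Q) < Max of
        true ->
            %% Room left: append at the tail.
            {ok, queue:in(Item, Q)};
        false ->
            overflow(Strategy, Q, Item)
    end.

overflow(drop_oldest, Q, Item) ->
    %% Discard the head to make room, then append the new item.
    {ok, queue:in(Item, queue:drop(Q))};
overflow(drop_newest, Q, _Item) ->
    %% Leave the queue unchanged; the incoming item is discarded.
    {ok, Q};
overflow(block, Q, _Item) ->
    %% Assumed shape: tell the caller to wait and retry; nothing is enqueued.
    {blocked, Q};
overflow(error, _Q, _Item) ->
    %% Assumed shape: report the overflow to the caller.
    {error, queue_full}.

%% Compare drop_oldest and drop_newest on a queue that is already full.
demo() ->
    Full = queue:from_list([1, 2, 3]),
    {ok, A} = enqueue(Full, 3, drop_oldest, 4),
    {ok, B} = enqueue(Full, 3, drop_newest, 4),
    {queue:to_list(A), queue:to_list(B)}.
```

In this model `bp_strategy_sketch:demo()` returns `{[2,3,4],[1,2,3]}`: drop_oldest trades the oldest event for the newest, while drop_newest keeps the existing backlog intact.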
Usage:
    {ok, Queue} = reckon_db_backpressure:new(#{
        max_queue => 1000,
        strategy => drop_oldest,
        warning_threshold => 800
    }),
    {ok, Queue2} = reckon_db_backpressure:enqueue(Queue, Event),
    {ok, Events, Queue3} = reckon_db_backpressure:dequeue(Queue2, 10).
Summary
Functions
Add to demand (for pull mode).
Dequeue up to N events.
Dequeue all events.
Enqueue an event, applying backpressure strategy if full.
Enqueue multiple events.
Get current demand.
Get queue statistics.
Check if queue is empty.
Check if queue is at capacity.
Create a new backpressure queue.
Set demand (for pull mode).
Get current queue size.
Types
-type bp_opts() :: #{
    max_queue => pos_integer(),
    strategy => strategy(),
    mode => mode(),
    warning_threshold => pos_integer(),
    store_id => atom(),
    subscription_key => binary()
}.
-type bp_queue() :: #bp_queue{
    queue :: queue:queue(event()),
    max_size :: pos_integer(),
    strategy :: strategy(),
    mode :: mode(),
    warning_threshold :: pos_integer(),
    demand :: non_neg_integer(),
    dropped :: non_neg_integer(),
    store_id :: atom(),
    subscription_key :: binary()
}.
-type event() :: #event{
    event_id :: binary(),
    event_type :: binary(),
    stream_id :: binary(),
    version :: non_neg_integer(),
    data :: map() | binary(),
    metadata :: map(),
    tags :: [binary()] | undefined,
    timestamp :: integer(),
    epoch_us :: integer(),
    data_content_type :: binary(),
    metadata_content_type :: binary()
}.
-type mode() :: push | pull.
-type strategy() :: drop_oldest | drop_newest | block | error.
Functions
-spec add_demand(bp_queue(), pos_integer()) -> bp_queue().
Add to demand (for pull mode).
-spec dequeue(bp_queue(), pos_integer()) -> {ok, [event()], bp_queue()}.
Dequeue up to N events.
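Because dequeue/2 returns the updated queue along with the events, a consumer can drain a backlog in fixed-size batches by threading the queue through a loop. The sketch below relies only on the `{ok, Events, Queue}` return shape from the spec above. The module `bp_drain_sketch`, the `Handler` callback, and the assumption that an empty batch means nothing more can be released are all hypothetical.

```erlang
-module(bp_drain_sketch).
-export([drain/3, drain/4]).

%% Drain Queue in batches of BatchSize using the documented dequeue/2.
drain(Queue, BatchSize, Handler) ->
    drain(fun reckon_db_backpressure:dequeue/2, Queue, BatchSize, Handler).

%% Dequeue is passed in as a fun so the loop can be exercised with a stub
%% when reckon_db is not available.
drain(Dequeue, Queue, BatchSize, Handler) ->
    case Dequeue(Queue, BatchSize) of
        {ok, [], Queue1} ->
            %% Assumption: an empty batch means nothing (more) is releasable.
            Queue1;
        {ok, Events, Queue1} ->
            %% Process the batch, then continue with the updated queue.
            _ = Handler(Events),
            drain(Dequeue, Queue1, BatchSize, Handler)
    end.
```

Stopping on an empty batch, rather than on a size check, keeps the loop from spinning if the queue holds events that cannot currently be released.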
Dequeue all events.
Enqueue an event, applying backpressure strategy if full.
Enqueue multiple events.
-spec get_demand(bp_queue()) -> non_neg_integer() | infinity.
Get current demand.
Get queue statistics.
Check if queue is empty.
Check if queue is at capacity.
Create a new backpressure queue.
-spec set_demand(bp_queue(), non_neg_integer()) -> bp_queue().
Set demand (for pull mode).
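The idea behind pull mode can be illustrated with a small stand-alone model in which events leave the queue only against outstanding demand. This is an assumption about how demand interacts with dequeueing, not a description of the reckon_db internals. The module `bp_pull_sketch` and its `take/2` function are hypothetical names.

```erlang
-module(bp_pull_sketch).
-export([new/1, add_demand/2, take/2, demo/0]).

%% Hypothetical model state: {Queue, OutstandingDemand}.
new(Items) ->
    {queue:from_list(Items), 0}.

%% Increase the outstanding demand by N.
add_demand({Q, Demand}, N) when is_integer(N), N > 0 ->
    {Q, Demand + N}.

%% Release up to N items, but never more than the outstanding demand
%% or the number of queued items; demand shrinks by the amount released.
take({Q, Demand}, N) when is_integer(N), N > 0 ->
    Count = lists:min([N, Demand, queue:len(Q)]),
    {Out, Rest} = queue:split(Count, Q),
    {ok, queue:to_list(Out), {Rest, Demand - Count}}.

demo() ->
    S0 = new([a, b, c, d]),
    {ok, [], S1} = take(S0, 10),            % no demand yet, nothing released
    S2 = add_demand(S1, 3),
    {ok, Batch, {_, Left}} = take(S2, 10),  % capped by the demand of 3
    {Batch, Left}.
```

In this model `bp_pull_sketch:demo()` returns `{[a,b,c],0}`: the request for 10 items is capped by the demand of 3, and the demand is used up by the release.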
-spec size(bp_queue()) -> non_neg_integer().
Get current queue size.