reckon_db_backpressure (reckon_db v1.6.0)

Backpressure management for reckon-db subscriptions

Provides queue-based backpressure handling to prevent unbounded memory growth when subscribers consume events more slowly than producers emit them.

Features:

- Configurable queue size limits
- Multiple overflow strategies (drop_oldest, drop_newest, block, error)
- Pull mode for explicit demand
- Warning thresholds with telemetry

Usage:

  {ok, Queue} = reckon_db_backpressure:new(#{
      max_queue => 1000,
      strategy => drop_oldest,
      warning_threshold => 800
  }),
  {ok, Queue2} = reckon_db_backpressure:enqueue(Queue, Event),
  {ok, Events, Queue3} = reckon_db_backpressure:dequeue(Queue2, 10).

Summary

Functions

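add_demand(Bp_queue, Count)

Add to demand (for pull mode).

dequeue(Queue, Count)

Dequeue up to N events.

dequeue_all(Queue)

Dequeue all events.

enqueue(Queue, Event)

Enqueue an event, applying backpressure strategy if full.

enqueue_many(Queue, Rest)

Enqueue multiple events.

get_demand(Bp_queue)

Get current demand.

info(Bp_queue)

Get queue statistics.

is_empty(Bp_queue)

Check if queue is empty.

is_full(Bp_queue)

Check if queue is at capacity.

new(Opts)

Create a new backpressure queue.

set_demand(Queue, Demand)

Set demand (for pull mode).

size(Bp_queue)

Get current queue size.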

Types

bp_opts/0

-type bp_opts() ::
          #{max_queue => pos_integer(),
            strategy => strategy(),
            mode => mode(),
            warning_threshold => pos_integer(),
            store_id => atom(),
            subscription_key => binary()}.

bp_queue/0

-type bp_queue() ::
          #bp_queue{queue :: queue:queue(event()),
                    max_size :: pos_integer(),
                    strategy :: strategy(),
                    mode :: mode(),
                    warning_threshold :: pos_integer(),
                    demand :: non_neg_integer(),
                    dropped :: non_neg_integer(),
                    store_id :: atom(),
                    subscription_key :: binary()}.

event/0

-type event() ::
          #event{event_id :: binary(),
                 event_type :: binary(),
                 stream_id :: binary(),
                 version :: non_neg_integer(),
                 data :: map() | binary(),
                 metadata :: map(),
                 tags :: [binary()] | undefined,
                 timestamp :: integer(),
                 epoch_us :: integer(),
                 data_content_type :: binary(),
                 metadata_content_type :: binary()}.

mode/0

-type mode() :: push | pull.
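
To illustrate what "explicit demand" could mean in pull mode, here is a minimal, self-contained sketch built on the stdlib queue module. It is not reckon_db code: the module pull_sketch and the function deliver/2 are hypothetical, and whether reckon_db itself decrements demand when events are dequeued is not stated on this page.

```erlang
-module(pull_sketch).
-export([deliver/2]).

%% Hypothetical model of pull mode (an assumption, not the library's code):
%% release at most Demand items from Q and return them together with the
%% remaining queue and the demand that is still unspent.
-spec deliver(queue:queue(), non_neg_integer()) ->
          {[term()], queue:queue(), non_neg_integer()}.
deliver(Q, Demand) ->
    %% Never take more than the queue currently holds.
    N = min(Demand, queue:len(Q)),
    {Taken, Rest} = queue:split(N, Q),
    {queue:to_list(Taken), Rest, Demand - N}.
```

In this model a consumer that asked for five events but found only one queued keeps a remaining demand of four, which later arrivals could satisfy.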

strategy/0

-type strategy() :: drop_oldest | drop_newest | block | error.
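
The overflow strategies can be modelled with a bounded stdlib queue, as a rough sketch. The module bp_sketch and the function push/4 are hypothetical and are not part of reckon_db. Pairing block with {error, blocked} and error with {error, queue_full} is an inference from the enqueue/2 spec, not something this page confirms.

```erlang
-module(bp_sketch).
-export([push/4]).

%% Hypothetical model (an assumption, not the library's code): add Item to Q,
%% which may hold at most Max items, resolving overflow with Strategy.
-spec push(term(), queue:queue(), pos_integer(),
           drop_oldest | drop_newest | block | error) ->
          {ok, queue:queue()} | {error, queue_full | blocked}.
push(Item, Q, Max, Strategy) ->
    case queue:len(Q) < Max of
        true ->
            %% Room left: plain enqueue at the tail.
            {ok, queue:in(Item, Q)};
        false ->
            overflow(Strategy, Item, Q)
    end.

overflow(drop_oldest, Item, Q) ->
    %% Evict the head to make room for the incoming item.
    {_, Q1} = queue:out(Q),
    {ok, queue:in(Item, Q1)};
overflow(drop_newest, _Item, Q) ->
    %% Discard the incoming item; the queue is unchanged.
    {ok, Q};
overflow(block, _Item, _Q) ->
    %% Assumed mapping: tell the caller to retry later.
    {error, blocked};
overflow(error, _Item, _Q) ->
    {error, queue_full}.
```

With a full queue of 1, 2, 3 and a limit of 3, pushing 4 leaves 2, 3, 4 under drop_oldest and 1, 2, 3 under drop_newest.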

Functions

add_demand(Bp_queue, Count)

-spec add_demand(bp_queue(), pos_integer()) -> bp_queue().

Add to demand (for pull mode).

dequeue(Queue, Count)

-spec dequeue(bp_queue(), pos_integer()) -> {ok, [event()], bp_queue()}.

Dequeue up to N events.

dequeue_all(Queue)

-spec dequeue_all(bp_queue()) -> {ok, [event()], bp_queue()}.

Dequeue all events.

enqueue(Queue, Event)

-spec enqueue(bp_queue(), event()) -> {ok, bp_queue()} | {error, queue_full | blocked}.

Enqueue an event, applying backpressure strategy if full.

enqueue_many(Queue, Rest)

-spec enqueue_many(bp_queue(), [event()]) -> {ok, bp_queue()} | {error, term()}.

Enqueue multiple events.

get_demand(Bp_queue)

-spec get_demand(bp_queue()) -> non_neg_integer() | infinity.

Get current demand.

info(Bp_queue)

-spec info(bp_queue()) -> map().

Get queue statistics.

is_empty(Bp_queue)

-spec is_empty(bp_queue()) -> boolean().

Check if queue is empty.

is_full(Bp_queue)

-spec is_full(bp_queue()) -> boolean().

Check if queue is at capacity.

new(Opts)

-spec new(bp_opts()) -> {ok, bp_queue()}.

Create a new backpressure queue.

set_demand(Queue, Demand)

-spec set_demand(bp_queue(), non_neg_integer()) -> bp_queue().

Set demand (for pull mode).

size(Bp_queue)

-spec size(bp_queue()) -> non_neg_integer().

Get current queue size.