reckon_db_subscriptions_store (reckon_db v1.4.4)

View Source

Subscriptions store for reckon-db

Manages subscription persistence and retrieval directly via Khepri. This is a facade module that provides direct access to the subscription storage without going through a gen_server, since Khepri/Ra handles concurrency internally.

Summary

Functions

Delete a subscription by key

Check if a subscription exists by key

Check if a subscription exists by type and subscription name

Find a subscription by name

Get a subscription by key

Generate a unique key for a subscription

Generate a unique key for a subscription from components

List all subscriptions in the store

Store a subscription

Update the checkpoint for a subscription

Types

store_id/0

-type store_id() :: atom().

subscription/0

-type subscription() ::
          #subscription{id :: binary(),
                        type :: subscription_type(),
                        selector :: binary() | map(),
                        subscription_name :: binary(),
                        subscriber_pid :: pid() | undefined,
                        created_at :: integer(),
                        pool_size :: pos_integer(),
                        checkpoint :: non_neg_integer() | undefined,
                        options :: map()}.

subscription_type/0

-type subscription_type() ::
          stream | event_type | event_pattern | event_payload | tags | by_stream | by_event_type |
          by_event_pattern | by_event_payload | by_tags.

Functions

delete(StoreId, Key)

-spec delete(store_id(), binary()) -> ok | {error, term()}.

Delete a subscription by key

exists(StoreId, Key)

-spec exists(store_id(), binary()) -> boolean().

Check if a subscription exists by key

exists(StoreId, Type, SubscriptionName)

-spec exists(store_id(), subscription_type(), binary()) -> boolean().

Check if a subscription exists by type and subscription name

find_by_name(StoreId, SubscriptionName)

-spec find_by_name(store_id(), binary()) -> {ok, binary(), subscription()} | {error, not_found}.

Find a subscription by name

Searches all subscriptions for one matching the given name. Returns the subscription key and record if found.

get(StoreId, Key)

-spec get(store_id(), binary()) -> subscription() | undefined.

Get a subscription by key

key(Subscription)

-spec key(subscription()) -> binary().

Generate a unique key for a subscription

The key is a phash2 hash of the tuple {type, selector, subscription_name}.

key(Type, Selector, SubscriptionName)

-spec key(subscription_type(), binary() | map(), binary()) -> binary().

Generate a unique key for a subscription from components

list(StoreId)

-spec list(store_id()) -> {ok, [subscription()]} | {error, term()}.

List all subscriptions in the store

put(StoreId, Subscription)

-spec put(store_id(), subscription()) -> ok | {error, term()}.

Store a subscription

update_checkpoint(StoreId, Key, Position)

-spec update_checkpoint(store_id(), binary(), non_neg_integer()) -> ok | {error, term()}.

Update the checkpoint for a subscription

Parameters:

- StoreId - The store identifier
- Key - The subscription key
- Position - The new checkpoint position