reckon_db_subscriptions (reckon_db v1.4.4)

View Source

Subscriptions API facade for reckon-db

Provides the public API for subscription operations: - subscribe: Create a new subscription - unsubscribe: Remove a subscription - get: Get a subscription by key - list: List all subscriptions - exists: Check if a subscription exists

Subscription types: - stream: Subscribe to all events in a specific stream - event_type: Subscribe to events of a specific type - event_pattern: Subscribe to events matching a pattern - event_payload: Subscribe to events with specific payload patterns

Summary

Functions

Acknowledge event delivery for a subscription

Check if a subscription exists

Get a subscription by key

List all subscriptions in the store

Setup subscription tracking for a process

Create a subscription with default options

Remove a subscription by key

Remove a subscription by type, selector, and name

Types

event/0

-type event() ::
          #event{event_id :: binary(),
                 event_type :: binary(),
                 stream_id :: binary(),
                 version :: non_neg_integer(),
                 data :: map() | binary(),
                 metadata :: map(),
                 tags :: [binary()] | undefined,
                 timestamp :: integer(),
                 epoch_us :: integer(),
                 data_content_type :: binary(),
                 metadata_content_type :: binary()}.

subscribe_opts/0

-type subscribe_opts() ::
          #{subscription_name => binary(),
            pool_size => pos_integer(),
            start_from => non_neg_integer(),
            subscriber => pid()}.

subscription/0

-type subscription() ::
          #subscription{id :: binary(),
                        type :: subscription_type(),
                        selector :: binary() | map(),
                        subscription_name :: binary(),
                        subscriber_pid :: pid() | undefined,
                        created_at :: integer(),
                        pool_size :: pos_integer(),
                        checkpoint :: non_neg_integer() | undefined,
                        options :: map()}.

subscription_type/0

-type subscription_type() ::
          stream | event_type | event_pattern | event_payload | tags | by_stream | by_event_type |
          by_event_pattern | by_event_payload | by_tags.

Functions

ack(StoreId, SubscriptionName, StreamId, EventNumber)

-spec ack(atom(), binary(), binary() | undefined, non_neg_integer()) -> ok | {error, term()}.

Acknowledge event delivery for a subscription

Updates the checkpoint for the subscription to track progress. This is typically called after successfully processing an event. The checkpoint allows subscriptions to resume from where they left off after a restart.

Parameters: StoreId - The store identifier SubscriptionName - Name of the subscription StreamId - ID of the stream the event came from (may be undefined for cross-stream) EventNumber - Version/position of the acknowledged event

Returns ok on success, or {error, Reason} if the subscription is not found.

exists(StoreId, Key)

-spec exists(atom(), binary()) -> boolean().

Check if a subscription exists

get(StoreId, Key)

-spec get(atom(), binary()) -> {ok, subscription()} | {error, not_found}.

Get a subscription by key

list(StoreId)

-spec list(atom()) -> {ok, [subscription()]} | {error, term()}.

List all subscriptions in the store

setup_tracking(StoreId, Pid)

-spec setup_tracking(atom(), pid()) -> ok.

Setup subscription tracking for a process

Joins the tracker group for subscriptions, allowing the process to receive notifications about subscription lifecycle events (created, updated, deleted).

The process will receive messages in the format: - {feature_created, subscriptions, Data} - {feature_updated, subscriptions, Data} - {feature_deleted, subscriptions, Data}

subscribe(StoreId, Type, Selector, SubscriptionName)

-spec subscribe(atom(), subscription_type(), binary() | map(), binary()) ->
                   {ok, binary()} | {error, term()}.

Create a subscription with default options

Parameters: StoreId - The store identifier Type - Subscription type (stream, event_type, event_pattern, event_payload) Selector - The selector for matching events SubscriptionName - Human-readable name for the subscription

Returns {ok, SubscriptionKey} on success or {error, Reason} on failure.

subscribe(StoreId, Type, Selector, SubscriptionName, Opts)

-spec subscribe(atom(), subscription_type(), binary() | map(), binary(), subscribe_opts()) ->
                   {ok, binary()} | {error, term()}.

Create a subscription with options

Options: - pool_size: Number of emitter workers (default: 1) - start_from: Starting position for replay (default: 0) - subscriber: PID to receive events directly (default: undefined)

unsubscribe(StoreId, Key)

-spec unsubscribe(atom(), binary()) -> ok | {error, term()}.

Remove a subscription by key

unsubscribe(StoreId, Type, SubscriptionName)

-spec unsubscribe(atom(), subscription_type(), binary()) -> ok | {error, term()}.

Remove a subscription by type, selector, and name