reckon_db_subscriptions (reckon_db v1.4.4)
Subscriptions API facade for reckon-db
Provides the public API for subscription operations:
- subscribe: Create a new subscription
- unsubscribe: Remove a subscription
- get: Get a subscription by key
- list: List all subscriptions
- exists: Check if a subscription exists

Subscription types:
- stream: Subscribe to all events in a specific stream
- event_type: Subscribe to events of a specific type
- event_pattern: Subscribe to events matching a pattern
- event_payload: Subscribe to events with specific payload patterns
Summary
Functions
Acknowledge event delivery for a subscription
Check if a subscription exists
Get a subscription by key
List all subscriptions in the store
Setup subscription tracking for a process
Create a subscription with default options
Create a subscription with options
Remove a subscription by key
Remove a subscription by type, selector, and name
Types
-type event() ::
#event{event_id :: binary(),
       event_type :: binary(),
       stream_id :: binary(),
       version :: non_neg_integer(),
       data :: map() | binary(),
       metadata :: map(),
       tags :: [binary()] | undefined,
       timestamp :: integer(),
       epoch_us :: integer(),
       data_content_type :: binary(),
       metadata_content_type :: binary()}.
-type subscribe_opts() ::
#{subscription_name => binary(),
  pool_size => pos_integer(),
  start_from => non_neg_integer(),
  subscriber => pid()}.
-type subscription() ::
#subscription{id :: binary(),
              type :: subscription_type(),
              selector :: binary() | map(),
              subscription_name :: binary(),
              subscriber_pid :: pid() | undefined,
              created_at :: integer(),
              pool_size :: pos_integer(),
              checkpoint :: non_neg_integer() | undefined,
              options :: map()}.
-type subscription_type() ::
stream | event_type | event_pattern | event_payload | tags | by_stream | by_event_type |
by_event_pattern | by_event_payload | by_tags.
Functions
-spec ack(atom(), binary(), binary() | undefined, non_neg_integer()) -> ok | {error, term()}.
Acknowledge event delivery for a subscription
Updates the checkpoint for the subscription to track progress. This is typically called after an event has been processed successfully. The checkpoint lets a subscription resume from where it left off after a restart.
Parameters:
- StoreId - The store identifier
- SubscriptionName - Name of the subscription
- StreamId - ID of the stream the event came from (may be undefined for cross-stream subscriptions)
- EventNumber - Version/position of the acknowledged event
Returns ok on success, or {error, Reason} if the subscription is not found.
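As a minimal sketch of the "process, then acknowledge" pattern described above: the wrapper below runs a caller-supplied handler and only calls ack/4 when the handler succeeds. The module name ack_example, the function process_and_ack/5, and the Handler fun are hypothetical; how events actually reach the subscriber is not specified here, so the caller is assumed to already know the stream ID and event number.

```erlang
-module(ack_example).
-export([process_and_ack/5]).

%% Hypothetical helper: run Handler, then checkpoint the event with ack/4.
%% StoreId, SubscriptionName and Handler are supplied by the caller.
-spec process_and_ack(atom(), binary(), binary() | undefined,
                      non_neg_integer(),
                      fun(() -> ok | {error, term()})) ->
    ok | {error, term()}.
process_and_ack(StoreId, SubscriptionName, StreamId, EventNumber, Handler) ->
    case Handler() of
        ok ->
            %% Acknowledge only after processing succeeded, so a restart
            %% resumes after the last event that was fully handled.
            reckon_db_subscriptions:ack(StoreId, SubscriptionName,
                                        StreamId, EventNumber);
        {error, _} = Error ->
            %% Leave the checkpoint untouched when processing fails.
            Error
    end.
```

Acknowledging after the handler returns gives at-least-once processing: an event whose handler crashed before the ack may be seen again after a restart, so handlers should tolerate repeats.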
Check if a subscription exists
-spec get(atom(), binary()) -> {ok, subscription()} | {error, not_found}.
Get a subscription by key
-spec list(atom()) -> {ok, [subscription()]} | {error, term()}.
List all subscriptions in the store
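The two lookup calls can be combined as in the following sketch, which relies only on the return shapes given in the specs for list/1 and get/2. The module name lookup_example and its functions are hypothetical, and the subscription record is treated as opaque because its header file is not shown here.

```erlang
-module(lookup_example).
-export([count/1, find/2]).

%% Count the subscriptions currently registered in a store.
-spec count(atom()) -> {ok, non_neg_integer()} | {error, term()}.
count(StoreId) ->
    case reckon_db_subscriptions:list(StoreId) of
        {ok, Subscriptions} -> {ok, length(Subscriptions)};
        {error, _} = Error  -> Error
    end.

%% Look up one subscription by key, mapping the error tuple to a bare atom.
-spec find(atom(), binary()) -> {ok, term()} | not_found.
find(StoreId, Key) ->
    case reckon_db_subscriptions:get(StoreId, Key) of
        {ok, Subscription} -> {ok, Subscription};
        {error, not_found} -> not_found
    end.
```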
Setup subscription tracking for a process
Joins the tracker group for subscriptions, allowing the process to receive notifications about subscription lifecycle events (created, updated, deleted).
The process will receive messages in the format:
- {feature_created, subscriptions, Data}
- {feature_updated, subscriptions, Data}
- {feature_deleted, subscriptions, Data}
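A process that has joined the tracker group could consume these messages with a receive loop along the following lines. This is only a sketch: the module name tracker_example, the Handler fun, and the stop message are hypothetical, and the shape of Data is not specified here, so it is passed through untouched.

```erlang
-module(tracker_example).
-export([loop/1]).

%% Dispatch subscription lifecycle messages to Handler(Action, Data).
%% Handler and the 'stop' message are hypothetical, not part of reckon_db.
-spec loop(fun((created | updated | deleted, term()) -> any())) -> ok.
loop(Handler) ->
    receive
        {feature_created, subscriptions, Data} ->
            Handler(created, Data),
            loop(Handler);
        {feature_updated, subscriptions, Data} ->
            Handler(updated, Data),
            loop(Handler);
        {feature_deleted, subscriptions, Data} ->
            Handler(deleted, Data),
            loop(Handler);
        stop ->
            ok
    end.
```

The loop uses a selective receive, so any other messages stay in the mailbox; a long-lived process would normally add a catch-all clause or handle these tuples in a gen_server handle_info/2 callback instead.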
-spec subscribe(atom(), subscription_type(), binary() | map(), binary()) -> {ok, binary()} | {error, term()}.
Create a subscription with default options
Parameters:
- StoreId - The store identifier
- Type - Subscription type (stream, event_type, event_pattern, event_payload)
- Selector - The selector for matching events
- SubscriptionName - Human-readable name for the subscription
Returns {ok, SubscriptionKey} on success or {error, Reason} on failure.
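A call with default options might look like the sketch below. The store name my_store, the stream ID, and the subscription name are hypothetical, and using the stream ID as the selector for a stream subscription is an assumption based on the selector type binary() | map().

```erlang
-module(subscribe_example).
-export([subscribe_to_stream/0]).

%% Subscribe to a single stream using subscribe/4 with default options.
-spec subscribe_to_stream() -> {ok, binary()} | {error, term()}.
subscribe_to_stream() ->
    StoreId = my_store,              %% hypothetical store identifier
    Selector = <<"orders-123">>,     %% hypothetical stream ID
    Name = <<"order_projection">>,   %% hypothetical subscription name
    case reckon_db_subscriptions:subscribe(StoreId, stream, Selector, Name) of
        {ok, SubscriptionKey} = Ok ->
            io:format("subscribed, key: ~p~n", [SubscriptionKey]),
            Ok;
        {error, Reason} = Error ->
            io:format("subscribe failed: ~p~n", [Reason]),
            Error
    end.
```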
-spec subscribe(atom(), subscription_type(), binary() | map(), binary(), subscribe_opts()) -> {ok, binary()} | {error, term()}.
Create a subscription with options
Options:
- pool_size: Number of emitter workers (default: 1)
- start_from: Starting position for replay (default: 0)
- subscriber: PID to receive events directly (default: undefined)
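The options are passed as a map in the fifth argument. The following sketch sets all three documented options; the store name, the event type selector, and the subscription name are hypothetical, and the calling process registers itself as the direct subscriber.

```erlang
-module(subscribe_opts_example).
-export([subscribe_with_opts/0]).

%% Subscribe to one event type using subscribe/5 with explicit options.
-spec subscribe_with_opts() -> {ok, binary()} | {error, term()}.
subscribe_with_opts() ->
    Opts = #{pool_size => 2,        %% two emitter workers instead of one
             start_from => 0,       %% replay from the first position
             subscriber => self()}, %% deliver events to the calling process
    reckon_db_subscriptions:subscribe(my_store,             %% hypothetical store
                                      event_type,
                                      <<"order_placed">>,   %% hypothetical event type
                                      <<"order_notifier">>, %% hypothetical name
                                      Opts).
```

Because subscriber is a PID, the subscription is tied to that process; what happens when the process exits is not described here and would need to be checked against the library's behaviour.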
Remove a subscription by key
-spec unsubscribe(atom(), subscription_type(), binary()) -> ok | {error, term()}.
Remove a subscription by type, selector, and name