reckon_db_streams (reckon_db v1.6.0)
View SourceStreams API facade for reckon-db
Provides the public API for stream operations: - append: Write events to a stream with optimistic concurrency - read: Read events from a stream - get_version: Get current stream version - exists: Check if stream exists - list_streams: List all streams in the store
Summary
Functions
Append events to a stream with expected version check
Delete a stream and all its events
Check if a stream exists
Get current version of a stream
Check if a store contains at least one event. Cannot rely on stream existence alone — streams can survive after all their events are deleted (truncation, GDPR erasure). Checks for actual event data by reading 1 event globally.
List all streams in the store
Read events from a stream
Read all events from a stream
Read all events across all streams in global epoch_us order.
Read all events of specific types from all streams using Khepri native filtering.
Read all events matching tags from all streams.
Types
-type direction() :: forward | backward.
-type event() :: #event{event_id :: binary(), event_type :: binary(), stream_id :: binary(), version :: non_neg_integer(), data :: map() | binary(), metadata :: map(), tags :: [binary()] | undefined, timestamp :: integer(), epoch_us :: integer(), data_content_type :: binary(), metadata_content_type :: binary()}.
Functions
-spec append(atom(), binary(), integer(), [new_event()]) -> {ok, non_neg_integer()} | {error, term()}.
Append events to a stream with expected version check
Expected version semantics: -1 (NO_STREAM) - Stream must not exist (first write) -2 (ANY_VERSION) - No version check, always append N >= 0 - Stream version must equal N
Returns {ok, NewVersion} on success or {error, Reason} on failure.
Delete a stream and all its events
Check if a stream exists
Get current version of a stream
Returns: -1 - if stream doesn't exist or is empty N >= 0 - representing the version of the latest event
Check if a store contains at least one event. Cannot rely on stream existence alone — streams can survive after all their events are deleted (truncation, GDPR erasure). Checks for actual event data by reading 1 event globally.
List all streams in the store
-spec read(atom(), binary(), non_neg_integer(), pos_integer(), direction()) -> {ok, [event()]} | {error, term()}.
Read events from a stream
Parameters: StoreId - The store identifier StreamId - The stream identifier StartVersion - Starting version (0-based) Count - Maximum number of events to read Direction - forward or backward
Returns {ok, [Event]} or {error, Reason}
Read all events from a stream
-spec read_all_global(atom(), non_neg_integer(), pos_integer()) -> {ok, [event()]} | {error, term()}.
Read all events across all streams in global epoch_us order.
Returns events sorted by epoch_us, skipping Offset events and returning up to BatchSize events. Used by catch-up subscriptions to replay historical events to a subscriber.
Parameters: StoreId - The store identifier Offset - Number of events to skip (0-based) BatchSize - Maximum number of events to return
Returns events sorted by epoch_us (global ordering).
-spec read_by_event_types(atom(), [binary()], pos_integer()) -> {ok, [event()]} | {error, term()}.
Read all events of specific types from all streams using Khepri native filtering.
This function uses Khepri's built-in #if_data_matches condition to filter events by type at the database level, avoiding loading all events into memory.
Parameters: StoreId - The store identifier EventTypes - List of event type binaries to match BatchSize - Maximum number of events to return (for pagination)
Returns events sorted by epoch_us (global ordering).
-spec read_by_tags(atom(), [binary()], any | all, pos_integer()) -> {ok, [event()]} | {error, term()}.
Read all events matching tags from all streams.
Tags provide a mechanism for cross-stream querying without affecting stream-based concurrency control. This is useful for the process-centric model where you want to find all events related to specific participants.
Match Modes
any (default): Returns events containing ANY of the specified tags (union). Example: read_by_tags(Store, [<<"student:456">>, <<"student:789">>], any, 100) Returns events for either student.
all: Returns events containing ALL of the specified tags (intersection). Example: read_by_tags(Store, [<<"student:456">>, <<"course:CS101">>], all, 100) Returns only events tagged with both student 456 AND course CS101.
Parameters
StoreId - The store identifier Tags - List of tag binaries to match Match - any | all (matching strategy) BatchSize - Maximum number of events to return
Returns
Events sorted by epoch_us (global ordering).