reckon_db_streams (reckon_db v1.6.0)


Streams API facade for reckon-db

Provides the public API for stream operations:

- append: Write events to a stream with optimistic concurrency
- read: Read events from a stream
- get_version: Get current stream version
- exists: Check if stream exists
- list_streams: List all streams in the store

Summary

Functions

Append events to a stream with expected version check

Delete a stream and all its events

Check if a stream exists

Get current version of a stream

Check if a store contains at least one event. Cannot rely on stream existence alone — streams can survive after all their events are deleted (truncation, GDPR erasure). Checks for actual event data by reading 1 event globally.

List all streams in the store

Read all events across all streams in global epoch_us order.

Read all events of specific types from all streams using Khepri native filtering.

Read all events matching tags from all streams.

Types

direction/0

-type direction() :: forward | backward.

event/0

-type event() ::
          #event{event_id :: binary(),
                 event_type :: binary(),
                 stream_id :: binary(),
                 version :: non_neg_integer(),
                 data :: map() | binary(),
                 metadata :: map(),
                 tags :: [binary()] | undefined,
                 timestamp :: integer(),
                 epoch_us :: integer(),
                 data_content_type :: binary(),
                 metadata_content_type :: binary()}.

new_event/0

-type new_event() ::
          #{event_type := binary(),
            data := map() | binary(),
            metadata => map(),
            tags => [binary()],
            event_id => binary()}.

Functions

append(StoreId, StreamId, ExpectedVersion, Events)

-spec append(atom(), binary(), integer(), [new_event()]) -> {ok, non_neg_integer()} | {error, term()}.

Append events to a stream with expected version check

Expected version semantics:

- -1 (NO_STREAM) - Stream must not exist (first write)
- -2 (ANY_VERSION) - No version check, always append
- N >= 0 - Stream version must equal N

Returns {ok, NewVersion} on success or {error, Reason} on failure.
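A minimal sketch of a first write, using the expected-version semantics above. The store name, stream name, and event payload are illustrative; the map shape follows the new_event() type.

```erlang
%% Create a new stream: expected version -1 (NO_STREAM) means the
%% append fails if the stream already exists. Names are illustrative.
Event = #{event_type => <<"order_placed">>,
          data => #{order_id => <<"ord-1">>, total => 100},
          metadata => #{source => <<"checkout">>},
          tags => [<<"customer:42">>]},
case reckon_db_streams:append(my_store, <<"order-ord-1">>, -1, [Event]) of
    {ok, NewVersion} ->
        %% NewVersion is the version of the last appended event
        {created, NewVersion};
    {error, Reason} ->
        %% e.g. the stream already existed and the version check failed
        {error, Reason}
end.
```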

append(StoreId, StreamId, ExpectedVersion, Events, Opts)

-spec append(atom(), binary(), integer(), [new_event()], map()) ->
                {ok, non_neg_integer()} | {error, term()}.

delete(StoreId, StreamId)

-spec delete(atom(), binary()) -> ok | {error, term()}.

Delete a stream and all its events

exists(StoreId, StreamId)

-spec exists(atom(), binary()) -> boolean().

Check if a stream exists

get_version(StoreId, StreamId)

-spec get_version(atom(), binary()) -> integer().

Get current version of a stream

Returns:

- -1 - if stream doesn't exist or is empty
- N >= 0 - representing the version of the latest event
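A sketch of the common read-then-append pattern: the current version feeds straight back into append/4 as the expected version, so a concurrent writer causes the append to fail rather than silently interleave. Store and stream names are illustrative.

```erlang
%% Optimistic concurrency: use the observed version as the expected
%% version for the next append. -1 (stream absent) also works here,
%% since -1 doubles as NO_STREAM in append/4.
Version = reckon_db_streams:get_version(my_store, <<"order-ord-1">>),
NextEvent = #{event_type => <<"order_shipped">>, data => #{}},
reckon_db_streams:append(my_store, <<"order-ord-1">>, Version, [NextEvent]).
```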

has_events(StoreId)

-spec has_events(atom()) -> boolean().

Check if a store contains at least one event. Cannot rely on stream existence alone — streams can survive after all their events are deleted (truncation, GDPR erasure). Checks for actual event data by reading 1 event globally.
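Because exists/2 can still return true for a stream whose events were all erased, a startup "is this store empty?" check should use has_events/1, as sketched below (the branch atoms are illustrative).

```erlang
%% has_events/1 inspects actual event data, so truncated or
%% GDPR-erased streams don't produce a false "store has history".
case reckon_db_streams:has_events(my_store) of
    true  -> replay_existing_events;   %% illustrative next step
    false -> start_from_clean_state    %% illustrative next step
end.
```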

list_streams(StoreId)

-spec list_streams(atom()) -> {ok, [binary()]} | {error, term()}.

List all streams in the store

read(StoreId, StreamId, StartVersion, Count, Direction)

-spec read(atom(), binary(), non_neg_integer(), pos_integer(), direction()) ->
              {ok, [event()]} | {error, term()}.

Read events from a stream

Parameters:

- StoreId - The store identifier
- StreamId - The stream identifier
- StartVersion - Starting version (0-based)
- Count - Maximum number of events to read
- Direction - forward or backward

Returns {ok, [Event]} or {error, Reason}
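A sketch reading the first 50 events of a stream, oldest first. It assumes the #event{} record definition (matching the event() type above) is in scope via the library's header file.

```erlang
%% Read up to 50 events starting at version 0, in forward order.
case reckon_db_streams:read(my_store, <<"order-ord-1">>, 0, 50, forward) of
    {ok, Events} ->
        %% Pattern-match the #event{} record fields documented in event()
        [io:format("~s v~p~n", [Type, Version])
         || #event{event_type = Type, version = Version} <- Events];
    {error, Reason} ->
        {error, Reason}
end.
```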

read_all(StoreId, StreamId, BatchSize, Direction)

-spec read_all(atom(), binary(), pos_integer(), direction()) -> {ok, [event()]} | {error, term()}.

Read all events from a stream

read_all_global(StoreId, Offset, BatchSize)

-spec read_all_global(atom(), non_neg_integer(), pos_integer()) -> {ok, [event()]} | {error, term()}.

Read all events across all streams in global epoch_us order.

Returns events sorted by epoch_us, skipping Offset events and returning up to BatchSize events. Used by catch-up subscriptions to replay historical events to a subscriber.

Parameters:

- StoreId - The store identifier
- Offset - Number of events to skip (0-based)
- BatchSize - Maximum number of events to return

Returns events sorted by epoch_us (global ordering).
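A sketch of the catch-up loop described above: page through the global log until a short (empty) batch signals the end. The batch size of 100 and the handle_event/1 callback are illustrative.

```erlang
%% Replay the whole global log in epoch_us order, 100 events at a
%% time, advancing Offset by the number of events actually returned.
CatchUp = fun Loop(Offset) ->
    case reckon_db_streams:read_all_global(my_store, Offset, 100) of
        {ok, []} ->
            {caught_up, Offset};
        {ok, Events} ->
            lists:foreach(fun handle_event/1, Events),  %% hypothetical handler
            Loop(Offset + length(Events));
        {error, Reason} ->
            {error, Reason}
    end
end,
CatchUp(0).
```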

read_by_event_types(StoreId, EventTypes, BatchSize)

-spec read_by_event_types(atom(), [binary()], pos_integer()) -> {ok, [event()]} | {error, term()}.

Read all events of specific types from all streams using Khepri native filtering.

This function uses Khepri's built-in #if_data_matches condition to filter events by type at the database level, avoiding loading all events into memory.

Parameters:

- StoreId - The store identifier
- EventTypes - List of event type binaries to match
- BatchSize - Maximum number of events to return (for pagination)

Returns events sorted by epoch_us (global ordering).
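A sketch of a typed cross-stream query; since the #if_data_matches filtering runs inside Khepri, only matching events reach the caller. Store and type names are illustrative.

```erlang
%% Fetch up to 200 order lifecycle events of either type, from all
%% streams, filtered at the database level rather than in this process.
{ok, Events} = reckon_db_streams:read_by_event_types(
                   my_store,
                   [<<"order_placed">>, <<"order_cancelled">>],
                   200).
```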

read_by_tags(StoreId, Tags, Match, BatchSize)

-spec read_by_tags(atom(), [binary()], any | all, pos_integer()) -> {ok, [event()]} | {error, term()}.

Read all events matching tags from all streams.

Tags provide a mechanism for cross-stream querying without affecting stream-based concurrency control. This is useful for the process-centric model where you want to find all events related to specific participants.

Match Modes

any (default): Returns events containing ANY of the specified tags (union). Example: read_by_tags(Store, [<<"student:456">>, <<"student:789">>], any, 100) Returns events for either student.

all: Returns events containing ALL of the specified tags (intersection). Example: read_by_tags(Store, [<<"student:456">>, <<"course:CS101">>], all, 100) Returns only events tagged with both student 456 AND course CS101.

Parameters

- StoreId - The store identifier
- Tags - List of tag binaries to match
- Match - any | all (matching strategy)
- BatchSize - Maximum number of events to return

Returns

Events sorted by epoch_us (global ordering).
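The two match modes from the examples above, side by side as a sketch (store name and tags are illustrative):

```erlang
%% any = union: events tagged with either student.
{ok, Either} = reckon_db_streams:read_by_tags(
                   my_store,
                   [<<"student:456">>, <<"student:789">>],
                   any, 100),
%% all = intersection: only events tagged with both the student
%% and the course.
{ok, Both} = reckon_db_streams:read_by_tags(
                   my_store,
                   [<<"student:456">>, <<"course:CS101">>],
                   all, 100).
```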