evoq_event_store (evoq v1.14.1)

Wrapper for event store operations via adapter.

Provides a consistent interface for event store operations, delegating to a configured adapter.

Configuration (Required)

You must configure an adapter in your application config:

  {evoq, [
      {event_store_adapter, evoq_esdb_gater_adapter}
  ]}

Available adapters:

- evoq_esdb_gater_adapter (from the reckon_evoq package)
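Because get_adapter/0 crashes when nothing is configured, a startup check can surface a missing adapter earlier. The following is a minimal sketch, assuming the wrapper reads the adapter from the event_store_adapter key of the evoq application environment, as the configuration above suggests. The module name adapter_check and the error terms are hypothetical; application:get_env/2 is the standard OTP call.

```erlang
-module(adapter_check).
-export([configured_adapter/0]).

%% Hypothetical helper: report whether an adapter is configured, without
%% crashing. Assumes the adapter is stored under the evoq application's
%% event_store_adapter key, as in the config fragment above.
-spec configured_adapter() -> {ok, module()} | {error, term()}.
configured_adapter() ->
    case application:get_env(evoq, event_store_adapter) of
        {ok, Adapter} when is_atom(Adapter) ->
            {ok, Adapter};
        {ok, Other} ->
            %% A value is present but cannot be a module name.
            {error, {invalid_adapter, Other}};
        undefined ->
            {error, not_configured}
    end.
```

Calling adapter_check:configured_adapter() from an application start callback would let the application fail with a readable error instead of a crash on the first store operation.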

Summary

Functions

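append(StoreId, StreamId, ExpectedVersion, Events)

Append events to a stream.

exists(StoreId, StreamId)

Check if a stream exists.

get_adapter()

Get the configured event store adapter. Crashes if no adapter is configured.

has_events(StoreId)

Check if a store contains at least one event.

list_streams(StoreId)

List all streams in the store.

read(StoreId, StreamId, FromVersion, Count, Direction)

Read events from a stream.

read_all(StoreId, StreamId, Direction)

Read all events from a stream.

read_all(StoreId, StreamId, BatchSize, Direction)

Read all events from a stream with batch size.

read_all_events(StoreId, BatchSize)

Read all events from all streams, sorted by global position. This is useful for rebuilding projections.

read_all_global(StoreId, Offset, BatchSize)

Read all events across all streams in global order.

read_events_by_types(StoreId, EventTypes, BatchSize)

Read all events of specific types from all streams.

set_adapter(Adapter)

Set the event store adapter (primarily for testing).

version(StoreId, StreamId)

Get the current version of a stream.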

Types

evoq_event/0

-type evoq_event() ::
          #evoq_event{event_id :: binary(),
                      event_type :: binary(),
                      stream_id :: binary(),
                      version :: non_neg_integer(),
                      data :: map() | binary(),
                      metadata :: map(),
                      tags :: [binary()] | undefined,
                      timestamp :: integer(),
                      epoch_us :: integer(),
                      data_content_type :: binary(),
                      metadata_content_type :: binary()}.

Functions

append(StoreId, StreamId, ExpectedVersion, Events)

-spec append(atom(), binary(), integer(), [map()]) -> {ok, non_neg_integer()} | {error, term()}.

Append events to a stream.
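To illustrate the call shape, the sketch below appends a single event after looking up the stream's current version. Passing the result of version/2 as ExpectedVersion is an assumption about how the adapter enforces optimistic concurrency; this page does not state it. The module name append_example is hypothetical, and the event map is left to the caller because its required keys are not documented here.

```erlang
-module(append_example).
-export([append_one/3]).

%% Hypothetical helper: append one event to a stream.
-spec append_one(atom(), binary(), map()) ->
          {ok, non_neg_integer()} | {error, term()}.
append_one(StoreId, StreamId, Event) ->
    %% Assumption: the current stream version is the value the adapter
    %% expects as ExpectedVersion for a conflict-free append.
    Expected = evoq_event_store:version(StoreId, StreamId),
    %% The {ok, NewVersion} | {error, Reason} result is returned unchanged.
    evoq_event_store:append(StoreId, StreamId, Expected, [Event]).
```

An {error, Reason} result is passed through as is, so the caller decides whether to re-read the version and retry.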

exists(StoreId, StreamId)

-spec exists(atom(), binary()) -> boolean().

Check if a stream exists.

get_adapter()

-spec get_adapter() -> module().

Get the configured event store adapter. Crashes if no adapter is configured.

has_events(StoreId)

-spec has_events(atom()) -> boolean().

Check if a store contains at least one event.

list_streams(StoreId)

-spec list_streams(atom()) -> {ok, [binary()]} | {error, term()}.

List all streams in the store.

read(StoreId, StreamId, FromVersion, Count, Direction)

-spec read(atom(), binary(), non_neg_integer(), pos_integer(), forward | backward) ->
              {ok, [map()]} | {error, term()}.

Read events from a stream.

read_all(StoreId, StreamId, Direction)

-spec read_all(atom(), binary(), forward | backward) -> {ok, [map()]} | {error, term()}.

Read all events from a stream.

read_all(StoreId, StreamId, BatchSize, Direction)

-spec read_all(atom(), binary(), pos_integer(), forward | backward) -> {ok, [map()]} | {error, term()}.

Read all events from a stream with batch size.

read_all_events(StoreId, BatchSize)

-spec read_all_events(atom(), pos_integer()) -> {ok, [map()]} | {error, term()}.

Read all events from all streams, sorted by global position. This is useful for rebuilding projections.
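A projection rebuild along these lines can be sketched as a left fold over the returned list, which applies events in the order the store returns them. The module name projection_rebuild, the Apply callback, and the state it threads through are hypothetical; the shape of each event map is not specified on this page, so it is left entirely to Apply.

```erlang
-module(projection_rebuild).
-export([rebuild/4, fold_events/3]).

%% Hypothetical helper: rebuild a projection from every event in the store.
%% Apply :: fun((Event :: map(), State) -> NewState).
-spec rebuild(atom(), pos_integer(), fun((map(), term()) -> term()), term()) ->
          {ok, term()} | {error, term()}.
rebuild(StoreId, BatchSize, Apply, InitialState) ->
    case evoq_event_store:read_all_events(StoreId, BatchSize) of
        {ok, Events} ->
            {ok, fold_events(Apply, InitialState, Events)};
        {error, _Reason} = Error ->
            Error
    end.

%% Apply each event to the state, first event first.
fold_events(Apply, InitialState, Events) ->
    lists:foldl(Apply, InitialState, Events).
```

For example, projection_rebuild:rebuild(my_store, 500, fun(_Event, Count) -> Count + 1 end, 0) would count the events in a store named my_store (a placeholder store id).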

read_all_global(StoreId, Offset, BatchSize)

-spec read_all_global(atom(), non_neg_integer(), pos_integer()) ->
                         {ok, [evoq_event()]} | {error, term()}.

Read all events across all streams in global order.

Returns events sorted by epoch_us, starting from Offset. Used for catch-up subscriptions and global event replay. Falls back to read_all_events/2 if the adapter does not implement the optional read_all_global/3 callback.
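A global replay can be sketched as a paging loop over read_all_global/3. The sketch assumes that Offset counts events to skip from the start of the global order (zero-based) and that an empty batch marks the end; neither is stated on this page. It also assumes the adapter honours Offset: if the read_all_events/2 fallback ignores it, this loop would not terminate, so that is worth checking first. The module name global_replay and the Handler callback are hypothetical.

```erlang
-module(global_replay).
-export([replay/3, replay_with/3]).

%% Hypothetical helper: feed every event in the store to Handler, reading
%% BatchSize events at a time. Returns {ok, NumberOfEventsHandled}.
replay(StoreId, BatchSize, Handler) ->
    Read = fun(Offset) ->
                   evoq_event_store:read_all_global(StoreId, Offset, BatchSize)
           end,
    %% Assumption: offsets are zero-based.
    replay_with(Read, 0, Handler).

%% Paging loop, kept separate from the store call so it can be exercised
%% with any fun(Offset) -> {ok, Events} | {error, Reason}.
replay_with(Read, Offset, Handler) ->
    case Read(Offset) of
        {ok, []} ->
            %% Assumption: an empty batch means there is nothing left.
            {ok, Offset};
        {ok, Events} ->
            lists:foreach(Handler, Events),
            replay_with(Read, Offset + length(Events), Handler);
        {error, _Reason} = Error ->
            Error
    end.
```

Advancing by length(Events) rather than by BatchSize keeps the offset correct when the last batch is short.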

read_events_by_types(StoreId, EventTypes, BatchSize)

-spec read_events_by_types(atom(), [binary()], pos_integer()) -> {ok, [map()]} | {error, term()}.

Read all events of specific types from all streams.

Routes through the adapter, which uses native filtering when available. Returns events sorted by epoch_us (global ordering).

set_adapter(Adapter)

-spec set_adapter(module()) -> ok.

Set the event store adapter (primarily for testing).
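In a test, the adapter can be swapped for the duration of one function and restored afterwards. This is a sketch only: the module name adapter_swap and the fake adapter passed in are hypothetical, and because it calls get_adapter/0 first, it crashes if no adapter is configured when the test starts.

```erlang
-module(adapter_swap).
-export([with_adapter/2]).

%% Hypothetical test helper: run Fun with Adapter installed, then restore
%% whichever adapter was configured before.
-spec with_adapter(module(), fun(() -> Result)) -> Result.
with_adapter(Adapter, Fun) ->
    %% Crashes here if no adapter is configured (see get_adapter/0).
    Previous = evoq_event_store:get_adapter(),
    ok = evoq_event_store:set_adapter(Adapter),
    try
        Fun()
    after
        %% Restore the previous adapter even if Fun raises.
        evoq_event_store:set_adapter(Previous)
    end.
```

A test body would then call adapter_swap:with_adapter(my_fake_adapter, fun() -> ... end), where my_fake_adapter is a placeholder for a module implementing the adapter callbacks.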

version(StoreId, StreamId)

-spec version(atom(), binary()) -> integer().

Get the current version of a stream.