Shared Types

View Source

erl-esdb-gater defines common data structures used across the event sourcing ecosystem. These types provide a consistent interface between erl-esdb, erl-evoq, and adapter implementations.

Including the Types

-include_lib("erl_esdb_gater/include/esdb_gater_types.hrl").

Event Record

The #event{} record represents an immutable fact stored in an event stream.

-record(event, {
    event_id          :: binary(),           %% Unique identifier (UUID)
    event_type        :: binary(),           %% Type name (e.g., <<"OrderPlaced">>)
    stream_id         :: binary(),           %% Stream this event belongs to
    version           :: non_neg_integer(),  %% Position within stream (0-based)
    data              :: map() | binary(),   %% Event payload (Erlang term)
    metadata          :: map(),              %% Correlation, causation, user info
    timestamp         :: integer(),          %% Millisecond timestamp
    epoch_us          :: integer()           %% Microsecond epoch for ordering
}).

Usage (Client-Side)

%% Creating an event for append via gateway
Event = #{
    event_type => <<"UserCreated">>,
    data => #{user_id => <<"usr-123">>, email => <<"alice@example.com">>},
    metadata => #{correlation_id => <<"req-456">>}
},
{ok, Version} = esdb_gater_api:append_events(my_store, <<"user-usr-123">>, [Event]).

%% Reading events returns #event{} records
{ok, Events} = esdb_gater_api:stream_forward(my_store, <<"user-usr-123">>, 0, 100),
lists:foreach(fun(#event{event_type = Type, data = Data}) ->
    io:format("Event: ~s, Data: ~p~n", [Type, Data])
end, Events).

Snapshot Record

The #snapshot{} record stores aggregate state at a specific version for fast recovery.

-record(snapshot, {
    stream_id  :: binary(),           %% Stream/aggregate identifier
    version    :: non_neg_integer(),  %% Version at which snapshot was taken
    data       :: map() | binary(),   %% Aggregate state (Erlang term)
    metadata   :: map(),              %% Snapshot metadata
    timestamp  :: integer()           %% When snapshot was created
}).

Usage (Client-Side)

%% Save a snapshot via gateway
State = #{balance => 1000, status => active},
ok = esdb_gater_api:record_snapshot(my_store, my_source, <<"account-123">>, 50, State).

%% Load latest snapshot
case esdb_gater_api:read_snapshot(my_store, my_source, <<"account-123">>, latest) of
    {ok, #snapshot{version = V, data = State}} ->
        %% Replay events from version V onwards
        {ok, Events} = esdb_gater_api:stream_forward(my_store, <<"account-123">>, V, 1000),
        FinalState = lists:foldl(fun apply_event/2, State, Events);
    {error, not_found} ->
        %% No snapshot, replay all events
        rebuild_from_scratch(my_store, <<"account-123">>)
end.

Subscription Record

The #subscription{} record tracks subscription state for event delivery.

-record(subscription, {
    id                :: binary(),            %% Unique subscription ID
    type              :: subscription_type(), %% stream | event_type | event_pattern | event_payload
    selector          :: binary() | map(),    %% What to match
    subscription_name :: binary(),            %% Human-readable name
    subscriber_pid    :: pid() | undefined,   %% Process receiving events
    created_at        :: integer(),           %% Creation timestamp
    pool_size         :: pos_integer(),       %% Emitter pool size
    checkpoint        :: non_neg_integer() | undefined,  %% Last processed position
    options           :: map()                %% Additional options
}).

-type subscription_type() :: stream | event_type | event_pattern | event_payload.

Subscription Types

TypeSelectorDescription
streamStream ID binaryEvents from a specific stream
event_typeEvent type binaryEvents of a specific type across all streams
event_patternPattern binaryEvents matching a wildcard pattern
event_payloadMatch mapEvents with matching payload fields

Usage (Client-Side)

%% Create a subscription via gateway
ok = esdb_gater_api:save_subscription(
    my_store,
    stream,                    %% Type
    <<"orders-*">>,            %% Selector (pattern)
    <<"order_projection">>,    %% Name
    self(),                    %% Subscriber PID
    #{}                        %% Options
).

%% List subscriptions
{ok, Subscriptions} = esdb_gater_api:get_subscriptions(my_store).

%% Remove a subscription
ok = esdb_gater_api:remove_subscription(my_store, stream, <<"orders-*">>, <<"order_projection">>).

Version Constants

These constants control optimistic concurrency behavior:

ConstantValueBehavior
?NO_STREAM-1Fails if stream exists
?ANY_VERSION-2Always appends
?STREAM_EXISTS-4Fails if stream doesn't exist
N >= 0NFails if current version != N

Usage with the gateway API:

%% Append with version check via options
{ok, Version} = esdb_gater_api:append_events(
    my_store,
    <<"order-123">>,
    [Event],
    #{expected_version => 4}  %% Fails if current version != 4
).

Append Result Record

The #append_result{} record provides details about a successful append operation.

-record(append_result, {
    version  :: non_neg_integer(),            %% New stream version
    position :: non_neg_integer() | undefined, %% Global position (if applicable)
    count    :: non_neg_integer()              %% Number of events appended
}).

Error Types

-type append_error() ::
    {wrong_expected_version, Expected :: integer(), Actual :: integer()} |
    {stream_deleted, StreamId :: binary()} |
    {timeout, Reason :: term()} |
    {error, Reason :: term()}.

-type read_error() ::
    {stream_not_found, StreamId :: binary()} |
    {timeout, Reason :: term()} |
    {error, Reason :: term()}.

Ecosystem Usage

These types are used by:

  • erl-esdb: Core event store implementation (server-side)
  • erl-esdb-gater: Gateway API for distributed access (client-side)
  • erl-evoq: CQRS/Event Sourcing framework
  • erl-evoq-esdb: Adapter connecting erl-evoq to erl-esdb

By depending on erl-esdb-gater for types, higher-level libraries avoid direct coupling to the core event store implementation.

See Also