erl-esdb

View Source

BEAM-native Event Store built on Khepri/Ra with Raft consensus.

Architecture

Overview

erl-esdb is an Erlang implementation of a distributed event store designed for:

  • Event Sourcing: Store and replay events with optimistic concurrency
  • Clustering: Automatic node discovery and Raft-based replication
  • High Throughput: Partitioned writers for concurrent stream writes
  • Edge & Datacenter: Works on Nerves devices and Kubernetes clusters

Features

  • Event stream CRUD with versioning and optimistic concurrency
  • Persistent subscriptions (stream, event type, pattern, payload matching)
  • Snapshot management for aggregate state
  • Emitter pools for high-throughput event delivery
  • UDP multicast and Kubernetes DNS discovery
  • BEAM telemetry with optional OpenTelemetry exporters

Installation

Community Edition (hex.pm)

Add to your rebar.config:

{deps, [
    {erl_esdb, "0.4.0"}
]}.

Pure Erlang implementation - works everywhere, no native dependencies.

Enterprise Edition (optional NIF acceleration)

For NIF-accelerated performance (5-100x faster for specific operations), add the erl_esdb_nifs package:

{deps, [
    {erl_esdb, "0.4.0"},
    {erl_esdb_nifs, {git, "git@github.com:macula-io/erl-esdb-nifs.git", {tag, "0.4.0"}}}
]}.

Requires the Rust toolchain, which can be installed with: curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh

Enterprise NIFs provide acceleration for:

  • Crypto: Ed25519 signature verification, SHA256 (~5x faster)
  • Archive: LZ4/Zstd compression/decompression (~10x faster)
  • Hash: xxHash3, FNV-1a for consistent partitioning (SIMD optimized)
  • Aggregate: Vectorized event folding with tagged values
  • Filter: Pre-compiled predicates for event filtering
  • Graph: Causation graph analysis via petgraph (topological sort, cycle detection)

When NIFs are unavailable, pure Erlang fallbacks are used automatically.

Quick Start

%% Start the application
application:ensure_all_started(erl_esdb).

%% Append events to a stream
Events = [
    #{
        event_type => <<"user_created">>,
        data => #{name => <<"Alice">>, email => <<"alice@example.com">>},
        metadata => #{correlation_id => <<"req-123">>}
    }
],
{ok, Version} = erl_esdb_streams:append(my_store, <<"user-123">>, -1, Events).

%% Read events from a stream
{ok, ReadEvents} = erl_esdb_streams:read(my_store, <<"user-123">>, 0, 100, forward).

%% Subscribe to events
{ok, SubKey} = erl_esdb_subscriptions:subscribe(
    my_store,
    stream,                    %% Type: stream | event_type | event_pattern | event_payload
    <<"user-123">>,            %% Selector
    <<"user_projection">>      %% Subscription name
).

%% Receive events
receive
    {event, Event} -> io:format("Received: ~p~n", [Event])
end.

API Reference

Streams

%% Append events (returns new version)
erl_esdb_streams:append(StoreId, StreamId, ExpectedVersion, Events) ->
    {ok, NewVersion} | {error, version_mismatch | term()}.

%% Read events from a stream
erl_esdb_streams:read(StoreId, StreamId, FromVersion, Count, Direction) ->
    {ok, [Event]} | {error, stream_not_found | term()}.

%% Read across all streams
erl_esdb_streams:read_all(StoreId, FromVersion, Count, Direction) ->
    {ok, [Event]} | {error, term()}.

%% Read events by type
erl_esdb_streams:read_by_event_types(StoreId, EventTypes, Opts) ->
    {ok, [Event]} | {error, term()}.

%% Get stream version
erl_esdb_streams:get_version(StoreId, StreamId) -> {ok, Version} | {error, term()}.

%% Check if stream exists
erl_esdb_streams:exists(StoreId, StreamId) -> boolean().

%% List all streams
erl_esdb_streams:list_streams(StoreId) -> {ok, [StreamId]} | {error, term()}.

%% Delete stream (soft delete)
erl_esdb_streams:delete(StoreId, StreamId) -> ok | {error, term()}.

Subscriptions

%% Create subscription
erl_esdb_subscriptions:subscribe(StoreId, Type, Selector, Name) ->
    {ok, SubscriptionKey} | {error, term()}.
erl_esdb_subscriptions:subscribe(StoreId, Type, Selector, Name, Opts) ->
    {ok, SubscriptionKey} | {error, term()}.

%% Remove subscription (by key or by type+name)
erl_esdb_subscriptions:unsubscribe(StoreId, SubscriptionKey) -> ok | {error, term()}.
erl_esdb_subscriptions:unsubscribe(StoreId, Type, SubscriptionName) -> ok | {error, term()}.

%% Get subscription by key
erl_esdb_subscriptions:get(StoreId, SubscriptionKey) ->
    {ok, Subscription} | {error, not_found}.

%% Acknowledge event processing
erl_esdb_subscriptions:ack(StoreId, StreamId, SubscriptionName, EventNumber) -> ok.

%% List subscriptions
erl_esdb_subscriptions:list(StoreId) -> {ok, [Subscription]}.

%% Check if subscription exists
erl_esdb_subscriptions:exists(StoreId, SubscriptionKey) -> boolean().

%% Subscription types:
%%   stream - Events from a specific stream
%%   event_type - Events matching event type
%%   event_pattern - Events matching stream pattern (wildcards)
%%   event_payload - Events matching payload criteria

Snapshots

%% Save snapshot
erl_esdb_snapshots:save(StoreId, StreamId, Version, Data) -> ok.
erl_esdb_snapshots:save(StoreId, StreamId, Version, Data, Metadata) -> ok.

%% Load latest snapshot
erl_esdb_snapshots:load(StoreId, StreamId) -> {ok, Snapshot} | {error, not_found}.

%% Load snapshot at specific version
erl_esdb_snapshots:load_at(StoreId, StreamId, Version) -> {ok, Snapshot} | {error, not_found}.

%% List all snapshots for stream
erl_esdb_snapshots:list(StoreId, StreamId) -> {ok, [Snapshot]}.

%% Delete all snapshots for stream
erl_esdb_snapshots:delete(StoreId, StreamId) -> ok.

%% Delete snapshot at specific version
erl_esdb_snapshots:delete_at(StoreId, StreamId, Version) -> ok.

%% Check if snapshot exists
erl_esdb_snapshots:exists(StoreId, StreamId) -> boolean().
erl_esdb_snapshots:exists_at(StoreId, StreamId, Version) -> boolean().

Aggregation

%% Fold events left to right (chronological order)
%% Returns a tagged_map with {sum, N} and {overwrite, V} tags preserved
erl_esdb_aggregator:foldl(Events) -> tagged_map().
erl_esdb_aggregator:foldl(Events, InitialState) -> tagged_map().

%% Fold events right to left (reverse order)
erl_esdb_aggregator:foldr(Events) -> tagged_map().
erl_esdb_aggregator:foldr(Events, InitialState) -> tagged_map().

%% Finalize a tagged map (unwrap {sum, N} -> N, {overwrite, V} -> V)
erl_esdb_aggregator:finalize(TaggedMap) -> map().

%% Aggregate events with optional snapshot (convenience function)
erl_esdb_aggregator:aggregate(Events, Snapshot | undefined, Opts) -> map().
%% Opts: #{initial_state => map(), finalize => boolean()}

Example usage:

%% Load events and aggregate
{ok, Events} = erl_esdb_streams:read(my_store, <<"account-123">>, 0, 10000, forward),
TaggedState = erl_esdb_aggregator:foldl(Events, #{balance => {sum, 0}}),
FinalState = erl_esdb_aggregator:finalize(TaggedState).

%% Or use aggregate/3 with snapshot support
{ok, Snapshot} = erl_esdb_snapshots:load(my_store, <<"account-123">>),
{ok, NewEvents} = erl_esdb_streams:read(my_store, <<"account-123">>, Snapshot#snapshot.version + 1, 10000, forward),
State = erl_esdb_aggregator:aggregate(NewEvents, Snapshot, #{}).

Telemetry

%% Attach default logger handler
erl_esdb_telemetry:attach_default_handler() -> ok.

%% Attach custom handler
erl_esdb_telemetry:attach(HandlerId, HandlerFun, Config) -> ok.

%% Detach handler
erl_esdb_telemetry:detach(HandlerId) -> ok.

Configuration

%% sys.config
[{erl_esdb, [
    {stores, [
        {my_store, [
            {data_dir, "/var/lib/erl_esdb/my_store"},
            {mode, cluster},  %% single | cluster
            {timeout, 5000}
        ]}
    ]},
    {telemetry_handlers, [logger]},
    {writer_pool_size, 10},
    {reader_pool_size, 10},

    %% Cluster discovery (cluster mode only)
    {discovery, [
        {method, multicast},  %% multicast | k8s_dns
        {port, 45892},
        {multicast_addr, {239, 255, 0, 1}},
        {secret, <<"cluster_secret">>}
    ]}
]}].

Architecture

Supervision Tree

Supervision Tree

Event Flow

Event Flow

Telemetry Events

| Event | Measurements | Metadata |
|-------|--------------|----------|
| [erl_esdb, stream, write, start] | system_time | store_id, stream_id, event_count |
| [erl_esdb, stream, write, stop] | duration, event_count | store_id, stream_id, new_version |
| [erl_esdb, stream, write, error] | duration | store_id, stream_id, reason |
| [erl_esdb, stream, read, start] | system_time | store_id, stream_id |
| [erl_esdb, stream, read, stop] | duration, event_count | store_id, stream_id |
| [erl_esdb, subscription, created] | system_time | store_id, subscription_id, type |
| [erl_esdb, subscription, deleted] | system_time | store_id, subscription_id |
| [erl_esdb, snapshot, created] | duration, size_bytes | store_id, stream_id, version |
| [erl_esdb, cluster, node, up] | system_time | store_id, node, member_count |
| [erl_esdb, cluster, node, down] | system_time | store_id, node, reason |
| [erl_esdb, cluster, leader, elected] | system_time | store_id, leader |

Building

rebar3 compile         # Compile
rebar3 eunit           # Unit tests
rebar3 ct              # Integration tests
rebar3 dialyzer        # Type checking
rebar3 cover           # Coverage report

Testing

Test counts:

  • Unit tests: 446 tests (including NIF modules with enterprise/community equivalence tests)
  • Integration tests: 53 tests (streams, subscriptions, snapshots, cluster)
  • End-to-end tests: 24 tests (full gater integration)

rebar3 eunit --dir=test/unit              # All unit tests
rebar3 ct --dir=test/integration          # Integration tests
rebar3 ct --dir=test/e2e                  # E2E tests with gater
rebar3 ct --suite=erl_esdb_streams_SUITE  # Streams tests
rebar3 ct --suite=erl_esdb_cluster_SUITE  # Cluster tests

Gateway API

erl-esdb is accessed through erl-esdb-gater, which provides the unified API for load-balanced, distributed access to event stores.

How It Works

  1. erl-esdb starts and creates a gateway worker for each store
  2. Gateway workers register with the erl-esdb-gater pg-based registry
  3. Clients use the gater API for all event store operations
  4. The gater routes requests to registered workers using round-robin with failover

Architecture

Gateway Architecture

Using the Gateway API

All event store operations go through the gater API:

%% Stream operations
{ok, Version} = esdb_gater_api:append_events(my_store, StreamId, Events).
{ok, Events} = esdb_gater_api:stream_forward(my_store, StreamId, 0, 100).
{ok, Version} = esdb_gater_api:get_version(my_store, StreamId).

%% Subscription operations
ok = esdb_gater_api:save_subscription(my_store, stream, StreamId, Name, 0, self()).

%% Snapshot operations
ok = esdb_gater_api:record_snapshot(my_store, SourceUuid, StreamUuid, Version, Record).
{ok, Snap} = esdb_gater_api:read_snapshot(my_store, SourceUuid, StreamUuid, Version).

See erl-esdb-gater for complete API documentation.

License

Apache-2.0