Storage Internals
View SourceThis guide covers the internal storage architecture of erl-esdb, including Khepri path structures, data organization, and cluster replication behavior.
Overview
erl-esdb uses Khepri as its storage layer. Khepri is a tree-like key-value store built on Ra (Raft consensus), providing:
- Strong consistency across cluster nodes
- Automatic replication
- Tree-structured data organization
- Efficient prefix queries
Architecture
Khepri Path Structure
All data is organized under hierarchical paths:
[root]
├── [streams]
│ └── [StreamId]
│ └── [PaddedVersion] -> #event{}
├── [snapshots]
│ └── [StreamId]
│ └── [PaddedVersion] -> #snapshot{}
├── [subscriptions]
│ └── [SubscriptionName] -> #subscription{}
├── [schemas]
│ └── [StoreId]
│ └── [EventType] -> schema_map()
├── [links]
│ └── [StoreId]
│ └── [LinkName] -> #link{}
└── [metadata]
└── [Key] -> ValueStreams Storage
Path Format
?STREAMS_PATH ++ [StreamId, PaddedVersion]
%% Example: [streams, <<"orders-123">>, <<"000000000042">>]Version Padding
Versions are zero-padded to 12 characters for lexicographic ordering:
-define(VERSION_PADDING, 12).
pad_version(Version, Length) ->
VersionStr = integer_to_list(Version),
Padding = Length - length(VersionStr),
PaddedStr = lists:duplicate(Padding, $0) ++ VersionStr,
list_to_binary(PaddedStr).
%% Examples:
%% 0 -> <<"000000000000">>
%% 42 -> <<"000000000042">>
%% 999999999999 -> <<"999999999999">>This supports up to 999,999,999,999 events per stream (~317 years at 100 events/sec).
Event Record
-record(event, {
event_id :: binary(), %% UUID
stream_id :: binary(), %% Stream identifier
version :: non_neg_integer(), %% 0-indexed position
event_type :: binary(), %% Event type name
data :: map(), %% Event payload
metadata :: map(), %% Event metadata
epoch_us :: integer() %% Timestamp (microseconds since epoch)
}).Reading Events
Events are read using Khepri path queries:
%% Read specific version
khepri:get(StoreId, [streams, StreamId, PaddedVersion]).
%% Read range of versions
Pattern = [streams, StreamId, ?KHEPRI_WILDCARD_STAR],
khepri:get_many(StoreId, Pattern).Snapshots Storage
Path Format
?SNAPSHOTS_PATH ++ [StreamId, PaddedVersion]
%% Example: [snapshots, <<"orders-123">>, <<"000000000100">>]Snapshot Record
-record(snapshot, {
stream_id :: binary(),
version :: non_neg_integer(), %% Event version this snapshot represents
state :: term(), %% Serialized aggregate state
metadata :: map(),
created_at :: integer() %% Timestamp
}).Latest Snapshot Query
%% Get all snapshots for a stream, sorted by version descending
Pattern = [snapshots, StreamId, ?KHEPRI_WILDCARD_STAR],
{ok, Snapshots} = khepri:get_many(StoreId, Pattern),
%% Sort by version descending to get latest firstSubscriptions Storage
Path Format
?SUBSCRIPTIONS_PATH ++ [SubscriptionName]
%% Example: [subscriptions, <<"my-projection">>]Subscription Record
-record(subscription, {
name :: binary(),
type :: stream | event_type | event_pattern | event_payload,
selector :: binary() | map(),
handler :: pid() | function(),
options :: map(),
created_at :: integer()
}).Schema Registry Storage
Path Format
?SCHEMAS_PATH ++ [StoreId, EventType]
%% Example: [schemas, my_store, <<"OrderPlaced">>]Schema Structure
#{
event_type => <<"OrderPlaced">>,
version => 3,
upcast_from => #{
1 => fun(Data) -> ... end,
2 => fun(Data) -> ... end
},
validator => fun(Data) -> ok | {error, Reason} end,
description => <<"Current order event schema">>,
registered_at => 1735689600000
}Links Storage
Path Format
?LINKS_PATH ++ [StoreId, LinkName]
%% Example: [links, my_store, <<"high-value-orders">>]Link Record
-record(link, {
name :: binary(),
source :: source_spec(),
filter :: fun((event()) -> boolean()) | undefined,
transform :: fun((event()) -> event()) | undefined,
backfill :: boolean(),
created_at :: integer(),
status :: running | stopped | error,
processed :: non_neg_integer(),
last_event :: binary() | undefined
}).Cluster Replication
Ra Consensus
All writes go through Raft consensus:
Write Request
│
▼
┌─────────┐
│ Leader │ ◄─── All writes go here
└────┬────┘
│
┌────┴────┬────────┐
│ │ │
▼ ▼ ▼
┌──────┐ ┌──────┐ ┌──────┐
│Follow│ │Follow│ │Follow│
│ 1 │ │ 2 │ │ 3 │
└──────┘ └──────┘ └──────┘Consistency Guarantees
| Operation | Guarantee |
|---|---|
| Write (append) | Strongly consistent (quorum) |
| Read | Strongly consistent (from leader) |
| Cross-stream | No transaction (best effort) |
Failover Behavior
- Leader fails
- Ra elects new leader (typically < 1 second)
- Writes resume on new leader
- In-flight writes may need retry
Storage Operations
Writing Events
%% Append event
PaddedVersion = pad_version(Version, ?VERSION_PADDING),
Path = [streams, StreamId, PaddedVersion],
khepri:put(StoreId, Path, Event).Reading Events
%% Read specific version
{ok, Event} = khepri:get(StoreId, [streams, StreamId, PaddedVersion]).
%% Read all events in stream
Pattern = [streams, StreamId, ?KHEPRI_WILDCARD_STAR],
{ok, EventsMap} = khepri:get_many(StoreId, Pattern).Deleting Events (Scavenging)
%% Delete individual event
khepri:delete(StoreId, [streams, StreamId, PaddedVersion]).Listing Streams
%% Get all stream IDs
Pattern = [streams, ?KHEPRI_WILDCARD_STAR],
{ok, StreamNodes} = khepri:get_many(StoreId, Pattern),
StreamIds = maps:keys(StreamNodes).Memory Considerations
Khepri In-Memory
Khepri keeps data in memory for fast access:
- All paths and values are in memory
- Raft log is also in memory (up to snapshot interval)
- Consider total event size when planning capacity
Memory Estimation
%% Rough estimate per event
EventMemory = byte_size(term_to_binary(Event)),
TotalEvents = 1000000,
ApproxMemory = EventMemory * TotalEvents * 1.5, %% 1.5x for overheadReducing Memory
- Scavenging: Remove old events
- Snapshots: Enable state recovery without all events
- Archival: Move to cold storage before scavenging
Disk Persistence
Ra Snapshots
Ra periodically snapshots the Raft log to disk:
%% Default location
DataDir = application:get_env(ra, data_dir, "ra_data"),
%% Store-specific subdirectory
StoreDir = filename:join(DataDir, atom_to_list(StoreId)).Snapshot Interval
Configure via Ra settings:
%% In sys.config
{ra, [
{segment_max_entries, 65536}, %% Entries per segment
{wal_max_size_bytes, 134217728} %% 128MB WAL size
]}Querying Patterns
Prefix Queries
%% All events for a stream
[streams, <<"orders-123">>, ?KHEPRI_WILDCARD_STAR]
%% All snapshots for a stream
[snapshots, <<"orders-123">>, ?KHEPRI_WILDCARD_STAR]Existence Checks
%% Check if stream exists
case khepri:exists(StoreId, [streams, StreamId]) of
true -> stream_exists;
false -> no_stream
end.Conditional Updates
%% Optimistic concurrency via expected version
case khepri:get(StoreId, [streams, StreamId, PaddedExpectedVersion]) of
{ok, _} ->
%% Version exists, write next
khepri:put(StoreId, [streams, StreamId, PaddedNextVersion], NewEvent);
{error, {khepri, node_not_found, _}} ->
{error, wrong_expected_version}
end.Performance Tips
1. Batch Writes
%% Write multiple events in single Ra command
Events = [Event1, Event2, Event3],
khepri:transaction(StoreId, fun() ->
lists:foreach(fun(E) ->
khepri:put([streams, StreamId, pad_version(E#event.version)], E)
end, Events)
end).2. Use Snapshots
%% Avoid replaying thousands of events
{ok, Snapshot} = load_latest_snapshot(StoreId, StreamId),
{ok, NewEvents} = read_events_since(StoreId, StreamId, Snapshot#snapshot.version),
State = apply_events(Snapshot#snapshot.state, NewEvents).3. Monitor Memory
%% Check Khepri memory usage
KhepriInfo = khepri:info(StoreId),
MemoryUsed = proplists:get_value(memory, KhepriInfo).Troubleshooting
Common Issues
| Issue | Cause | Resolution |
|---|---|---|
| Slow writes | No quorum | Check cluster health |
| High memory | Too many events | Enable scavenging |
| Stale reads | Reading during partition | Wait for partition heal |
Diagnostic Commands
%% Cluster status
ra:members(StoreId).
%% Leader info
khepri:get_leader(StoreId).
%% Store statistics
khepri:info(StoreId).See Also
- Temporal Queries - Time-based event retrieval
- Scavenging - Event lifecycle management
- Memory Pressure - Memory monitoring