Temporal Queries


Temporal queries enable point-in-time reconstruction of aggregate state and time-range analytics. This guide covers the server-side implementation, storage mechanics, and cluster behavior.

Overview

The erl_esdb_temporal module provides three core operations:

| Function | Purpose |
|----------|---------|
| read_until/3,4 | Read events up to a timestamp |
| read_range/4,5 | Read events within a time range |
| version_at/3 | Get stream version at a timestamp |

Architecture

[Figure: Temporal Query Flow]

How Temporal Filtering Works

Timestamp Storage

Every event stored in erl-esdb includes an epoch_us field, the number of microseconds since the Unix epoch:

#event{
    event_id = <<"evt-123">>,
    stream_id = <<"orders-456">>,
    version = 5,
    epoch_us = 1735689600000000,  %% Jan 1, 2025 00:00:00 UTC
    event_type = <<"OrderPlaced">>,
    data = #{...},
    metadata = #{...}
}

This timestamp is set at append time using erlang:system_time(microsecond).
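Because queries take raw microsecond values, a small conversion helper can make call sites easier to read. The following is a minimal sketch that uses only the OTP calendar module; the module name epoch_us_util and its function names are hypothetical and not part of erl-esdb.

```erlang
-module(epoch_us_util).
-export([from_datetime/1, to_datetime/1]).

%% Gregorian seconds at 1970-01-01 00:00:00 UTC.
-define(UNIX_EPOCH_GREGORIAN_SECONDS, 62167219200).

%% Convert a UTC calendar:datetime() to microseconds since the Unix epoch.
-spec from_datetime(calendar:datetime()) -> integer().
from_datetime(DateTime) ->
    Seconds = calendar:datetime_to_gregorian_seconds(DateTime)
              - ?UNIX_EPOCH_GREGORIAN_SECONDS,
    Seconds * 1000000.

%% Convert microseconds since the Unix epoch back to a UTC datetime,
%% dropping the sub-second part.
-spec to_datetime(integer()) -> calendar:datetime().
to_datetime(EpochUs) ->
    calendar:gregorian_seconds_to_datetime(
        EpochUs div 1000000 + ?UNIX_EPOCH_GREGORIAN_SECONDS).
```

For example, epoch_us_util:from_datetime({{2025,1,1},{0,0,0}}) returns 1735689600000000, the value used in the record above.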

Query Execution

Temporal queries follow this execution path:

  1. Stream Existence Check: Verify stream exists in Khepri
  2. Event Retrieval: Read all events from the stream
  3. Timestamp Filtering: Filter events by epoch_us field
  4. Options Application: Apply direction and limit options

%% Internal filtering for read_until
filter_events_until(Events, Timestamp) ->
    [E || E <- Events, E#event.epoch_us =< Timestamp].

%% Internal filtering for read_range
filter_events_range(Events, FromTimestamp, ToTimestamp) ->
    [E || E <- Events,
          E#event.epoch_us >= FromTimestamp,
          E#event.epoch_us =< ToTimestamp].

Khepri Storage Path

Events are stored at:

[streams, StreamId, PaddedVersion] -> #event{}

PaddedVersion is the version zero-padded to 12 characters, so the lexicographic order of the keys matches the numeric order of the versions:

[streams, <<"orders-123">>, <<"000000000000">>] -> Event v0
[streams, <<"orders-123">>, <<"000000000001">>] -> Event v1
[streams, <<"orders-123">>, <<"000000000002">>] -> Event v2
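One way such a padded key could be produced is with io_lib:format/2 and a zero-filled field width. This is a sketch only: the module name padded_version and both function names are hypothetical, and the actual erl-esdb encoding routine may differ.

```erlang
-module(padded_version).
-export([encode/1, decode/1]).

%% Largest version that still fits in 12 decimal digits.
-define(MAX_VERSION, 999999999999).

%% Zero-pad a version to 12 characters, e.g. 5 becomes <<"000000000005">>.
%% The guard rejects values that would overflow the field width.
-spec encode(non_neg_integer()) -> binary().
encode(Version) when is_integer(Version),
                     Version >= 0,
                     Version =< ?MAX_VERSION ->
    iolist_to_binary(io_lib:format("~12..0B", [Version])).

%% Recover the integer version from a padded key segment.
-spec decode(binary()) -> non_neg_integer().
decode(Padded) when is_binary(Padded) ->
    binary_to_integer(Padded).
```

Without padding, <<"10">> would sort before <<"9">>; with padding, encode(9) sorts before encode(10), as required.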

Cluster Behavior

Consistency Guarantees

Temporal queries inherit Khepri/Ra's consistency model:

| Aspect | Behavior |
|--------|----------|
| Read Consistency | Strongly consistent (reads go through Raft leader) |
| Cross-Node | Same results on any cluster node |
| Partition Tolerance | Queries fail if no quorum available |

Leader Routing

All reads are routed to the Ra leader:

Client Request
      |
      v
Any Node (Follower)
      |  Forward to leader
      v
Ra Leader (Reads)
      |  Raft log consistency
      v
Query Result

Performance Considerations

Current Implementation

The current implementation reads all events, then filters in memory:

read_all_events(StoreId, StreamId) ->
    Version = erl_esdb_streams:get_version(StoreId, StreamId),
    case Version of
        ?NO_STREAM -> {ok, []};
        _ -> erl_esdb_streams:read(StoreId, StreamId, 0, Version + 1, forward)
    end.

Implications:

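  • O(n) time and memory, where n is the total number of events in the stream, regardless of how many events match the query
  • Suitable for streams with fewer than 10,000 events
  • For larger streams, combine snapshots with temporal queries so that only the events after the snapshot need to be replayed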
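As a rough illustration of combining a snapshot with a temporal filter, the sketch below folds only the events that are newer than the snapshot and not later than the target timestamp. Everything here is an assumption for illustration: the module name snapshot_replay, the {Version, State} snapshot shape, and the local #event{} record, which is a stand-in that mirrors the fields shown under Timestamp Storage.

```erlang
-module(snapshot_replay).
-export([state_at/4, sample_events/0]).

%% Local stand-in for the #event{} record (assumed field layout).
-record(event, {event_id, stream_id, version, epoch_us,
                event_type, data = #{}, metadata = #{}}).

%% Snapshot is {SnapshotVersion, SnapshotState} and must have been taken
%% at or before Timestamp. ApplyFun has the shape fun(Event, State) -> State.
state_at({SnapVersion, SnapState}, Events, Timestamp, ApplyFun) ->
    Tail = [E || E <- Events,
                 E#event.version > SnapVersion,
                 E#event.epoch_us =< Timestamp],
    lists:foldl(ApplyFun, SnapState, Tail).

%% Three illustrative events at versions 0..2.
sample_events() ->
    [#event{version = 0, epoch_us = 100},
     #event{version = 1, epoch_us = 200},
     #event{version = 2, epoch_us = 300}].
```

In practice the saving comes from reading fewer events in the first place, for instance by starting the stream read at SnapVersion + 1 rather than at 0; the in-memory filter above only shows which events belong in the replay.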

Optimization Opportunities

Future versions could add:

  1. Khepri Timestamp Index: Secondary index on epoch_us
  2. Binary Search: Events are ordered by version, and version order usually matches timestamp order, so a binary search on epoch_us could locate the cutoff without scanning every event
  3. Streaming API: Process events in batches to reduce memory
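The binary search idea can be sketched as follows. This is not part of erl-esdb: the module name temporal_bsearch is hypothetical, events are reduced to {Version, EpochUs} pairs held in a tuple for constant-time indexing, and the code assumes timestamps never decrease as versions increase. That assumption can fail, because erlang:system_time/1 is not monotonic.

```erlang
-module(temporal_bsearch).
-export([count_until/2]).

%% Events is a tuple of {Version, EpochUs} pairs sorted by version, with
%% non-decreasing EpochUs (an assumption, see the note above).
%% Returns how many leading events have EpochUs =< Timestamp.
-spec count_until(tuple(), integer()) -> non_neg_integer().
count_until(Events, Timestamp) ->
    search(Events, Timestamp, 1, tuple_size(Events) + 1).

%% Invariant: every position below Lo qualifies, and every position
%% at or above Hi does not.
search(_Events, _Timestamp, Lo, Hi) when Lo >= Hi ->
    Lo - 1;
search(Events, Timestamp, Lo, Hi) ->
    Mid = (Lo + Hi) div 2,
    {_Version, EpochUs} = element(Mid, Events),
    case EpochUs =< Timestamp of
        true  -> search(Events, Timestamp, Mid + 1, Hi);
        false -> search(Events, Timestamp, Lo, Mid)
    end.
```

The result is a count, so with zero-based versions the last matching version is the count minus one. This takes O(log n) comparisons, but it only pays off if the events can be indexed without first loading the whole stream.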

Use Cases

1. Point-in-Time State Reconstruction

%% Reconstruct order state as it was on Dec 31, 2024
Timestamp = 1735603200000000,  %% Dec 31, 2024 00:00:00 UTC
{ok, Events} = erl_esdb_temporal:read_until(my_store, <<"order-123">>, Timestamp),
State = lists:foldl(fun apply_event/2, initial_state(), Events).
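The snippet above leaves apply_event/2 and initial_state/0 to the application. A minimal, self-contained sketch of what they might look like for an order aggregate follows; the module name order_projection, the state shape, and the OrderShipped event type are hypothetical, and the #event{} record is a local stand-in mirroring the fields shown under Timestamp Storage.

```erlang
-module(order_projection).
-export([initial_state/0, apply_event/2, state_at/2, sample_events/0]).

%% Local stand-in for the #event{} record (assumed field layout).
-record(event, {event_id, stream_id, version, epoch_us,
                event_type, data = #{}, metadata = #{}}).

initial_state() ->
    #{status => new, items => []}.

%% Fold step: argument order matches lists:foldl/3 (element, accumulator).
apply_event(#event{event_type = <<"OrderPlaced">>, data = Data}, State) ->
    State#{status := placed, items := maps:get(items, Data, [])};
apply_event(#event{event_type = <<"OrderShipped">>}, State) ->
    State#{status := shipped};
apply_event(#event{}, State) ->
    State.  %% Unknown event types leave the state unchanged.

%% In-memory equivalent of read_until followed by a fold.
state_at(Events, Timestamp) ->
    Until = [E || E <- Events, E#event.epoch_us =< Timestamp],
    lists:foldl(fun apply_event/2, initial_state(), Until).

sample_events() ->
    [#event{version = 0, epoch_us = 100,
            event_type = <<"OrderPlaced">>,
            data = #{items => [<<"sku-1">>]}},
     #event{version = 1, epoch_us = 200,
            event_type = <<"OrderShipped">>}].
```

With these sample events, a timestamp between the two events yields a placed order, and a timestamp after both yields a shipped one.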

2. Audit Queries

%% What was the account balance at end of fiscal year?
FiscalYearEnd = 1735689599999999,  %% Dec 31, 2024 23:59:59.999999 UTC
{ok, Events} = erl_esdb_temporal:read_until(my_store, <<"account-456">>, FiscalYearEnd),
Balance = calculate_balance(Events).

3. Time-Range Analytics

%% Analyze orders from Q4 2024
Q4Start = 1727740800000000,  %% Oct 1, 2024 00:00:00 UTC
Q4End = 1735689599999999,    %% Dec 31, 2024 23:59:59.999999 UTC
{ok, Events} = erl_esdb_temporal:read_range(my_store, <<"orders-*">>, Q4Start, Q4End),
analyze_quarterly_orders(Events).

4. Version Discovery

%% What version should I replay to for a snapshot at timestamp T?
{ok, Version} = erl_esdb_temporal:version_at(my_store, <<"user-789">>, Timestamp),
%% Now load snapshot at Version, or replay events 0..Version
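Conceptually, the version at a timestamp is the highest version among the events whose epoch_us is at or before that timestamp. The sketch below expresses that over an in-memory list. It is an illustration, not the erl_esdb_temporal implementation: the module name version_at_sketch is hypothetical, the #event{} record is a local stand-in, and the error tuple returned when no event qualifies is an assumption, since this guide does not specify that case.

```erlang
-module(version_at_sketch).
-export([version_at/2, sample_events/0]).

%% Local stand-in for the #event{} record (assumed field layout).
-record(event, {event_id, stream_id, version, epoch_us,
                event_type, data = #{}, metadata = #{}}).

%% Highest version whose event was appended at or before Timestamp.
%% The {error, no_events_at_timestamp} result is hypothetical.
version_at(Events, Timestamp) ->
    case [E#event.version || E <- Events,
                             E#event.epoch_us =< Timestamp] of
        []       -> {error, no_events_at_timestamp};
        Versions -> {ok, lists:max(Versions)}
    end.

sample_events() ->
    [#event{version = 0, epoch_us = 100},
     #event{version = 1, epoch_us = 200},
     #event{version = 2, epoch_us = 300}].
```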

Options

Both read_until/4 and read_range/5 accept options:

-type opts() :: #{
    direction => forward | backward,  %% Event order (default: forward)
    limit => pos_integer()             %% Max events to return
}.

Direction: Controls the order of returned events

  • forward: Oldest first (ascending by version)
  • backward: Newest first (descending by version)

Limit: Truncates result after applying direction
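The order of operations matters: with direction => backward and limit => 2, the result is the two newest matching events, not the two oldest reversed. A minimal sketch of that behavior, assuming the filtered events arrive oldest first (the module name temporal_opts and the function apply_opts/2 are hypothetical):

```erlang
-module(temporal_opts).
-export([apply_opts/2]).

%% Events must be in ascending version order.
%% Direction is applied first, then the optional limit.
-spec apply_opts([term()], map()) -> [term()].
apply_opts(Events, Opts) ->
    Ordered = case maps:get(direction, Opts, forward) of
                  forward  -> Events;
                  backward -> lists:reverse(Events)
              end,
    case maps:find(limit, Opts) of
        {ok, Limit} -> lists:sublist(Ordered, Limit);
        error       -> Ordered
    end.
```

Using integers in place of events, apply_opts([1,2,3], #{direction => backward, limit => 2}) returns [3,2].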

Telemetry

Temporal queries emit telemetry events:

%% Event: [erl_esdb, temporal, read_until]
%% Measurements: #{duration => integer(), event_count => integer()}
%% Metadata: #{store_id => atom(), stream_id => binary(), timestamp => integer()}

%% Event: [erl_esdb, temporal, read_range]
%% Measurements: #{duration => integer(), event_count => integer()}
%% Metadata: #{store_id => atom(), stream_id => binary(), timestamp => {From, To}}
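A handler for these events could be attached with telemetry:attach_many/4 from the telemetry library. The sketch below logs queries whose duration exceeds a threshold. The module name temporal_telemetry, the handler ID, and the threshold value are hypothetical, and because this guide does not state the unit of duration, the threshold is simply expressed in whatever unit the event reports.

```erlang
-module(temporal_telemetry).
-export([attach/0, handle_event/4, is_slow/2]).

%% Attach one handler to both temporal events.
%% Requires the telemetry application to be available at runtime.
attach() ->
    telemetry:attach_many(
        <<"temporal-slow-query-logger">>,
        [[erl_esdb, temporal, read_until],
         [erl_esdb, temporal, read_range]],
        fun ?MODULE:handle_event/4,
        #{threshold => 50000}).  %% Hypothetical value, same unit as duration.

%% telemetry ignores the return value; it is returned here so the
%% handler can be exercised directly in tests.
handle_event(EventName, #{duration := Duration} = Measurements,
             Metadata, #{threshold := Threshold}) ->
    case is_slow(Duration, Threshold) of
        true ->
            logger:warning("slow temporal query ~p: duration=~p "
                           "events=~p stream=~p",
                           [EventName, Duration,
                            maps:get(event_count, Measurements, undefined),
                            maps:get(stream_id, Metadata, undefined)]),
            slow;
        false ->
            ok
    end.

is_slow(Duration, Threshold) ->
    Duration > Threshold.
```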

Error Handling

| Error | Cause | Resolution |
|-------|-------|------------|
| {error, {stream_not_found, StreamId}} | Stream does not exist | Verify stream ID |
| {error, timeout} | Khepri/Ra timeout | Check cluster health |
| {error, no_quorum} | Ra cluster partitioned | Wait for partition heal |
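Callers may want to treat these errors differently: a missing stream is permanent, while timeout and no_quorum can clear up once the cluster recovers. The following sketch wraps any query in a bounded retry on that basis. The module name temporal_errors, the retry policy, and the retries_exhausted result are hypothetical and not part of erl-esdb.

```erlang
-module(temporal_errors).
-export([classify/1, with_retry/3]).

%% Map a query result to an action.
classify({ok, Events})                           -> {ok, Events};
classify({error, {stream_not_found, _StreamId}}) -> {fail, not_found};
classify({error, timeout})                       -> retry;
classify({error, no_quorum})                     -> retry;
classify({error, Other})                         -> {fail, Other}.

%% Run Fun (a zero-arity fun performing the query) up to Attempts times,
%% sleeping BackoffMs milliseconds between transient failures.
with_retry(Fun, Attempts, BackoffMs) when Attempts > 0 ->
    case classify(Fun()) of
        retry when Attempts > 1 ->
            timer:sleep(BackoffMs),
            with_retry(Fun, Attempts - 1, BackoffMs);
        retry ->
            {error, retries_exhausted};
        Result ->
            Result
    end.
```

A call might look like temporal_errors:with_retry(fun() -> erl_esdb_temporal:read_until(my_store, <<"order-123">>, Timestamp) end, 3, 200).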

Best Practices

  1. Use Snapshots: For frequently queried historical points, save snapshots
  2. Limit Result Size: Use the limit option for large time ranges
  3. Monitor Duration: Track telemetry for slow queries
  4. Consider Stream Size: Temporal queries load full streams; partition large streams

See Also