reckon_db_aggregator (reckon_db v1.2.6)

View Source

Event aggregator for reckon-db

Aggregates events from an event stream using tagged rules. Supports special value tags for custom aggregation behavior:

Tagged value types: {sum, N} - Add N to current value (starts at 0) {overwrite, V} - Replace current value with V plain value - Replace current value (default behavior)

Summary

Functions

Aggregate events from a stream with optional snapshot

Finalize a tagged map by unwrapping all tagged values

Fold a list of events from left to right (chronological order)

Fold events with an initial state

Fold a list of events from right to left

Fold events from right with an initial state

Types

event/0

-type event() ::
          #event{event_id :: binary(),
                 event_type :: binary(),
                 stream_id :: binary(),
                 version :: non_neg_integer(),
                 data :: map() | binary(),
                 metadata :: map(),
                 tags :: [binary()] | undefined,
                 timestamp :: integer(),
                 epoch_us :: integer(),
                 data_content_type :: binary(),
                 metadata_content_type :: binary()}.

snapshot/0

-type snapshot() ::
          #snapshot{stream_id :: binary(),
                    version :: non_neg_integer(),
                    data :: map() | binary(),
                    metadata :: map(),
                    timestamp :: integer()}.

tagged_map/0

-type tagged_map() :: #{atom() | binary() => tagged_value()}.

tagged_value/0

-type tagged_value() :: {sum, number()} | {overwrite, term()} | term().

Functions

aggregate(Events, Snapshot, Opts)

-spec aggregate([event() | map()], snapshot() | undefined, map()) -> map().

Aggregate events from a stream with optional snapshot

This is a convenience function that: 1. Starts from a snapshot's data (if provided) or empty map 2. Applies events in order 3. Returns the finalized aggregate state

finalize(TaggedMap)

-spec finalize(tagged_map()) -> map().

Finalize a tagged map by unwrapping all tagged values

Converts {sum, N} to N and {overwrite, V} to V.

foldl(Events)

-spec foldl([event() | map()]) -> tagged_map().

Fold a list of events from left to right (chronological order)

Events should be sorted by version in ascending order. Returns a tagged map that can be finalized with finalize/1.

foldl(Events, InitialState)

-spec foldl([event() | map()], tagged_map()) -> tagged_map().

Fold events with an initial state

foldr(Events)

-spec foldr([event() | map()]) -> tagged_map().

Fold a list of events from right to left

Events should be sorted by version in ascending order. This will process them in reverse (newest first).

foldr(Events, InitialState)

-spec foldr([event() | map()], tagged_map()) -> tagged_map().

Fold events from right with an initial state