mem_evoq_store (mem_evoq v0.1.1)

Per-store gen_server holding events, snapshots, subscribers, and (optionally) integrity state for a single in-memory event store.

The state shape mirrors what reckon-db keeps in Khepri:

  • streams — map of StreamId to ordered list of #event{}
  • snapshots — map of StreamId to map of Version to #snapshot{}
  • subscribers — map of subscription key to subscriber metadata
  • integrity — disabled or {enabled, Key, ChainStarts}
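
As a rough illustration of the four fields listed above, the state could be held in a map like the one below. This is only a sketch: the module name `store_state_sketch`, the use of a map rather than a record, and the key names are assumptions, not taken from the mem_evoq source.

```erlang
-module(store_state_sketch).
-export([new/0, new/1]).

%% Hypothetical state layout following the four bullet points above.
%% The real module may use a record or different key names.
-type integrity() :: disabled | {enabled, Key :: binary(), ChainStarts :: map()}.

-spec new() -> map().
new() ->
    new(disabled).

-spec new(integrity()) -> map().
new(Integrity) ->
    #{streams => #{},      %% StreamId => [Event], oldest first
      snapshots => #{},    %% StreamId => #{Version => Snapshot}
      subscribers => #{},  %% SubscriptionKey => subscriber metadata
      integrity => Integrity}.
```

Calling `store_state_sketch:new()` yields an empty store with integrity disabled; passing `{enabled, Key, #{}}` would model a store with integrity switched on.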

No persistence. Process restart loses state. That is the intended semantic — mem-evoq exists for tests, demos, and as a reference implementation of the evoq_event_store adapter behaviour. For production use, pair evoq with reckon-evoq + reckon-db.

Streams are stored in append order (events 0..N-1, oldest first). This makes appends O(N), because lists are cons-prepended and reversed once on the wire, but that is acceptable: mem-evoq is for tests, where N is small. Optimising prematurely would obscure the module's role as a reference implementation.
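
To make the cost concrete, here is a minimal sketch of appending to an oldest-first list held in a map. It uses `++`, which copies the existing list and is therefore linear in the stream length. This is one simple way to get the behaviour described, not necessarily the approach the module takes; the module name `append_sketch` and its functions are hypothetical.

```erlang
-module(append_sketch).
-export([append/3, read/2]).

%% Streams :: #{StreamId => [Event]}, each list kept oldest first.
append(StreamId, NewEvents, Streams) ->
    Existing = maps:get(StreamId, Streams, []),
    %% ++ copies its left operand, so this step is O(length(Existing)).
    Streams#{StreamId => Existing ++ NewEvents}.

%% Reading needs no reversal because the list is already in append order.
read(StreamId, Streams) ->
    maps:get(StreamId, Streams, []).
```

The trade-off is that reads return events in order for free, while each append pays for the full length of the stream.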

Types

enabled_integrity/0

-type enabled_integrity() :: #{key := binary(), chain_start := #{binary() => non_neg_integer()}}.

event/0

-type event() ::
          #event{event_id :: binary(),
                 event_type :: binary(),
                 stream_id :: binary(),
                 version :: non_neg_integer(),
                 data :: map() | binary(),
                 metadata :: map(),
                 tags :: [binary()] | undefined,
                 timestamp :: integer(),
                 epoch_us :: integer(),
                 data_content_type :: binary(),
                 metadata_content_type :: binary(),
                 prev_event_hash :: binary() | undefined,
                 mac :: {KeyId :: non_neg_integer(), MacBytes :: binary()} | undefined,
                 signature :: binary() | undefined}.

filter/0

-type filter() ::
          all |
          {by_stream, binary()} |
          {by_event_type, binary()} |
          {by_event_pattern, map()} |
          {by_tags, [binary()], any | all}.
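
The sketch below shows one plausible way to evaluate a `filter()` against an event. The matching rules are assumptions: `by_event_pattern` is treated as "every key/value pair in the pattern appears in the event's map payload", and `any | all` is read as "at least one tag" versus "every tag". The record is a trimmed stand-in for `#event{}`, and `filter_sketch` is a hypothetical module.

```erlang
-module(filter_sketch).
-export([matches/2, demo/0]).

%% Trimmed-down stand-in for #event{}; only the fields the filters inspect.
-record(event, {stream_id, event_type, data = #{}, tags}).

matches(all, _Event) ->
    true;
matches({by_stream, StreamId}, #event{stream_id = Id}) ->
    Id =:= StreamId;
matches({by_event_type, Type}, #event{event_type = T}) ->
    T =:= Type;
matches({by_event_pattern, Pattern}, #event{data = Data}) when is_map(Data) ->
    %% Assumption: every key/value pair in Pattern must appear in Data.
    maps:fold(fun(K, V, Acc) -> Acc andalso maps:find(K, Data) =:= {ok, V} end,
              true, Pattern);
matches({by_event_pattern, _Pattern}, _Event) ->
    %% Binary payloads are not pattern-matched in this sketch.
    false;
matches({by_tags, _Wanted, _Mode}, #event{tags = undefined}) ->
    false;
matches({by_tags, Wanted, any}, #event{tags = Tags}) ->
    lists:any(fun(T) -> lists:member(T, Tags) end, Wanted);
matches({by_tags, Wanted, all}, #event{tags = Tags}) ->
    lists:all(fun(T) -> lists:member(T, Tags) end, Wanted).

%% Runs a handful of filters against one sample event.
demo() ->
    E = #event{stream_id = <<"order-1">>, event_type = <<"order_placed">>,
               data = #{status => open}, tags = [<<"eu">>, <<"vip">>]},
    [matches(F, E) || F <- [all,
                            {by_stream, <<"order-2">>},
                            {by_event_pattern, #{status => open}},
                            {by_tags, [<<"eu">>, <<"us">>], any},
                            {by_tags, [<<"eu">>, <<"us">>], all}]].
```

`filter_sketch:demo()` returns `[true, false, true, true, false]` under these assumed semantics.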

integrity_ctx/0

-type integrity_ctx() :: disabled | {enabled, Key :: binary(), ChainStart :: non_neg_integer()}.

snapshot/0

-type snapshot() ::
          #snapshot{stream_id :: binary(),
                    version :: non_neg_integer(),
                    data :: map() | binary(),
                    metadata :: map(),
                    timestamp :: integer(),
                    anchor_hash :: binary() | undefined,
                    mac :: {KeyId :: non_neg_integer(), MacBytes :: binary()} | undefined}.
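
Since snapshots are kept as a map of StreamId to a map of Version to snapshot, finding the most recent snapshot means picking the highest version key. The following sketch illustrates that lookup; the module `snapshot_sketch`, its function names, and the `{error, not_found}` return value are hypothetical and chosen only for illustration.

```erlang
-module(snapshot_sketch).
-export([store/4, latest/2]).

%% Snapshots :: #{StreamId => #{Version => Snapshot}}
store(StreamId, Version, Snapshot, Snapshots) ->
    ByVersion = maps:get(StreamId, Snapshots, #{}),
    Snapshots#{StreamId => ByVersion#{Version => Snapshot}}.

%% Returns the snapshot with the highest version for the stream, if any.
latest(StreamId, Snapshots) ->
    ByVersion = maps:get(StreamId, Snapshots, #{}),
    case maps:keys(ByVersion) of
        [] ->
            {error, not_found};
        Versions ->
            {ok, maps:get(lists:max(Versions), ByVersion)}
    end.
```

Keying by version keeps every snapshot addressable individually, at the price of scanning the version keys to find the newest one.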

Functions

code_change(OldVsn, State, Extra)

handle_call(Req, From, State)

handle_cast(Msg, State)

handle_info(Info, State)

init(_)

start_link(StoreId, Opts)

-spec start_link(atom(), map()) -> {ok, pid()} | {error, term()}.
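
The following is a minimal stand-in gen_server with the same `start_link(StoreId, Opts)` shape, meant only to illustrate the per-store process model and the "restart loses state" semantic. It is not the mem_evoq implementation: the module `store_server_sketch`, the local registration under `StoreId`, and the `append`/`read` helpers with their message formats are all assumptions.

```erlang
-module(store_server_sketch).
-behaviour(gen_server).

-export([start_link/2, append/3, read/2]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2]).

-spec start_link(atom(), map()) -> {ok, pid()} | {error, term()}.
start_link(StoreId, Opts) when is_atom(StoreId), is_map(Opts) ->
    %% Assumption: the process is registered locally under StoreId.
    gen_server:start_link({local, StoreId}, ?MODULE, {StoreId, Opts}, []).

%% Hypothetical client helpers; not part of the documented API.
append(StoreId, StreamId, Events) ->
    gen_server:call(StoreId, {append, StreamId, Events}).

read(StoreId, StreamId) ->
    gen_server:call(StoreId, {read, StreamId}).

init({_StoreId, _Opts}) ->
    %% All state lives in process memory and is gone when the process exits.
    {ok, #{streams => #{}}}.

handle_call({append, StreamId, Events}, _From, #{streams := Streams} = State) ->
    All = maps:get(StreamId, Streams, []) ++ Events,
    {reply, {ok, length(All)}, State#{streams := Streams#{StreamId => All}}};
handle_call({read, StreamId}, _From, #{streams := Streams} = State) ->
    {reply, {ok, maps:get(StreamId, Streams, [])}, State};
handle_call(_Req, _From, State) ->
    {reply, {error, unknown_request}, State}.

handle_cast(_Msg, State) ->
    {noreply, State}.

handle_info(_Info, State) ->
    {noreply, State}.
```

Starting the sketch, appending two events, stopping it with `gen_server:stop/1`, and starting it again under the same name leaves the stream empty, which is the behaviour the module description calls intended.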

terminate(Reason, State)