reckon_db_schema (reckon_db v1.6.0)

View Source

Schema registry and upcasting for reckon-db

Provides schema versioning and automatic event transformation: - Register schemas for event types with version numbers - Define upcast functions to transform old versions to new - Auto-upcast events when reading from streams

Event schema evolution strategies: - Weak schema: No validation, just track versions - Strong schema: Validate against JSON Schema or custom validator - Tolerant reader: Accept old versions, upcast on demand

Usage:

  %% Register schema
  reckon_db_schema:register(my_store, <<"OrderPlaced">>, #{
      version => 2,
      upcast_from => #{
          1 => fun(V1Data) -> V1Data#{new_field => default} end
      }
  }).
 
  %% Read with upcasting
  {ok, Events} = reckon_db_streams:read(my_store, stream, 0, 100, forward),
  UpcastedEvents = reckon_db_schema:upcast(my_store, Events).

Summary

Functions

Get a schema by event type.

Get current version for an event type.

List all registered schemas.

Register a schema for an event type.

Unregister a schema.

Upcast a list of events to their current schema versions.

Upcast a single event to current schema version.

Validate an event against its registered schema.

Types

event/0

-type event() ::
          #event{event_id :: binary(),
                 event_type :: binary(),
                 stream_id :: binary(),
                 version :: non_neg_integer(),
                 data :: map() | binary(),
                 metadata :: map(),
                 tags :: [binary()] | undefined,
                 timestamp :: integer(),
                 epoch_us :: integer(),
                 data_content_type :: binary(),
                 metadata_content_type :: binary()}.

schema/0

-type schema() ::
          #{event_type := binary(),
            version := version(),
            upcast_from => #{version() => upcast_fun()},
            validator => validator_fun(),
            description => binary()}.

schema_info/0

-type schema_info() :: #{event_type := binary(), version := version(), registered_at := integer()}.

upcast_fun/0

-type upcast_fun() :: fun((map()) -> map()).

validator_fun/0

-type validator_fun() :: fun((map()) -> ok | {error, term()}).

version/0

-type version() :: pos_integer().

Functions

get(StoreId, EventType)

-spec get(atom(), binary()) -> {ok, schema()} | {error, not_found}.

Get a schema by event type.

get_version(StoreId, EventType)

-spec get_version(atom(), binary()) -> {ok, version()} | {error, not_found}.

Get current version for an event type.

list(StoreId)

-spec list(atom()) -> {ok, [schema_info()]} | {error, term()}.

List all registered schemas.

register(StoreId, EventType, Schema)

-spec register(atom(), binary(), schema()) -> ok | {error, term()}.

Register a schema for an event type.

Options: - version: Schema version (required, positive integer) - upcast_from: Map of OldVersion to UpcastFun for transformations - validator: Fun to validate event data (returns ok or error tuple) - description: Human-readable description

unregister(StoreId, EventType)

-spec unregister(atom(), binary()) -> ok | {error, term()}.

Unregister a schema.

upcast(StoreId, Events)

-spec upcast(atom(), [event()]) -> [event()].

Upcast a list of events to their current schema versions.

Events without registered schemas are returned unchanged. Events already at current version are returned unchanged.

upcast_event(StoreId, Event)

-spec upcast_event(atom(), event()) -> event().

Upcast a single event to current schema version.

validate(StoreId, Event)

-spec validate(atom(), event()) -> ok | {error, term()}.

Validate an event against its registered schema.