reckon_db_links (reckon_db v1.4.4)

View Source

Stream linking and simple projections for reckon-db

Provides derived streams from source streams: - Filter events based on predicates - Transform event data - Create materialized streams for specialized queries

Links are live - new events are automatically propagated. Link streams can be subscribed to like regular streams.

Usage:

  %% Create a link for high-value orders
  reckon_db_links:create(my_store, #{
      name => <<"high-value-orders">>,
      source => #{type => stream_pattern, pattern => <<"orders-*">>},
      filter => fun(E) -> maps:get(total, E#event.data, 0) > 1000 end,
      transform => fun(E) -> E#event{data = E#event.data#{flagged => true}} end
  }).
 
  %% Subscribe to linked stream
  reckon_db_subscriptions:subscribe(my_store, stream, <<"$link:high-value-orders">>, ...).

Summary

Functions

Create a new link.

Delete a link.

Get a link by name.

Get detailed link info.

List all links.

Start processing a link.

Stop processing a link.

Types

event/0

-type event() ::
          #event{event_id :: binary(),
                 event_type :: binary(),
                 stream_id :: binary(),
                 version :: non_neg_integer(),
                 data :: map() | binary(),
                 metadata :: map(),
                 tags :: [binary()] | undefined,
                 timestamp :: integer(),
                 epoch_us :: integer(),
                 data_content_type :: binary(),
                 metadata_content_type :: binary()}.

filter_fun/0

-type filter_fun() :: fun((event()) -> boolean()).

source_spec/0

-type source_spec() ::
          #{type := stream | stream_pattern | all, stream_id => binary(), pattern => binary()}.

transform_fun/0

-type transform_fun() :: fun((event()) -> event()).

Functions

create(StoreId, Spec)

-spec create(atom(), link_spec()) -> ok | {error, term()}.

Create a new link.

Options: - name: Link name (required, will create stream with $link prefix) - source: Source specification (stream, stream_pattern, or all) - filter: Predicate function to filter events - transform: Function to transform events - backfill: Process existing events (default: false)

delete(StoreId, Name)

-spec delete(atom(), binary()) -> ok | {error, term()}.

Delete a link.

get(StoreId, Name)

-spec get(atom(), binary()) -> {ok, link_info()} | {error, not_found}.

Get a link by name.

info(StoreId, Name)

-spec info(atom(), binary()) -> {ok, map()} | {error, not_found}.

Get detailed link info.

list(StoreId)

-spec list(atom()) -> {ok, [link_info()]} | {error, term()}.

List all links.

start(StoreId, Name)

-spec start(atom(), binary()) -> ok | {error, term()}.

Start processing a link.

This will: 1. Subscribe to source stream(s) 2. Optionally backfill existing events 3. Apply filter and transform to each event 4. Write matching events to the link stream

stop(StoreId, Name)

-spec stop(atom(), binary()) -> ok | {error, term()}.

Stop processing a link.