reckon-gater

View Source

Hex.pm Hexdocs.pm Buy Me A Coffee

Gateway for distributed access to reckon-db event stores.

Gateway Architecture

Overview

reckon-gater is an Erlang gateway service providing:

  • Distributed Worker Registry: pg-based registry for cluster-wide worker discovery
  • Load Balancing: Round-robin with exponential backoff retry
  • Shared Type Definitions: Common records for events, snapshots, and subscriptions
  • Capability-Based Security: UCAN-inspired tokens for decentralized authorization
  • PubSub Channels: 10 dedicated channels with priority-based delivery
  • HMAC Security: Message signing for critical channels
  • Telemetry: BEAM telemetry with optional OpenTelemetry exporters

Installation

Add to your rebar.config:

{deps, [
    {reckon_gater, "1.0.0"}
]}.

Pure Erlang implementation - works everywhere, no native dependencies.

Quick Start

%% Start the application (typically started by reckon-db)
application:ensure_all_started(reckon_gater).

%% Append events to a stream
Events = [#{type => <<"user_created">>, data => #{name => <<"Alice">>}}],
{ok, Version} = reckon_gater_api:append_events(my_store, <<"users-123">>, Events).

%% Read events from a stream
{ok, EventList} = reckon_gater_api:stream_forward(my_store, <<"users-123">>, 0, 100).

%% Subscribe to PubSub channel
ok = reckon_gater_channel_server:subscribe(reckon_gater_channel_events, <<"user.*">>, self()).

%% Receive channel messages
receive
    {channel_message, reckon_gater_channel_events, _Topic, Event} ->
        handle_event(Event)
end.

API Reference

Stream Operations

%% Append events to a stream
reckon_gater_api:append_events(StoreId, StreamId, Events) ->
    {ok, NewVersion} | {error, term()}.
reckon_gater_api:append_events(StoreId, StreamId, ExpectedVersion, Events) ->
    {ok, NewVersion} | {error, term()}.

%% Read events from a stream
reckon_gater_api:get_events(StoreId, StreamId, StartVersion, Count, Direction) ->
    {ok, [Event]} | {error, term()}.
reckon_gater_api:stream_forward(StoreId, StreamId, StartVersion, Count) ->
    {ok, [Event]} | {error, term()}.
reckon_gater_api:stream_backward(StoreId, StreamId, StartVersion, Count) ->
    {ok, [Event]} | {error, term()}.

%% Stream metadata
reckon_gater_api:get_version(StoreId, StreamId) -> {ok, Version} | {error, term()}.
reckon_gater_api:stream_exists(StoreId, StreamId) -> boolean().
reckon_gater_api:get_streams(StoreId) -> {ok, [StreamId]} | {error, term()}.

Subscription Operations

%% Create a subscription
reckon_gater_api:save_subscription(StoreId, Type, Selector, Name, StartFrom, Subscriber) ->
    ok | {error, term()}.

%% Remove a subscription
reckon_gater_api:remove_subscription(StoreId, Type, Selector, Name) ->
    ok | {error, term()}.

%% Acknowledge event processing
reckon_gater_api:ack_event(StoreId, StreamId, SubscriptionName, EventNumber) ->
    ok | {error, term()}.

%% List subscriptions
reckon_gater_api:get_subscriptions(StoreId) -> {ok, [Subscription]} | {error, term()}.

Snapshot Operations

%% Record a snapshot
reckon_gater_api:record_snapshot(StoreId, SourceUuid, StreamUuid, Version, Record) ->
    ok | {error, term()}.

%% Read a snapshot
reckon_gater_api:read_snapshot(StoreId, SourceUuid, StreamUuid, Version) ->
    {ok, Snapshot} | {error, term()}.

%% Delete a snapshot
reckon_gater_api:delete_snapshot(StoreId, SourceUuid, StreamUuid, Version) ->
    ok | {error, term()}.

%% List snapshots
reckon_gater_api:list_snapshots(StoreId, SourceUuid, StreamUuid) ->
    {ok, [Snapshot]} | {error, term()}.

Health

reckon_gater_api:health() -> healthy | {degraded, Reason} | {unhealthy, Reason}.
reckon_gater_api:quick_health_check(StoreId) -> ok | {error, term()}.

Temporal Queries

Query events by timestamp for point-in-time reconstruction. See Temporal Queries Guide.

%% Read events up to a timestamp
reckon_gater_api:read_until(StoreId, StreamId, Timestamp) ->
    {ok, [Event]} | {error, term()}.
reckon_gater_api:read_until(StoreId, StreamId, Timestamp, Opts) ->
    {ok, [Event]} | {error, term()}.

%% Read events in a time range
reckon_gater_api:read_range(StoreId, StreamId, FromTs, ToTs) ->
    {ok, [Event]} | {error, term()}.

%% Get stream version at a specific timestamp
reckon_gater_api:version_at(StoreId, StreamId, Timestamp) ->
    {ok, Version} | {error, term()}.

Scavenging

Remove old events beyond retention, optionally archive first. See Scavenging Guide.

%% Scavenge a stream (delete old events)
reckon_gater_api:scavenge(StoreId, StreamId, Opts) ->
    {ok, Result} | {error, term()}.

%% Scavenge streams matching a pattern
reckon_gater_api:scavenge_matching(StoreId, Pattern, Opts) ->
    {ok, [Result]} | {error, term()}.

%% Preview what would be deleted (dry run)
reckon_gater_api:scavenge_dry_run(StoreId, StreamId, Opts) ->
    {ok, Preview} | {error, term()}.

Causation Tracking

Track event lineage for debugging and auditing. See Causation Guide.

Causation Graph

%% Get events caused by an event
reckon_gater_api:get_effects(StoreId, EventId) ->
    {ok, [Event]} | {error, term()}.

%% Get the event that caused this one
reckon_gater_api:get_cause(StoreId, EventId) ->
    {ok, Event} | {error, not_found}.

%% Get full causation chain (root to this event)
reckon_gater_api:get_causation_chain(StoreId, EventId) ->
    {ok, [Event]} | {error, term()}.

%% Get all events with the same correlation ID
reckon_gater_api:get_correlated(StoreId, CorrelationId) ->
    {ok, [Event]} | {error, term()}.

%% Build causation graph for visualization
reckon_gater_api:build_causation_graph(StoreId, EventId) ->
    {ok, Graph} | {error, term()}.

Schema Operations

Schema registry with automatic upcasting. See Schema Evolution Guide.

Schema Upcasting

%% Register a schema
reckon_gater_api:register_schema(StoreId, EventType, Schema) -> ok.

%% Get schema for an event type
reckon_gater_api:get_schema(StoreId, EventType) ->
    {ok, Schema} | {error, not_found}.

%% List all schemas
reckon_gater_api:list_schemas(StoreId) -> {ok, [SchemaInfo]}.

%% Upcast events to current schema version
reckon_gater_api:upcast_events(StoreId, Events) ->
    {ok, UpcastedEvents} | {error, term()}.

%% Unregister a schema
reckon_gater_api:unregister_schema(StoreId, EventType) -> ok.

Memory Pressure

Adaptive behavior based on system memory. See Memory Pressure Guide.

%% Get current memory pressure level
reckon_gater_api:get_memory_level(StoreId) ->
    {ok, normal | elevated | critical}.

%% Get detailed memory statistics
reckon_gater_api:get_memory_stats(StoreId) ->
    {ok, #{used := bytes(), total := bytes(), level := atom()}}.

Create derived streams from source streams. See Stream Links Guide.

Stream Links

%% Create a new link (filter + transform)
reckon_gater_api:create_link(StoreId, LinkSpec) -> ok.

%% Delete a link
reckon_gater_api:delete_link(StoreId, LinkName) -> ok.

%% Get link by name
reckon_gater_api:get_link(StoreId, LinkName) ->
    {ok, LinkInfo} | {error, not_found}.

%% List all links
reckon_gater_api:list_links(StoreId) -> {ok, [LinkInfo]}.

%% Start/stop a link
reckon_gater_api:start_link(StoreId, LinkName) -> ok.
reckon_gater_api:stop_link(StoreId, LinkName) -> ok.

%% Get detailed link info
reckon_gater_api:link_info(StoreId, LinkName) ->
    {ok, #{status := atom(), events_processed := integer()}}.

Channels

%% Subscribe to a topic
reckon_gater_channel_server:subscribe(ChannelName, Topic, Pid) -> ok.

%% Subscribe with capability token (for authorization)
reckon_gater_channel_server:subscribe(ChannelName, Topic, Pid, CapabilityToken) ->
    ok | {error, {unauthorized, Reason}}.

%% Unsubscribe from a topic
reckon_gater_channel_server:unsubscribe(ChannelName, Topic, Pid) -> ok.

%% Publish a message
reckon_gater_channel_server:publish(ChannelName, Topic, Message) ->
    ok | {error, rate_limited | signature_required | invalid_signature}.

%% Publish with capability token (for authorization)
reckon_gater_channel_server:publish(ChannelName, Topic, Message, CapabilityToken) ->
    ok | {error, {unauthorized, Reason}}.

Security

%% Sign a message with default secret
reckon_gater_pubsub_security:sign(Message) -> SignedMessage.

%% Sign with custom secret
reckon_gater_pubsub_security:sign(Message, Secret) -> SignedMessage.

%% Verify a signed message
reckon_gater_pubsub_security:verify(SignedMessage) -> ok | {error, Reason}.

%% Set the default secret
reckon_gater_pubsub_security:set_secret(Secret) -> ok.

Retry Configuration

%% Create custom retry config
Config = reckon_gater_retry:new_config(
    100,     %% base_delay_ms
    5000,    %% max_delay_ms
    5        %% max_attempts
),

%% Execute with custom retry
reckon_gater_api:execute(my_store, Fun, Config).

Channels

PubSub Channels

The gateway provides 10 dedicated PubSub channels:

ChannelPriorityRate LimitHMACPurpose
reckon_gater_channel_alertscriticalunlimitedrequiredCritical system alerts
reckon_gater_channel_securitycriticalunlimitedrequiredSecurity events
reckon_gater_channel_eventshighunlimitedoptionalBusiness events
reckon_gater_channel_healthhigh100/secoptionalHealth checks
reckon_gater_channel_systemnormalunlimitedoptionalSystem notifications
reckon_gater_channel_metricsnormal10000/secoptionalPerformance metrics
reckon_gater_channel_auditnormalunlimitedoptionalAudit trail
reckon_gater_channel_lifecyclenormalunlimitedoptionalLifecycle events
reckon_gater_channel_logginglow1000/secoptionalLog messages
reckon_gater_channel_diagnosticslow100/secoptionalDiagnostic info

Channel Priorities

  • critical: Immediate delivery, no rate limiting, HMAC required
  • high: Priority delivery, minimal rate limiting
  • normal: Standard delivery
  • low: Background delivery, may be rate limited

Architecture

Supervision Tree

Supervision Tree

Worker Registry Flow

Worker Registry Flow

Channel Message Flow

Channel Message Flow

Configuration

%% sys.config
[{reckon_gater, [
    %% Cluster configuration
    {cluster, [
        {port, 45893},
        {multicast_addr, {239, 255, 0, 2}}
    ]},

    %% Retry defaults
    {retry, [
        {base_delay_ms, 100},
        {max_delay_ms, 30000},
        {max_attempts, 10}
    ]},

    %% Channel configuration
    {channels, [
        {reckon_gater_channel_events, [
            {priority, high}
        ]},
        {reckon_gater_channel_metrics, [
            {max_rate, 10000}
        ]}
    ]},

    %% Security
    {security, [
        {hmac_secret, <<"your_secret_here">>},
        {message_ttl_seconds, 300}
    ]},

    %% Telemetry
    {telemetry_handlers, [logger]}
]}].

Telemetry Events

EventMeasurementsMetadata
[reckon_gater, worker, registered]system_timestore_id, node, pid
[reckon_gater, worker, unregistered]system_timestore_id, pid
[reckon_gater, worker, lookup]durationstore_id
[reckon_gater, request, start]system_timestore_id, request_type
[reckon_gater, request, stop]durationstore_id, request_type, result
[reckon_gater, request, error]durationstore_id, request_type, reason
[reckon_gater, retry, attempt]delay_ms, attemptstore_id, reason
[reckon_gater, retry, exhausted]total_attemptsstore_id, reason
[reckon_gater, cluster, node, up]system_timenode, member_count
[reckon_gater, cluster, node, down]system_timenode, member_count
[reckon_gater, channel, broadcast]recipient_countchannel, topic

Attaching Handlers

%% Attach default logger handler
ok = reckon_gater_telemetry:attach_default_handler().

%% Attach custom handler
Handler = fun(Event, Measurements, Meta, Config) ->
    %% Your custom handling
    ok
end,
ok = reckon_gater_telemetry:attach(my_handler, Handler, #{}).

%% Detach handler
ok = reckon_gater_telemetry:detach(my_handler).

Building

rebar3 compile         # Compile
rebar3 eunit           # Unit tests (44 tests)
rebar3 ct              # Integration tests (8 tests)
rebar3 dialyzer        # Type checking

Testing

Test counts:

  • Unit tests: 44 tests (retry, security, telemetry)
  • Integration tests: 8 tests (channel system)
  • End-to-end tests: 24 tests (with reckon-db, run from reckon-db)
rebar3 eunit                                    # All unit tests
rebar3 ct --suite=esdb_channel_SUITE            # Channel tests

Run e2e tests from reckon-db:

cd /path/to/reckon-db
rebar3 ct --suite=test/e2e/reckon_gater_e2e_SUITE

Integration with reckon-db

reckon-gater is designed to work with reckon-db to provide load-balanced, distributed access to event stores.

Automatic Worker Registration

When both packages are deployed on the same nodes:

  1. reckon-db gateway workers automatically register with reckon-gater
  2. No manual registration is required
  3. Worker cleanup is automatic when nodes leave or workers crash

Automatic Worker Registration

Accessing the Event Store

Use the gateway API to access reckon-db with automatic load balancing and retry:

%% Stream operations
{ok, Version} = reckon_gater_api:append_events(my_store, StreamId, Events).
{ok, Version} = reckon_gater_api:append_events(my_store, StreamId, ExpectedVersion, Events).
{ok, Events} = reckon_gater_api:stream_forward(my_store, StreamId, 0, 100).
{ok, Events} = reckon_gater_api:stream_backward(my_store, StreamId, 100, 50).
{ok, Version} = reckon_gater_api:get_version(my_store, StreamId).
true = reckon_gater_api:stream_exists(my_store, StreamId).

%% Subscription operations
ok = reckon_gater_api:save_subscription(my_store, stream, StreamId, <<"my_sub">>, 0, self()).
ok = reckon_gater_api:remove_subscription(my_store, stream, StreamId, <<"my_sub">>).
ok = reckon_gater_api:ack_event(my_store, StreamId, <<"my_sub">>, EventNumber).
{ok, Subs} = reckon_gater_api:get_subscriptions(my_store).

%% Snapshot operations
ok = reckon_gater_api:record_snapshot(my_store, SourceUuid, StreamUuid, Version, Record).
{ok, Snap} = reckon_gater_api:read_snapshot(my_store, SourceUuid, StreamUuid, Version).
ok = reckon_gater_api:delete_snapshot(my_store, SourceUuid, StreamUuid, Version).
{ok, Snaps} = reckon_gater_api:list_snapshots(my_store, SourceUuid, StreamUuid).

%% Health check
healthy = reckon_gater_api:health().
ok = reckon_gater_api:quick_health_check(my_store).

Deployment

reckon-db includes reckon-gater as a dependency. Starting reckon-db automatically starts the gateway:

%% Start reckon-db (includes gater)
application:ensure_all_started(reckon_db).

%% Gateway workers auto-register with the pg-based registry
%% Use the gater API for all operations
{ok, Version} = reckon_gater_api:append_events(my_store, StreamId, Events).

In a multi-node cluster, each node runs reckon-db with its gateway worker. The pg-based registry provides:

  • Cluster-wide worker discovery via pg:get_members/2
  • Eventual consistency (workers visible across all nodes)
  • Automatic cleanup on node failure (pg membership)
  • Load balancing with round-robin selection
  • Exponential backoff retry on failures

Shared Types

reckon-gater provides shared type definitions used across the ecosystem. Include them in your modules:

-include_lib("reckon_gater/include/esdb_gater_types.hrl").

Records

RecordPurpose
#event{}Event with type, data (Erlang term), and metadata
#snapshot{}Aggregate snapshot at a specific version
#subscription{}Subscription state and configuration
#append_result{}Result of an append operation

Version Constants

ConstantValuePurpose
?NO_STREAM-1Stream must not exist (first write)
?ANY_VERSION-2No version check, always append
?STREAM_EXISTS-4Stream must exist

See the Shared Types Guide for detailed usage.

Ecosystem

ProjectDescription
reckon-dbCore event store built on Khepri/Ra
evoqCQRS/Event Sourcing framework
reckon-evoqAdapter connecting evoq to reckon-db

License

Apache-2.0