reckon_gater_api (reckon_gater v2.0.1)


Main API for reckon-gater

Provides the primary interface for accessing reckon-db event stores through the gateway with automatic load balancing and retry.

This module mirrors the ExESDBGater.API pattern from the original Elixir implementation, providing a dedicated function for each operation rather than a generic call interface.

Stream Operations

{ok, Version} = reckon_gater_api:append_events(my_store, StreamId, Events).
{ok, Events} = reckon_gater_api:get_events(my_store, StreamId, 0, 100, forward).
{ok, Version} = reckon_gater_api:get_version(my_store, StreamId).

Subscription Operations

ok = reckon_gater_api:save_subscription(my_store, by_stream, Selector, Name).
ok = reckon_gater_api:remove_subscription(my_store, by_stream, Selector, Name).

Snapshot Operations

ok = reckon_gater_api:record_snapshot(my_store, SourceId, StreamId, Version, Data).
{ok, Snapshot} = reckon_gater_api:read_snapshot(my_store, SourceId, StreamId, Version).

Summary

Functions

Acknowledge receipt of an event by a subscriber

Append events to a stream (auto-versioned)

Append events to a stream with expected version

Build a causation graph for visualization

Check Raft log consistency for a store

Create a new link

Delete a stream and all its events

Census of event types in the store with counts.

Get the full causation chain for an event

Get the event that caused another

Get all events with the same correlation ID

Get events caused by an event

Get a link by name

Get current memory pressure level

Get memory statistics

Get a schema by event type

Get the version of a schema

Get all streams in a store

Get a specific subscription by name

Get all subscriptions for a store

Get the current version of a stream

Get all registered workers for a store

Check if a store contains at least one event.

Get gateway health status

Get detailed info about a link

List all snapshots across all streams in a store.

List all links

List all schemas

List all subscriptions for a store with checkpoint positions.

List all managed stores in the cluster

Pure worker-selection policy behind select_worker/1.

Quick health check for a store

Read all events across all streams sorted by epoch_us (global ordering).

Read events by type using native Khepri filtering

Read events by tags (default: ANY match, batch_size 1000).

Read events by tags with options.

Read events in a time range with options

Read events up to a timestamp

Read events up to a timestamp with options

Register current process as a worker for a store

Register a specific process as a worker for a store

Scavenge a stream (delete old events)

Dry-run scavenge (preview what would be deleted)

Scavenge streams matching a pattern

Aggregate statistics for a store.

Stream events backward from a version

Stream events forward from a version

Detailed info for a single stream (timestamps, snapshot coverage).

Calculate lag for a specific subscription.

Unregister current process as a worker for a store

Unregister a specific process as a worker for a store

Upcast events to current schema version

Verify cluster consistency for a store

Verify membership consensus for a store

Get stream version at a specific timestamp

Types

worker_entry/0

-type worker_entry() ::
          #worker_entry{store_id :: atom(), node :: node(), pid :: pid(), registered_at :: integer()}.

Functions

ack_event(StoreId, SubscriptionName, SubscriberPid, Event)

-spec ack_event(atom(), binary(), pid(), map()) -> ok.

Acknowledge receipt of an event by a subscriber

append_events(StoreId, StreamId, Events)

-spec append_events(atom(), binary(), list()) -> {ok, integer()} | {error, term()}.

Append events to a stream (auto-versioned)

append_events(StoreId, StreamId, ExpectedVersion, Events)

-spec append_events(atom(), binary(), integer() | any, list()) ->
                       {ok, integer()} | {error, term()} | {error, {wrong_expected_version, integer()}}.

Append events to a stream with expected version
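
A minimal optimistic-concurrency sketch against this function; the event map shape and the retry decision are illustrative assumptions, not prescribed by this API:

Expected = 41,
Events = [#{event_type => <<"order_placed">>, data => #{}}],
case reckon_gater_api:append_events(my_store, StreamId, Expected, Events) of
    {ok, NewVersion} ->
        {ok, NewVersion};
    {error, {wrong_expected_version, Actual}} ->
        %% Another writer appended first: re-read from Actual and decide
        %% whether to retry or surface the conflict.
        {conflict, Actual}
end.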

build_causation_graph(StoreId, Id)

-spec build_causation_graph(atom(), binary()) -> {ok, map()} | {error, term()}.

Build a causation graph for visualization

check_raft_log_consistency(StoreId)

-spec check_raft_log_consistency(atom()) -> {ok, map()} | {error, term()}.

Check Raft log consistency for a store

create_link(StoreId, LinkSpec)

-spec create_link(atom(), map()) -> ok.

Create a new link

delete_link(StoreId, LinkName)

-spec delete_link(atom(), binary()) -> ok.

Delete a link

delete_snapshot(StoreId, SourceUuid, StreamUuid, Version)

-spec delete_snapshot(atom(), binary(), binary(), non_neg_integer()) -> ok.

Delete a snapshot

delete_stream(StoreId, StreamId)

-spec delete_stream(atom(), binary()) -> ok | {error, term()}.

Delete a stream and all its events

event_type_summary(StoreId)

-spec event_type_summary(atom()) -> {ok, [map()]} | {error, term()}.

Census of event types in the store with counts.

get_causation_chain(StoreId, EventId)

-spec get_causation_chain(atom(), binary()) -> {ok, list()} | {error, term()}.

Get the full causation chain for an event

get_cause(StoreId, EventId)

-spec get_cause(atom(), binary()) -> {ok, map()} | {error, term()}.

Get the event that caused another

get_correlated(StoreId, CorrelationId)

-spec get_correlated(atom(), binary()) -> {ok, list()} | {error, term()}.

Get all events with the same correlation ID

get_effects(StoreId, EventId)

-spec get_effects(atom(), binary()) -> {ok, list()} | {error, term()}.

Get events caused by an event

get_events(StoreId, StreamId, StartVersion, Count, Direction)

-spec get_events(atom(), binary(), integer(), integer(), forward | backward) ->
                    {ok, list()} | {error, term()}.

Get events from a stream

get_link(StoreId, LinkName)

-spec get_link(atom(), binary()) -> {ok, map()} | {error, term()}.

Get a link by name

get_memory_level(StoreId)

-spec get_memory_level(atom()) -> {ok, atom()} | {error, term()}.

Get current memory pressure level

get_memory_stats(StoreId)

-spec get_memory_stats(atom()) -> {ok, map()} | {error, term()}.

Get memory statistics

get_schema(StoreId, EventType)

-spec get_schema(atom(), binary()) -> {ok, map()} | {error, term()}.

Get a schema by event type

get_schema_version(StoreId, EventType)

-spec get_schema_version(atom(), binary()) -> {ok, integer()} | {error, term()}.

Get the version of a schema

get_streams(StoreId)

-spec get_streams(atom()) -> {ok, list()} | {error, term()}.

Get all streams in a store

get_subscription(StoreId, SubscriptionName)

-spec get_subscription(atom(), binary()) -> {ok, map()} | {error, term()}.

Get a specific subscription by name

Returns the subscription details including the checkpoint.

get_subscriptions(StoreId)

-spec get_subscriptions(atom()) -> {ok, list()} | {error, term()}.

Get all subscriptions for a store

get_version(StoreId, StreamId)

-spec get_version(atom(), binary()) -> {ok, integer()} | {error, term()}.

Get the current version of a stream

get_workers(StoreId)

-spec get_workers(atom()) -> {ok, [worker_entry()]} | {error, term()}.

Get all registered workers for a store

has_events(StoreId)

-spec has_events(atom()) -> boolean().

Check if a store contains at least one event.

health()

-spec health() -> {ok, map()}.

Get gateway health status

list_all_snapshots(StoreId)

-spec list_all_snapshots(atom()) -> {ok, [map()]} | {error, term()}.

List all snapshots across all streams in a store.

list_links(StoreId)

-spec list_links(atom()) -> {ok, list()} | {error, term()}.

List all links

list_schemas(StoreId)

-spec list_schemas(atom()) -> {ok, list()} | {error, term()}.

List all schemas

list_snapshots(StoreId, SourceUuid, StreamUuid)

-spec list_snapshots(atom(), binary() | any, binary() | any) -> {ok, [map()]} | {error, term()}.

List snapshots

list_store_subscriptions(StoreId)

-spec list_store_subscriptions(atom()) -> {ok, [map()]} | {error, term()}.

List all subscriptions for a store with checkpoint positions.

list_stores()

-spec list_stores() -> {ok, list()} | {error, term()}.

List all managed stores in the cluster

pick_worker(Workers, Node, Index)

-spec pick_worker([worker_entry()], node(), non_neg_integer()) ->
                     {ok, worker_entry(), non_neg_integer()} | {error, no_workers}.

Pure worker-selection policy behind select_worker/1.

Prefers workers on the caller's own node when any exist; falls back to cluster-wide round-robin only when no local worker is present. Extracted so the policy can be exercised without a running pg scope.

Why local-first

The worker registry is pg-based, so a call to get_workers/1 returns PIDs from every BEAM-connected node. That's the right pool for stores that form a shared Raft cluster — any worker writes into the same Khepri state machine and reads are cluster-wide.

For stores that stay local per node (autojoin=false), a write routed to a remote worker persists in THAT node's private store and is invisible to the caller's local store. Preferring the caller's own node keeps each daemon's writes on its own disk. Falling back to the remote pool preserves the cluster-wide behaviour when the local worker isn't registered yet (boot race) or when the store is genuinely clustered and the caller has no local presence.
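
Because pick_worker/3 is pure, the policy can be exercised directly from a worker list; threading the returned index back into the next call (assumed here) is what gives cluster-wide round-robin when no local worker exists:

{ok, Workers} = reckon_gater_api:get_workers(my_store),
%% Index 0 on the first call; reuse NextIndex on subsequent calls.
{ok, Worker, NextIndex} = reckon_gater_api:pick_worker(Workers, node(), 0).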

quick_health_check(StoreId)

-spec quick_health_check(atom()) -> {ok, map()} | {error, term()}.

Quick health check for a store

read_all_global(StoreId, Offset, BatchSize)

-spec read_all_global(atom(), non_neg_integer(), pos_integer()) -> {ok, list()} | {error, term()}.

Read all events across all streams sorted by epoch_us (global ordering).

This is used for catch-up subscriptions where a consumer needs to replay all historical events from a given offset.

Offset is the number of events to skip (not a version number). BatchSize controls how many events to return per call.
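
A catch-up replay loop over read_all_global/3 might look like the sketch below; handle_event/1 is a hypothetical consumer callback, and treating an empty batch as "caught up" is an assumption:

replay_all(StoreId, Offset, BatchSize) ->
    case reckon_gater_api:read_all_global(StoreId, Offset, BatchSize) of
        {ok, []} ->
            %% No more historical events at this offset.
            {done, Offset};
        {ok, Events} ->
            lists:foreach(fun handle_event/1, Events),
            %% Advance the offset by the number of events consumed.
            replay_all(StoreId, Offset + length(Events), BatchSize);
        {error, Reason} ->
            {error, Reason}
    end.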

read_by_event_types(StoreId, EventTypes, BatchSize)

-spec read_by_event_types(atom(), [binary()], pos_integer()) -> {ok, list()} | {error, term()}.

Read events by type using native Khepri filtering

This uses the server-side read_by_event_types which performs efficient filtering at the database level rather than loading all events.

read_by_tags(StoreId, Tags)

-spec read_by_tags(atom(), [binary()]) -> {ok, list()} | {error, term()}.

Read events by tags (default: ANY match, batch_size 1000).

Queries events across all streams that have matching tags. By default, returns events matching ANY of the provided tags (union).

read_by_tags(StoreId, Tags, Opts)

-spec read_by_tags(atom(), [binary()], map()) -> {ok, list()} | {error, term()}.

Read events by tags with options.

Queries events across all streams that have matching tags. Tags are typically used for cross-stream querying in the process-centric model.

Options:

- match: any (default) returns events matching ANY tag (union); all returns events matching ALL tags (intersection)
- batch_size: maximum number of events to return (default 1000)
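
For example, to require every tag to match and cap the batch size (the tag values are illustrative):

{ok, Events} = reckon_gater_api:read_by_tags(my_store,
                                             [<<"order">>, <<"emea">>],
                                             #{match => all, batch_size => 500}).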

read_range(StoreId, StreamId, FromTimestamp, ToTimestamp)

-spec read_range(atom(), binary(), integer(), integer()) -> {ok, list()} | {error, term()}.

Read events in a time range

read_range(StoreId, StreamId, FromTimestamp, ToTimestamp, Opts)

-spec read_range(atom(), binary(), integer(), integer(), map()) -> {ok, list()} | {error, term()}.

Read events in a time range with options

read_snapshot(StoreId, SourceUuid, StreamUuid, Version)

-spec read_snapshot(atom(), binary(), binary(), non_neg_integer()) -> {ok, map()} | {error, term()}.

Read a snapshot

read_until(StoreId, StreamId, Timestamp)

-spec read_until(atom(), binary(), integer()) -> {ok, list()} | {error, term()}.

Read events up to a timestamp

read_until(StoreId, StreamId, Timestamp, Opts)

-spec read_until(atom(), binary(), integer(), map()) -> {ok, list()} | {error, term()}.

Read events up to a timestamp with options

record_snapshot(StoreId, SourceUuid, StreamUuid, Version, SnapshotRecord)

-spec record_snapshot(atom(), binary(), binary(), non_neg_integer(), map()) -> ok.

Record a snapshot

register_schema(StoreId, EventType, Schema)

-spec register_schema(atom(), binary(), map()) -> ok.

Register a schema

register_worker(StoreId)

-spec register_worker(atom()) -> ok | {error, term()}.

Register current process as a worker for a store

register_worker(StoreId, Pid)

-spec register_worker(atom(), pid()) -> ok | {error, term()}.

Register a specific process as a worker for a store

remove_subscription(StoreId, Type, Selector, SubscriptionName)

-spec remove_subscription(atom(), atom(), binary() | map(), binary()) -> ok.

Remove a subscription

save_subscription(StoreId, Type, Selector, SubscriptionName, StartFrom, Subscriber)

-spec save_subscription(atom(),
                        atom(),
                        binary() | map(),
                        binary(),
                        non_neg_integer(),
                        pid() | undefined) ->
                           ok.

Save a subscription
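
A sketch of subscribing the calling process to a single stream from the start; the by_stream type and the StreamId selector mirror the subscription examples above, while StartFrom = 0 and self() as the subscriber are assumptions:

ok = reckon_gater_api:save_subscription(my_store, by_stream, StreamId,
                                        <<"order-projector">>, 0, self()).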

scavenge(StoreId, StreamId, Opts)

-spec scavenge(atom(), binary(), map()) -> {ok, map()} | {error, term()}.

Scavenge a stream (delete old events)

scavenge_dry_run(StoreId, StreamId, Opts)

-spec scavenge_dry_run(atom(), binary(), map()) -> {ok, map()} | {error, term()}.

Dry-run scavenge (preview what would be deleted)

scavenge_matching(StoreId, Pattern, Opts)

-spec scavenge_matching(atom(), binary(), map()) -> {ok, list()} | {error, term()}.

Scavenge streams matching a pattern

start_link(StoreId, LinkName)

-spec start_link(atom(), binary()) -> ok.

Start a link

stop_link(StoreId, LinkName)

-spec stop_link(atom(), binary()) -> ok.

Stop a link

store_stats(StoreId)

-spec store_stats(atom()) -> {ok, map()} | {error, term()}.

Aggregate statistics for a store.

stream_backward(StoreId, StreamId, StartVersion, Count)

-spec stream_backward(atom(), binary(), integer(), non_neg_integer()) -> {ok, list()} | {error, term()}.

Stream events backward from a version

stream_forward(StoreId, StreamId, StartVersion, Count)

-spec stream_forward(atom(), binary(), integer(), integer()) -> {ok, list()} | {error, term()}.

Stream events forward from a version

stream_info(StoreId, StreamId)

-spec stream_info(atom(), binary()) -> {ok, map()} | {error, term()}.

Detailed info for a single stream (timestamps, snapshot coverage).

subscription_lag(StoreId, SubscriptionName)

-spec subscription_lag(atom(), binary()) -> {ok, map()} | {error, term()}.

Calculate lag for a specific subscription.

unregister_schema(StoreId, EventType)

-spec unregister_schema(atom(), binary()) -> ok.

Unregister a schema

unregister_worker(StoreId)

-spec unregister_worker(atom()) -> ok | {error, term()}.

Unregister current process as a worker for a store

unregister_worker(StoreId, Pid)

-spec unregister_worker(atom(), pid()) -> ok | {error, term()}.

Unregister a specific process as a worker for a store

upcast_events(StoreId, Events)

-spec upcast_events(atom(), list()) -> {ok, list()} | {error, term()}.

Upcast events to current schema version

verify_cluster_consistency(StoreId)

-spec verify_cluster_consistency(atom()) -> {ok, map()} | {error, term()}.

Verify cluster consistency for a store

verify_membership_consensus(StoreId)

-spec verify_membership_consensus(atom()) -> {ok, map()} | {error, term()}.

Verify membership consensus for a store

version_at(StoreId, StreamId, Timestamp)

-spec version_at(atom(), binary(), integer()) -> {ok, integer()} | {error, term()}.

Get stream version at a specific timestamp
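
version_at/3 pairs naturally with get_events/5 for point-in-time reads. A hedged sketch, assuming versions are 0-based (so Count = VersionAtT + 1 covers the whole prefix) and that T uses the same integer timestamp unit as the other time-based reads:

{ok, VersionAtT} = reckon_gater_api:version_at(my_store, StreamId, T),
{ok, History} = reckon_gater_api:get_events(my_store, StreamId, 0,
                                            VersionAtT + 1, forward).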