reckon_gater_api (reckon_gater v2.0.1)
Main API for reckon-gater
Provides the primary interface for accessing reckon-db event stores through the gateway with automatic load balancing and retry.
This module mirrors the ExESDBGater.API pattern from the original Elixir implementation, providing specific functions for each operation rather than a generic call interface.
Stream Operations
{ok, Version} = reckon_gater_api:append_events(my_store, StreamId, Events).
{ok, Events} = reckon_gater_api:get_events(my_store, StreamId, 0, 100, forward).
{ok, Version} = reckon_gater_api:get_version(my_store, StreamId).
Subscription Operations
ok = reckon_gater_api:save_subscription(my_store, by_stream, Selector, Name).
ok = reckon_gater_api:remove_subscription(my_store, by_stream, Selector, Name).
Snapshot Operations
ok = reckon_gater_api:record_snapshot(my_store, SourceId, StreamId, Version, Data).
{ok, Snapshot} = reckon_gater_api:read_snapshot(my_store, SourceId, StreamId, Version).
Summary
Functions
Acknowledge receipt of an event by a subscriber
Append events to a stream (auto-versioned)
Append events to a stream with expected version
Build a causation graph for visualization
Check Raft log consistency for a store
Create a new link
Delete a link
Delete a snapshot
Delete a stream and all its events
Census of event types in the store with counts.
Get the full causation chain for an event
Get the event that caused another
Get all events with the same correlation ID
Get events caused by an event
Get events from a stream
Get a link by name
Get current memory pressure level
Get memory statistics
Get a schema by event type
Get the version of a schema
Get all streams in a store
Get a specific subscription by name
Get all subscriptions for a store
Get the current version of a stream
Get all registered workers for a store
Check if a store contains at least one event.
Get gateway health status
Get detailed info about a link
List all snapshots across all streams in a store.
List all links
List all schemas
List snapshots
List all subscriptions for a store with checkpoint positions.
List all managed stores in the cluster
Pure worker-selection policy behind select_worker/1.
Quick health check for a store
Read all events across all streams sorted by epoch_us (global ordering).
Read events by type using native Khepri filtering
Read events by tags (default: ANY match, batch_size 1000).
Read events by tags with options.
Read events in a time range
Read events in a time range with options
Read a snapshot
Read events up to a timestamp
Read events up to a timestamp with options
Register a schema
Register current process as a worker for a store
Register a specific process as a worker for a store
Remove a subscription
Save a subscription
Scavenge a stream (delete old events)
Dry-run scavenge (preview what would be deleted)
Scavenge streams matching a pattern
Start a link
Stop a link
Aggregate statistics for a store.
Stream events backward from a version
Stream events forward from a version
Detailed info for a single stream (timestamps, snapshot coverage).
Calculate lag for a specific subscription.
Unregister a schema
Unregister current process as a worker for a store
Unregister a specific process as a worker for a store
Upcast events to current schema version
Verify cluster consistency for a store
Verify membership consensus for a store
Get stream version at a specific timestamp
Types
Functions
Acknowledge receipt of an event by a subscriber
Append events to a stream (auto-versioned)
-spec append_events(atom(), binary(), integer() | any, list()) -> {ok, integer()} | {error, term()} | {error, {wrong_expected_version, integer()}}.
Append events to a stream with expected version
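The spec above accepts either an expected version or the atom `any`, so one call shape covers both append modes. A hedged sketch (stream ID and event maps are made-up shapes for illustration, not the store's required schema):

```erlang
%% Illustrative only: StreamId and the event map shape are assumptions.
StreamId = <<"order-123">>,
Events = [#{type => <<"order_placed">>, data => #{total => 42}}],
%% Auto-versioned append: the store assigns the next version.
{ok, V1} = reckon_gater_api:append_events(my_store, StreamId, Events),
%% Optimistic concurrency: fails with {error, {wrong_expected_version, Actual}}
%% if the stream has advanced past V1 since we last read it.
{ok, _V2} = reckon_gater_api:append_events(my_store, StreamId, V1, Events).
```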
Build a causation graph for visualization
Check Raft log consistency for a store
Create a new link
Delete a link
-spec delete_snapshot(atom(), binary(), binary(), non_neg_integer()) -> ok.
Delete a snapshot
Delete a stream and all its events
Census of event types in the store with counts.
Get the full causation chain for an event
Get the event that caused another
Get events caused by an event
-spec get_events(atom(), binary(), integer(), integer(), forward | backward) -> {ok, list()} | {error, term()}.
Get events from a stream
Get a link by name
Get current memory pressure level
Get memory statistics
Get a schema by event type
Get the version of a schema
Get all streams in a store
Get a specific subscription by name
Returns the subscription details including the checkpoint.
Get all subscriptions for a store
Get the current version of a stream
-spec get_workers(atom()) -> {ok, [worker_entry()]} | {error, term()}.
Get all registered workers for a store
Check if a store contains at least one event.
-spec health() -> {ok, map()}.
Get gateway health status
Get detailed info about a link
List all snapshots across all streams in a store.
List all links
List all schemas
List snapshots
List all subscriptions for a store with checkpoint positions.
List all managed stores in the cluster
-spec pick_worker([worker_entry()], node(), non_neg_integer()) -> {ok, worker_entry(), non_neg_integer()} | {error, no_workers}.
Pure worker-selection policy behind select_worker/1.
Prefers workers on the caller's own node when any exist; falls back to cluster-wide round-robin only when no local worker is present. Extracted so the policy can be exercised without a running pg scope.
Why local-first
The worker registry is pg-based, so a call to get_workers/1 returns PIDs from every BEAM-connected node. That's the right pool for stores that form a shared Raft cluster — any worker writes into the same Khepri state machine and reads are cluster-wide.
For stores that stay local per node (autojoin=false), a write routed to a remote worker persists in THAT node's private store and is invisible to the caller's local store. Preferring the caller's own node keeps each daemon's writes on its own disk. Falling back to the remote pool preserves the cluster-wide behaviour when the local worker isn't registered yet (boot race) or when the store is genuinely clustered and the caller has no local presence.
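The local-first policy described above can be sketched as a pure function over the worker list. This is an assumed shape for illustration (the real pick_worker/3 may represent workers and the counter differently):

```erlang
%% Sketch of a local-first, round-robin worker selection policy.
%% Workers are assumed to be {Pid, Meta} tuples; Counter is a
%% monotonically increasing round-robin cursor returned to the caller.
pick_worker([], _Node, _Counter) ->
    {error, no_workers};
pick_worker(Workers, Node, Counter) ->
    %% Prefer workers whose pid lives on the caller's own node.
    Pool = case [W || {Pid, _} = W <- Workers, node(Pid) =:= Node] of
               []    -> Workers;  %% no local worker: cluster-wide fallback
               Local -> Local
           end,
    Pick = lists:nth((Counter rem length(Pool)) + 1, Pool),
    {ok, Pick, Counter + 1}.
```

Keeping the policy pure means it can be unit-tested with plain lists, without a running pg scope, which is exactly why it is extracted from select_worker/1.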
Quick health check for a store
-spec read_all_global(atom(), non_neg_integer(), pos_integer()) -> {ok, list()} | {error, term()}.
Read all events across all streams sorted by epoch_us (global ordering).
This is used for catch-up subscriptions where a consumer needs to replay all historical events from a given offset.
Offset is the number of events to skip (not a version number). BatchSize controls how many events to return per call.
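A catch-up subscription built on these semantics advances the offset by the number of events actually received, not by stream versions. A minimal sketch, assuming a one-argument `Handler` fun supplied by the caller:

```erlang
%% Replay all history from Offset, BatchSize events at a time,
%% until read_all_global/3 returns an empty batch.
catch_up(Store, Offset, BatchSize, Handler) ->
    case reckon_gater_api:read_all_global(Store, Offset, BatchSize) of
        {ok, []} ->
            {done, Offset};  %% caught up; Offset is the next position
        {ok, Events} ->
            lists:foreach(Handler, Events),
            catch_up(Store, Offset + length(Events), BatchSize, Handler);
        {error, Reason} ->
            {error, Reason}
    end.
```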
-spec read_by_event_types(atom(), [binary()], pos_integer()) -> {ok, list()} | {error, term()}.
Read events by type using native Khepri filtering
This uses the server-side read_by_event_types which performs efficient filtering at the database level rather than loading all events.
Read events by tags (default: ANY match, batch_size 1000).
Queries events across all streams that have matching tags. By default, returns events matching ANY of the provided tags (union).
Read events by tags with options.
Queries events across all streams that have matching tags. Tags are typically used for cross-stream querying in the process-centric model.
Options:
- match: any (default) returns events matching ANY tag (union); all returns events matching ALL tags (intersection)
- batch_size: maximum events to return (default 1000)
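A sketch of the two match modes. The function name `read_events_by_tags/3` is an assumption (the summary does not show the exported name), as are the tag values:

```erlang
%% Hypothetical call shapes; tag values are illustrative.
Tags = [<<"customer:42">>, <<"region:eu">>],
%% Union: events carrying at least one of the tags.
{ok, AnyMatch} = reckon_gater_api:read_events_by_tags(my_store, Tags,
                     #{match => any, batch_size => 500}),
%% Intersection: only events carrying every tag.
{ok, AllMatch} = reckon_gater_api:read_events_by_tags(my_store, Tags,
                     #{match => all}).
```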
Read events in a time range
Read events in a time range with options
-spec read_snapshot(atom(), binary(), binary(), non_neg_integer()) -> {ok, map()} | {error, term()}.
Read a snapshot
Read events up to a timestamp
Read events up to a timestamp with options
-spec record_snapshot(atom(), binary(), binary(), non_neg_integer(), map()) -> ok.
Record a snapshot
Register a schema
Register current process as a worker for a store
Register a specific process as a worker for a store
Remove a subscription
-spec save_subscription(atom(), atom(), binary() | map(), binary(), non_neg_integer(), pid() | undefined) -> ok.
Save a subscription
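A call following the argument order of the spec above; the selector, name, and start position are illustrative values, not required formats:

```erlang
%% Save a by_stream subscription starting from position 0, delivering
%% to the calling process. All literal values here are made up.
Selector  = <<"orders">>,
SubName   = <<"order-projector">>,
StartFrom = 0,
ok = reckon_gater_api:save_subscription(my_store, by_stream, Selector,
                                        SubName, StartFrom, self()).
```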
Scavenge a stream (delete old events)
Dry-run scavenge (preview what would be deleted)
Scavenge streams matching a pattern
Start a link
Stop a link
Aggregate statistics for a store.
-spec stream_backward(atom(), binary(), integer(), non_neg_integer()) -> {ok, list()} | {error, term()}.
Stream events backward from a version
Stream events forward from a version
Detailed info for a single stream (timestamps, snapshot coverage).
Calculate lag for a specific subscription.
Unregister a schema
Unregister current process as a worker for a store
Unregister a specific process as a worker for a store
Upcast events to current schema version
Verify cluster consistency for a store
Verify membership consensus for a store
Get stream version at a specific timestamp