esdb_gater_api (reckon_gater v1.1.2)

Main API for reckon-gater

Provides the primary interface for accessing reckon-db event stores through the gateway with automatic load balancing and retry.

This module mirrors the ExESDBGater.API pattern from the original Elixir implementation, providing specific functions for each operation rather than a generic call interface.

Stream Operations

{ok, Version} = esdb_gater_api:append_events(my_store, StreamId, Events).
{ok, Events} = esdb_gater_api:get_events(my_store, StreamId, 0, 100, forward).
{ok, Version} = esdb_gater_api:get_version(my_store, StreamId).

Subscription Operations

ok = esdb_gater_api:save_subscription(my_store, by_stream, Selector, Name, StartFrom, SubscriberPid).
ok = esdb_gater_api:remove_subscription(my_store, by_stream, Selector, Name).

Snapshot Operations

ok = esdb_gater_api:record_snapshot(my_store, SourceId, StreamId, Version, Data).
{ok, Snapshot} = esdb_gater_api:read_snapshot(my_store, SourceId, StreamId, Version).

Summary

Functions

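ack_event/4: Acknowledge receipt of an event by a subscriber

append_events/3: Append events to a stream (auto-versioned)

append_events/4: Append events to a stream with expected version

build_causation_graph/2: Build a causation graph for visualization

check_raft_log_consistency/1: Check Raft log consistency for a store

create_link/2: Create a new link

delete_stream/2: Delete a stream and all its events

get_causation_chain/2: Get the full causation chain for an event

get_cause/2: Get the event that caused another

get_correlated/2: Get all events with the same correlation ID

get_effects/2: Get events caused by an event

get_link/2: Get a link by name

get_memory_level/1: Get current memory pressure level

get_memory_stats/1: Get memory statistics

get_schema/2: Get a schema by event type

get_schema_version/2: Get the version of a schema

get_streams/1: Get all streams in a store

get_subscription/2: Get a specific subscription by name

get_subscriptions/1: Get all subscriptions for a store

get_version/2: Get the current version of a stream

get_workers/1: Get all registered workers for a store

health/0: Get gateway health status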

Get detailed info about a link

list_links/1: List all links

list_schemas/1: List all schemas

list_stores/0: List all managed stores in the cluster

quick_health_check/1: Quick health check for a store

read_by_event_types/3: Read events by type using native Khepri filtering

read_by_tags/2: Read events by tags (default: ANY match, batch_size 1000).

read_by_tags/3: Read events by tags with options.

read_range/5: Read events in a time range with options

read_until/3: Read events up to a timestamp

read_until/4: Read events up to a timestamp with options

register_worker/1: Register current process as a worker for a store

register_worker/2: Register a specific process as a worker for a store

scavenge/3: Scavenge a stream (delete old events)

scavenge_dry_run/3: Dry-run scavenge (preview what would be deleted)

scavenge_matching/3: Scavenge streams matching a pattern

stream_backward/4: Stream events backward from a version

stream_forward/4: Stream events forward from a version

unregister_worker/1: Unregister current process as a worker for a store

unregister_worker/2: Unregister a specific process as a worker for a store

upcast_events/2: Upcast events to current schema version

verify_cluster_consistency/1: Verify cluster consistency for a store

verify_membership_consensus/1: Verify membership consensus for a store

version_at/3: Get stream version at a specific timestamp

Types

worker_entry/0

-type worker_entry() ::
          #worker_entry{store_id :: atom(), node :: node(), pid :: pid(), registered_at :: integer()}.

Functions

ack_event(StoreId, SubscriptionName, SubscriberPid, Event)

-spec ack_event(atom(), binary(), pid(), map()) -> ok.

Acknowledge receipt of an event by a subscriber

append_events(StoreId, StreamId, Events)

-spec append_events(atom(), binary(), list()) -> {ok, integer()} | {error, term()}.

Append events to a stream (auto-versioned)

append_events(StoreId, StreamId, ExpectedVersion, Events)

-spec append_events(atom(), binary(), integer() | any, list()) ->
                       {ok, integer()} | {error, term()} | {error, {wrong_expected_version, integer()}}.

Append events to a stream with expected version
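
The {error, {wrong_expected_version, integer()}} return suggests an optimistic-concurrency pattern: read the version, append against it, and retry on conflict. The following is a minimal sketch of that loop, not part of reckon-gater itself. It assumes that the value returned by get_version/2 is the right ExpectedVersion to pass to append_events/4; the module name append_retry_sketch and the Api map of funs are hypothetical, introduced so the loop can be exercised without a running store.

```erlang
-module(append_retry_sketch).
-export([append_with_retry/4, append_with_retry/5]).

%% Append Events to StreamId, allowing up to Retries extra attempts
%% when another writer changes the stream version first.
%% Assumption: get_version/2 returns the value append_events/4 expects.
append_with_retry(StoreId, StreamId, Events, Retries) ->
    Api = #{get_version => fun esdb_gater_api:get_version/2,
            append => fun esdb_gater_api:append_events/4},
    append_with_retry(Api, StoreId, StreamId, Events, Retries).

%% Same loop with the two API calls passed in as funs (hypothetical
%% indirection, used here only to make the loop testable).
append_with_retry(_Api, _StoreId, _StreamId, _Events, Retries) when Retries < 0 ->
    {error, retries_exhausted};
append_with_retry(#{get_version := GetVersion, append := Append} = Api,
                  StoreId, StreamId, Events, Retries) ->
    case GetVersion(StoreId, StreamId) of
        {ok, Version} ->
            case Append(StoreId, StreamId, Version, Events) of
                {ok, NewVersion} ->
                    {ok, NewVersion};
                {error, {wrong_expected_version, _Actual}} ->
                    %% Someone else appended first: re-read and try again.
                    append_with_retry(Api, StoreId, StreamId, Events, Retries - 1);
                {error, Reason} ->
                    {error, Reason}
            end;
        {error, Reason} ->
            {error, Reason}
    end.
```

Callers that must not lose a write on conflict would typically re-validate their command against the newly read state before retrying; this sketch retries the same events unchanged.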

build_causation_graph(StoreId, Id)

-spec build_causation_graph(atom(), binary()) -> {ok, map()} | {error, term()}.

Build a causation graph for visualization

check_raft_log_consistency(StoreId)

-spec check_raft_log_consistency(atom()) -> {ok, map()} | {error, term()}.

Check Raft log consistency for a store

create_link(StoreId, LinkSpec)

-spec create_link(atom(), map()) -> ok.

Create a new link

delete_link(StoreId, LinkName)

-spec delete_link(atom(), binary()) -> ok.

Delete a link

delete_snapshot(StoreId, SourceUuid, StreamUuid, Version)

-spec delete_snapshot(atom(), binary(), binary(), non_neg_integer()) -> ok.

Delete a snapshot

delete_stream(StoreId, StreamId)

-spec delete_stream(atom(), binary()) -> ok | {error, term()}.

Delete a stream and all its events

get_causation_chain(StoreId, EventId)

-spec get_causation_chain(atom(), binary()) -> {ok, list()} | {error, term()}.

Get the full causation chain for an event

get_cause(StoreId, EventId)

-spec get_cause(atom(), binary()) -> {ok, map()} | {error, term()}.

Get the event that caused another

get_correlated(StoreId, CorrelationId)

-spec get_correlated(atom(), binary()) -> {ok, list()} | {error, term()}.

Get all events with the same correlation ID

get_effects(StoreId, EventId)

-spec get_effects(atom(), binary()) -> {ok, list()} | {error, term()}.

Get events caused by an event

get_events(StoreId, StreamId, StartVersion, Count, Direction)

-spec get_events(atom(), binary(), integer(), integer(), forward | backward) ->
                    {ok, list()} | {error, term()}.

Get events from a stream
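
Because get_events/5 takes a StartVersion and a Count, a long stream can be read in batches. The sketch below folds over a stream page by page. It assumes that versions start at 0 and are contiguous, so the next StartVersion is the previous one plus the number of events returned; the documentation does not state this explicitly. The module name stream_pager_sketch and the injectable Fetch fun are hypothetical.

```erlang
-module(stream_pager_sketch).
-export([fold_stream/5, fold_stream/6]).

%% Fold Fun over every event in StreamId, reading BatchSize events at a time.
fold_stream(StoreId, StreamId, BatchSize, Fun, Acc) ->
    fold_stream(fun esdb_gater_api:get_events/5,
                StoreId, StreamId, BatchSize, Fun, Acc).

%% Same fold with the fetch call passed in (hypothetical indirection,
%% used here only so the paging logic can run without a store).
fold_stream(Fetch, StoreId, StreamId, BatchSize, Fun, Acc) when BatchSize > 0 ->
    fold_from(Fetch, StoreId, StreamId, 0, BatchSize, Fun, Acc).

fold_from(Fetch, StoreId, StreamId, Start, BatchSize, Fun, Acc0) ->
    case Fetch(StoreId, StreamId, Start, BatchSize, forward) of
        {ok, []} ->
            {ok, Acc0};
        {ok, Events} ->
            Acc1 = lists:foldl(Fun, Acc0, Events),
            Read = length(Events),
            case Read < BatchSize of
                %% A short page means the end of the stream was reached.
                true -> {ok, Acc1};
                %% Assumption: versions are contiguous, so skip ahead by Read.
                false -> fold_from(Fetch, StoreId, StreamId, Start + Read,
                                   BatchSize, Fun, Acc1)
            end;
        {error, Reason} ->
            {error, Reason}
    end.
```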

get_link(StoreId, LinkName)

-spec get_link(atom(), binary()) -> {ok, map()} | {error, term()}.

Get a link by name

get_memory_level(StoreId)

-spec get_memory_level(atom()) -> {ok, atom()} | {error, term()}.

Get current memory pressure level

get_memory_stats(StoreId)

-spec get_memory_stats(atom()) -> {ok, map()} | {error, term()}.

Get memory statistics

get_schema(StoreId, EventType)

-spec get_schema(atom(), binary()) -> {ok, map()} | {error, term()}.

Get a schema by event type

get_schema_version(StoreId, EventType)

-spec get_schema_version(atom(), binary()) -> {ok, integer()} | {error, term()}.

Get the version of a schema

get_streams(StoreId)

-spec get_streams(atom()) -> {ok, list()} | {error, term()}.

Get all streams in a store

get_subscription(StoreId, SubscriptionName)

-spec get_subscription(atom(), binary()) -> {ok, map()} | {error, term()}.

Get a specific subscription by name

Returns the subscription details including the checkpoint.

get_subscriptions(StoreId)

-spec get_subscriptions(atom()) -> {ok, list()} | {error, term()}.

Get all subscriptions for a store

get_version(StoreId, StreamId)

-spec get_version(atom(), binary()) -> {ok, integer()} | {error, term()}.

Get the current version of a stream

get_workers(StoreId)

-spec get_workers(atom()) -> {ok, [worker_entry()]} | {error, term()}.

Get all registered workers for a store

health()

-spec health() -> {ok, map()}.

Get gateway health status

list_links(StoreId)

-spec list_links(atom()) -> {ok, list()} | {error, term()}.

List all links

list_schemas(StoreId)

-spec list_schemas(atom()) -> {ok, list()} | {error, term()}.

List all schemas

list_snapshots(StoreId, SourceUuid, StreamUuid)

-spec list_snapshots(atom(), binary() | any, binary() | any) -> {ok, [map()]} | {error, term()}.

List snapshots

list_stores()

-spec list_stores() -> {ok, list()} | {error, term()}.

List all managed stores in the cluster

quick_health_check(StoreId)

-spec quick_health_check(atom()) -> {ok, map()} | {error, term()}.

Quick health check for a store

read_by_event_types(StoreId, EventTypes, BatchSize)

-spec read_by_event_types(atom(), [binary()], pos_integer()) -> {ok, list()} | {error, term()}.

Read events by type using native Khepri filtering

This uses the server-side read_by_event_types which performs efficient filtering at the database level rather than loading all events.

read_by_tags(StoreId, Tags)

-spec read_by_tags(atom(), [binary()]) -> {ok, list()} | {error, term()}.

Read events by tags (default: ANY match, batch_size 1000).

Queries events across all streams that have matching tags. By default, returns events matching ANY of the provided tags (union).

read_by_tags(StoreId, Tags, Opts)

-spec read_by_tags(atom(), [binary()], map()) -> {ok, list()} | {error, term()}.

Read events by tags with options.

Queries events across all streams that have matching tags. Tags are typically used for cross-stream querying in the process-centric model.

Options:
- match: any (default) returns events matching ANY tag (union); all returns events matching ALL tags (intersection)
- batch_size: Maximum events to return (default 1000)
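
As an illustration of the options described for read_by_tags/3, the sketch below builds an Opts map and requests only events that carry all of the given tags. It assumes the option keys are the atoms match and batch_size, as the option names suggest; the module name tag_query_sketch and the helper tag_opts/2 are hypothetical.

```erlang
-module(tag_query_sketch).
-export([tag_opts/2, events_with_all_tags/3]).

%% Build the options map for read_by_tags/3.
%% Assumption: keys are the atoms `match` and `batch_size`.
tag_opts(Match, BatchSize)
  when (Match =:= any orelse Match =:= all),
       is_integer(BatchSize), BatchSize > 0 ->
    #{match => Match, batch_size => BatchSize}.

%% Return at most BatchSize events that carry every tag in Tags
%% (intersection), rather than the default ANY match (union).
events_with_all_tags(StoreId, Tags, BatchSize) ->
    esdb_gater_api:read_by_tags(StoreId, Tags, tag_opts(all, BatchSize)).
```

A call such as tag_query_sketch:events_with_all_tags(my_store, [<<"order">>, <<"eu">>], 100) would then ask for up to 100 events tagged with both values; the tag values here are made up for the example.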

read_range(StoreId, StreamId, FromTimestamp, ToTimestamp)

-spec read_range(atom(), binary(), integer(), integer()) -> {ok, list()} | {error, term()}.

Read events in a time range

read_range(StoreId, StreamId, FromTimestamp, ToTimestamp, Opts)

-spec read_range(atom(), binary(), integer(), integer(), map()) -> {ok, list()} | {error, term()}.

Read events in a time range with options

read_snapshot(StoreId, SourceUuid, StreamUuid, Version)

-spec read_snapshot(atom(), binary(), binary(), non_neg_integer()) -> {ok, map()} | {error, term()}.

Read a snapshot

read_until(StoreId, StreamId, Timestamp)

-spec read_until(atom(), binary(), integer()) -> {ok, list()} | {error, term()}.

Read events up to a timestamp

read_until(StoreId, StreamId, Timestamp, Opts)

-spec read_until(atom(), binary(), integer(), map()) -> {ok, list()} | {error, term()}.

Read events up to a timestamp with options

record_snapshot(StoreId, SourceUuid, StreamUuid, Version, SnapshotRecord)

-spec record_snapshot(atom(), binary(), binary(), non_neg_integer(), map()) -> ok.

Record a snapshot

register_schema(StoreId, EventType, Schema)

-spec register_schema(atom(), binary(), map()) -> ok.

Register a schema

register_worker(StoreId)

-spec register_worker(atom()) -> ok | {error, term()}.

Register current process as a worker for a store

register_worker(StoreId, Pid)

-spec register_worker(atom(), pid()) -> ok | {error, term()}.

Register a specific process as a worker for a store

remove_subscription(StoreId, Type, Selector, SubscriptionName)

-spec remove_subscription(atom(), atom(), binary() | map(), binary()) -> ok.

Remove a subscription

save_subscription(StoreId, Type, Selector, SubscriptionName, StartFrom, Subscriber)

-spec save_subscription(atom(),
                        atom(),
                        binary() | map(),
                        binary(),
                        non_neg_integer(),
                        pid() | undefined) ->
                           ok.

Save a subscription

scavenge(StoreId, StreamId, Opts)

-spec scavenge(atom(), binary(), map()) -> {ok, map()} | {error, term()}.

Scavenge a stream (delete old events)

scavenge_dry_run(StoreId, StreamId, Opts)

-spec scavenge_dry_run(atom(), binary(), map()) -> {ok, map()} | {error, term()}.

Dry-run scavenge (preview what would be deleted)
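
Since scavenge/3 deletes events, one cautious pattern is to run scavenge_dry_run/3 first and proceed only if the preview is acceptable. The sketch below shows that flow. The contents of the Opts map and of the preview map are not documented on this page, so both are passed through untouched; the module name scavenge_preview_sketch, the Confirm callback, and the {skipped, Preview} return value are hypothetical.

```erlang
-module(scavenge_preview_sketch).
-export([scavenge_if/4, scavenge_if/5]).

%% Preview a scavenge, then run it only if Confirm(Preview) returns true.
scavenge_if(StoreId, StreamId, Opts, Confirm) ->
    Api = #{dry_run => fun esdb_gater_api:scavenge_dry_run/3,
            scavenge => fun esdb_gater_api:scavenge/3},
    scavenge_if(Api, StoreId, StreamId, Opts, Confirm).

%% Same flow with the two API calls passed in as funs (hypothetical
%% indirection, used here only to make the flow testable).
scavenge_if(#{dry_run := DryRun, scavenge := Scavenge},
            StoreId, StreamId, Opts, Confirm) ->
    case DryRun(StoreId, StreamId, Opts) of
        {ok, Preview} ->
            case Confirm(Preview) of
                %% Use the same Opts so the real run matches the preview.
                true -> Scavenge(StoreId, StreamId, Opts);
                false -> {skipped, Preview}
            end;
        {error, Reason} ->
            {error, Reason}
    end.
```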

scavenge_matching(StoreId, Pattern, Opts)

-spec scavenge_matching(atom(), binary(), map()) -> {ok, list()} | {error, term()}.

Scavenge streams matching a pattern

start_link(StoreId, LinkName)

-spec start_link(atom(), binary()) -> ok.

Start a link

stop_link(StoreId, LinkName)

-spec stop_link(atom(), binary()) -> ok.

Stop a link

stream_backward(StoreId, StreamId, StartVersion, Count)

-spec stream_backward(atom(), binary(), integer(), non_neg_integer()) -> {ok, list()} | {error, term()}.

Stream events backward from a version

stream_forward(StoreId, StreamId, StartVersion, Count)

-spec stream_forward(atom(), binary(), integer(), integer()) -> {ok, list()} | {error, term()}.

Stream events forward from a version

unregister_schema(StoreId, EventType)

-spec unregister_schema(atom(), binary()) -> ok.

Unregister a schema

unregister_worker(StoreId)

-spec unregister_worker(atom()) -> ok | {error, term()}.

Unregister current process as a worker for a store

unregister_worker(StoreId, Pid)

-spec unregister_worker(atom(), pid()) -> ok | {error, term()}.

Unregister a specific process as a worker for a store

upcast_events(StoreId, Events)

-spec upcast_events(atom(), list()) -> {ok, list()} | {error, term()}.

Upcast events to current schema version

verify_cluster_consistency(StoreId)

-spec verify_cluster_consistency(atom()) -> {ok, map()} | {error, term()}.

Verify cluster consistency for a store

verify_membership_consensus(StoreId)

-spec verify_membership_consensus(atom()) -> {ok, map()} | {error, term()}.

Verify membership consensus for a store

version_at(StoreId, StreamId, Timestamp)

-spec version_at(atom(), binary(), integer()) -> {ok, integer()} | {error, term()}.

Get stream version at a specific timestamp