reckon_evoq_adapter (reckon_evoq v1.1.4)
Gateway adapter implementation for evoq
Implements the adapter behaviors using esdb_gater_api to route all operations through the reckon-gater load balancer.
This adapter ensures that evoq never directly calls reckon-db modules, instead routing through the gateway for:
- Automatic retry with exponential backoff
- Load balancing across workers
- High availability
Summary
Functions
Acknowledge event via gateway.
Append events to a stream via gateway.
Delete all snapshots via gateway.
Delete snapshot at version via gateway.
Delete a stream via gateway.
Check if stream exists via gateway.
Get subscription by name via gateway.
Get checkpoint for subscription via gateway.
List subscriptions via gateway.
List all streams via gateway.
List snapshot versions via gateway.
Read the latest snapshot via gateway.
Read events from a stream via gateway.
Read all events from a stream via gateway.
Read snapshot at specific version via gateway.
Read events by type via gateway.
Read events by tags via gateway (default: ANY match).
Read events by tags via gateway with match mode.
Save a snapshot via gateway.
Subscribe to events via gateway.
Unsubscribe from events via gateway.
Get current stream version via gateway.
Types
-type event() :: #event{event_id :: binary(), event_type :: binary(), stream_id :: binary(), version :: non_neg_integer(), data :: map() | binary(), metadata :: map(), tags :: [binary()] | undefined, timestamp :: integer(), epoch_us :: integer(), data_content_type :: binary(), metadata_content_type :: binary()}.
-type evoq_event() :: #evoq_event{event_id :: binary(), event_type :: binary(), stream_id :: binary(), version :: non_neg_integer(), data :: map() | binary(), metadata :: map(), tags :: [binary()] | undefined, timestamp :: integer(), epoch_us :: integer(), data_content_type :: binary(), metadata_content_type :: binary()}.
-type evoq_subscription() :: #evoq_subscription{id :: binary(), type :: evoq_subscription_type(), selector :: binary() | map(), subscription_name :: binary(), subscriber_pid :: pid() | undefined, created_at :: integer(), pool_size :: pos_integer(), checkpoint :: non_neg_integer() | undefined, options :: map()}.
-type evoq_subscription_type() :: stream | event_type | event_pattern | event_payload | tags.
Functions
-spec ack(atom(), binary(), binary() | undefined, non_neg_integer()) -> ok | {error, term()}.
Acknowledge event via gateway.
Append events to a stream via gateway.
Events from evoq have a flat structure, but reckon_db expects events with a nested data field. This function transforms the events to the format expected by reckon_db.
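Neither event format is spelled out on this page, so the following is only a sketch of what such a flat-to-nested transformation could look like. It assumes a flat event is a map whose envelope keys (`event_type`, `metadata`, `tags`) sit next to the payload keys, and it moves everything else under `data`. The module name `flat_to_nested_sketch`, the function `to_nested/1`, and the choice of envelope keys are hypothetical and are not taken from the adapter's source.

```erlang
-module(flat_to_nested_sketch).
-export([to_nested/1]).

%% Hypothetical envelope keys; the real adapter may use a different set.
-define(ENVELOPE_KEYS, [event_type, metadata, tags]).

%% Split a flat event map into envelope fields and payload,
%% then nest the payload under the `data` key.
-spec to_nested(map()) -> map().
to_nested(FlatEvent) when is_map(FlatEvent) ->
    Envelope = maps:with(?ENVELOPE_KEYS, FlatEvent),
    Payload = maps:without(?ENVELOPE_KEYS, FlatEvent),
    Envelope#{data => Payload}.
```

Under these assumptions, a flat map carrying `event_type` and an `amount` key would come out with `event_type` at the top level and `amount` inside `data`.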
Delete all snapshots via gateway.
-spec delete_at_version(atom(), binary(), non_neg_integer()) -> ok | {error, term()}.
Delete snapshot at version via gateway.
Delete a stream via gateway.
Check if stream exists via gateway.
-spec get_by_name(atom(), binary()) -> {ok, evoq_subscription()} | {error, not_found | term()}.
Get subscription by name via gateway.
-spec get_checkpoint(atom(), binary()) -> {ok, non_neg_integer()} | {error, not_found | term()}.
Get checkpoint for subscription via gateway.
-spec list(atom()) -> {ok, [evoq_subscription()]} | {error, term()}.
List subscriptions via gateway.
List all streams via gateway.
-spec list_versions(atom(), binary()) -> {ok, [non_neg_integer()]} | {error, term()}.
List snapshot versions via gateway.
-spec read(atom(), binary()) -> {ok, evoq_snapshot()} | {error, not_found | term()}.
Read the latest snapshot via gateway.
-spec read(atom(), binary(), non_neg_integer(), pos_integer(), forward | backward) -> {ok, [evoq_event()]} | {error, term()}.
Read events from a stream via gateway.
For event sourcing, a non-existent stream just means no events yet, so we translate stream_not_found to an empty list.
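The translation described above can be sketched as a small result-normalizing function. This is an illustration only: the error shape `{error, stream_not_found}` is an assumption (the gateway's actual term may differ, for example by carrying the stream id), and `read_result_sketch` and `normalize/1` are hypothetical names.

```erlang
-module(read_result_sketch).
-export([normalize/1]).

%% Assumed error shape: {error, stream_not_found}.
-spec normalize({ok, [term()]} | {error, term()}) ->
          {ok, [term()]} | {error, term()}.
normalize({error, stream_not_found}) ->
    %% A missing stream is treated as "no events yet".
    {ok, []};
normalize({ok, Events}) when is_list(Events) ->
    {ok, Events};
normalize({error, _Reason} = Error) ->
    %% Every other error is passed through unchanged.
    Error.
```

With this approach a caller rebuilding an aggregate can fold over the result without a special case for streams that have not been written yet.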
-spec read_all(atom(), binary(), forward | backward) -> {ok, [evoq_event()]} | {error, term()}.
Read all events from a stream via gateway.
-spec read_at_version(atom(), binary(), non_neg_integer()) -> {ok, evoq_snapshot()} | {error, not_found | term()}.
Read snapshot at specific version via gateway.
-spec read_by_event_types(atom(), [binary()], pos_integer()) -> {ok, [evoq_event()]} | {error, term()}.
Read events by type via gateway.
Uses Khepri's native server-side filtering for efficient type-based queries. Events are filtered at the database level, which avoids loading all events into memory.
-spec read_by_tags(atom(), [binary()], pos_integer()) -> {ok, [evoq_event()]} | {error, term()}.
Read events by tags via gateway (default: ANY match).
Tags provide cross-stream querying for the process-centric model. Use this to find all events related to specific participants.
-spec read_by_tags(atom(), [binary()], any | all, pos_integer()) -> {ok, [evoq_event()]} | {error, term()}.
Read events by tags via gateway with match mode.
Tags provide cross-stream querying for the process-centric model. Events are filtered at the database level.
Match modes:
- any - Return events matching ANY of the tags (union)
- all - Return events matching ALL of the tags (intersection)
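The difference between the two modes can be illustrated with an in-memory predicate. This is a sketch of the semantics only, not of how the database performs the filtering; `tag_match_sketch` and `matches/3` are hypothetical names, and treating an event with `undefined` tags as a non-match is an assumption.

```erlang
-module(tag_match_sketch).
-export([matches/3]).

%% Decide whether an event's tags satisfy the query tags under a mode.
-spec matches(any | all, [binary()], [binary()] | undefined) -> boolean().
matches(_Mode, _QueryTags, undefined) ->
    %% Assumption: an event without tags never matches.
    false;
matches(any, QueryTags, EventTags) ->
    %% At least one query tag is present on the event.
    lists:any(fun(T) -> lists:member(T, EventTags) end, QueryTags);
matches(all, QueryTags, EventTags) ->
    %% Every query tag is present on the event.
    lists:all(fun(T) -> lists:member(T, EventTags) end, QueryTags).
```

For an event tagged with only one of two query tags, `any` accepts it and `all` rejects it.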
Save a snapshot via gateway.
-spec subscribe(atom(), evoq_subscription_type(), binary() | map(), binary(), map()) -> {ok, binary()} | {error, term()}.
Subscribe to events via gateway.
Unsubscribe from events via gateway.
Get current stream version via gateway.