evoq_subscriptions (evoq v1.14.1)
View SourceFacade for subscription operations via adapter.
Provides a consistent interface for subscribing to events, delegating to a configured subscription adapter.
The adapter is responsible for translating between the event store's native format and evoq's #evoq_event{} records. Subscribers always receive {events, [#evoq_event{}]} messages.
Configuration (Required)
You must configure a subscription adapter in your application config:
{evoq, [
{subscription_adapter, reckon_evoq_adapter}
]}Usage
%% Subscribe to events (typically in a gen_server init)
{ok, SubId} = evoq_subscriptions:subscribe(
my_store, event_type, <<"order_placed_v1">>,
<<"my_projection">>, #{subscriber_pid => self()}
),
%% Receive events in handle_info
handle_info({events, Events}, State) ->
lists:foreach(fun(#evoq_event{data = Data}) ->
project(Data)
end, Events),
{noreply, State}.
Summary
Functions
Acknowledge an event has been processed. Updates the subscription checkpoint.
Get the configured subscription adapter. Crashes if no adapter is configured.
Get a subscription by name.
Get the current checkpoint for a subscription.
List all subscriptions for a store.
Set the subscription adapter (primarily for testing).
Subscribe to events from a store.
Unsubscribe from events.
Types
-type evoq_subscription() :: #evoq_subscription{id :: binary(), type :: evoq_subscription_type(), selector :: binary() | map(), subscription_name :: binary(), subscriber_pid :: pid() | undefined, created_at :: integer(), pool_size :: pos_integer(), checkpoint :: non_neg_integer() | undefined, options :: map()}.
-type evoq_subscription_type() :: stream | event_type | event_pattern | event_payload | tags.
Functions
-spec ack(atom(), binary(), binary() | undefined, non_neg_integer()) -> ok | {error, term()}.
Acknowledge an event has been processed. Updates the subscription checkpoint.
-spec get_adapter() -> module().
Get the configured subscription adapter. Crashes if no adapter is configured.
-spec get_by_name(atom(), binary()) -> {ok, evoq_subscription()} | {error, not_found | term()}.
Get a subscription by name.
-spec get_checkpoint(atom(), binary()) -> {ok, non_neg_integer()} | {error, not_found | term()}.
Get the current checkpoint for a subscription.
-spec list(atom()) -> {ok, [evoq_subscription()]} | {error, term()}.
List all subscriptions for a store.
-spec set_adapter(module()) -> ok.
Set the subscription adapter (primarily for testing).
-spec subscribe(atom(), evoq_subscription_type(), binary() | map(), binary(), map()) -> {ok, binary()} | {error, term()}.
Subscribe to events from a store.
The adapter guarantees that the subscriber_pid receives {events, [#evoq_event{}]} messages with proper envelope structure (event_type, stream_id, version, data, metadata).
Options: subscriber_pid - PID to receive events (required for push delivery) start_from - Starting position (default: 0) pool_size - Number of emitters (default: 1)
Unsubscribe from events.