evoq_subscriptions (evoq v1.14.1)

Facade for subscription operations via adapter.

Provides a consistent interface for subscribing to events, delegating to a configured subscription adapter.

The adapter is responsible for translating between the event store's native format and evoq's #evoq_event{} records. Subscribers always receive {events, [#evoq_event{}]} messages.

Configuration (Required)

You must configure a subscription adapter in your application config:

  {evoq, [
      {subscription_adapter, reckon_evoq_adapter}
  ]}
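
Since get_adapter/0 crashes when no adapter is configured, an application may want to verify the setting at startup. The following is a minimal sketch of such a check using the standard application:get_env/2. The module name config_check_sketch and the error term are hypothetical, and this is not necessarily how evoq itself reads the setting.

```erlang
-module(config_check_sketch).
-export([adapter_configured/0]).

%% Return {ok, Adapter} if the evoq application environment names an
%% adapter module, or {error, subscription_adapter_missing} otherwise.
%% The error term is hypothetical, chosen for this sketch only.
adapter_configured() ->
    case application:get_env(evoq, subscription_adapter) of
        {ok, Adapter} when is_atom(Adapter) ->
            {ok, Adapter};
        _ ->
            {error, subscription_adapter_missing}
    end.
```

Calling adapter_configured/0 early, for example from the application's start callback, turns a missing entry into an explicit error value instead of a later crash.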

Usage

  %% Subscribe to events (typically in a gen_server init)
  init(_Args) ->
      {ok, SubId} = evoq_subscriptions:subscribe(
          my_store, event_type, <<"order_placed_v1">>,
          <<"my_projection">>, #{subscriber_pid => self()}
      ),
      %% Keep the subscription ID so it can be passed to unsubscribe/2 later
      {ok, #{subscription_id => SubId}}.

  %% Receive events in handle_info
  handle_info({events, Events}, State) ->
      lists:foreach(fun(#evoq_event{data = Data}) ->
          project(Data)
      end, Events),
      {noreply, State}.

Summary

Functions

ack/4

Acknowledge that an event has been processed. Updates the subscription checkpoint.

get_adapter/0

Get the configured subscription adapter. Crashes if no adapter is configured.

get_by_name/2

Get a subscription by name.

get_checkpoint/2

Get the current checkpoint for a subscription.

list/1

List all subscriptions for a store.

set_adapter/1

Set the subscription adapter (primarily for testing).

subscribe/5

Subscribe to events from a store.

unsubscribe/2

Unsubscribe from events.

Types

evoq_subscription/0

-type evoq_subscription() ::
          #evoq_subscription{id :: binary(),
                             type :: evoq_subscription_type(),
                             selector :: binary() | map(),
                             subscription_name :: binary(),
                             subscriber_pid :: pid() | undefined,
                             created_at :: integer(),
                             pool_size :: pos_integer(),
                             checkpoint :: non_neg_integer() | undefined,
                             options :: map()}.

evoq_subscription_type/0

-type evoq_subscription_type() :: stream | event_type | event_pattern | event_payload | tags.

Functions

ack(StoreId, SubscriptionName, StreamId, Position)

-spec ack(atom(), binary(), binary() | undefined, non_neg_integer()) -> ok | {error, term()}.

Acknowledge that an event has been processed. Updates the subscription checkpoint.
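
As a minimal sketch, a subscriber might acknowledge each event only after it has been projected, so the checkpoint never moves past unprocessed work. The record below is a local stand-in listing only the envelope fields named in this documentation (real code would include evoq's own record definition instead). Using the event's version as Position is an assumption, and the module name ack_sketch and project/1 are hypothetical.

```erlang
-module(ack_sketch).
-export([handle_events/3]).

%% Local stand-in so the sketch compiles on its own; replace with the
%% record definition that ships with evoq.
-record(evoq_event, {event_type, stream_id, version, data, metadata}).

%% Project each event, then acknowledge it so the checkpoint advances.
handle_events(StoreId, SubName, Events) ->
    lists:foreach(
        fun(#evoq_event{stream_id = StreamId, version = Version, data = Data}) ->
            ok = project(Data),
            %% Assumption: the event's version is the position to acknowledge.
            case evoq_subscriptions:ack(StoreId, SubName, StreamId, Version) of
                ok ->
                    ok;
                {error, Reason} ->
                    logger:warning("ack failed: ~p", [Reason])
            end
        end,
        Events).

%% Hypothetical projection step; a real projection would update a read model.
project(_Data) ->
    ok.
```

A handle_info({events, Events}, State) clause could call handle_events(my_store, <<"my_projection">>, Events) before returning {noreply, State}.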

get_adapter()

-spec get_adapter() -> module().

Get the configured subscription adapter. Crashes if no adapter is configured.

get_by_name(StoreId, SubscriptionName)

-spec get_by_name(atom(), binary()) -> {ok, evoq_subscription()} | {error, not_found | term()}.

Get a subscription by name.

get_checkpoint(StoreId, SubscriptionName)

-spec get_checkpoint(atom(), binary()) -> {ok, non_neg_integer()} | {error, not_found | term()}.

Get the current checkpoint for a subscription.
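
The three result shapes in the spec can be handled explicitly. The sketch below maps them to a starting position that could be passed as the start_from option of subscribe/5. Whether a subscriber should resume at the checkpoint itself or one past it depends on the adapter, so reusing the value unchanged is an assumption; the module name checkpoint_sketch and the error term are hypothetical.

```erlang
-module(checkpoint_sketch).
-export([resume_from/1, resume_from/2]).

%% Look up the checkpoint and turn the result into a start position.
resume_from(StoreId, SubName) ->
    resume_from(evoq_subscriptions:get_checkpoint(StoreId, SubName)).

%% Map each documented result shape to a start position.
resume_from({ok, Position}) when is_integer(Position), Position >= 0 ->
    Position;
resume_from({error, not_found}) ->
    %% No checkpoint yet: fall back to the documented start_from default.
    0;
resume_from({error, Reason}) ->
    error({checkpoint_lookup_failed, Reason}).
```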

list(StoreId)

-spec list(atom()) -> {ok, [evoq_subscription()]} | {error, term()}.

List all subscriptions for a store.
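
As an illustration of working with the returned records, the sketch below filters a list of subscriptions by type and returns their names. The record is a local stand-in that mirrors the fields of evoq_subscription() shown under Types (real code would include evoq's own definition); the module name list_sketch is hypothetical.

```erlang
-module(list_sketch).
-export([names_by_type/2]).

%% Local stand-in mirroring the documented evoq_subscription() fields.
-record(evoq_subscription, {id, type, selector, subscription_name,
                            subscriber_pid, created_at, pool_size,
                            checkpoint, options}).

%% Return the names of all subscriptions of the given type.
names_by_type(Type, Subs) ->
    [Name || #evoq_subscription{type = T, subscription_name = Name} <- Subs,
             T =:= Type].
```

With {ok, Subs} = evoq_subscriptions:list(my_store), calling names_by_type(event_type, Subs) would yield the names of the event-type subscriptions for that store.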

set_adapter(Adapter)

-spec set_adapter(module()) -> ok.

Set the subscription adapter (primarily for testing).

subscribe(StoreId, Type, Selector, SubscriptionName, Opts)

-spec subscribe(atom(), evoq_subscription_type(), binary() | map(), binary(), map()) ->
                   {ok, binary()} | {error, term()}.

Subscribe to events from a store.

The adapter guarantees that the subscriber_pid receives {events, [#evoq_event{}]} messages with proper envelope structure (event_type, stream_id, version, data, metadata).

Options:

  subscriber_pid - PID to receive events (required for push delivery)
  start_from - Starting position (default: 0)
  pool_size - Number of emitters (default: 1)
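
The sketch below spells out all three documented options and handles both return shapes of subscribe/5. The store, selector, and subscription name are taken from the Usage example; the module name subscribe_opts_sketch and the subscribe_failed error wrapper are hypothetical.

```erlang
-module(subscribe_opts_sketch).
-export([opts/1, subscribe_orders/1]).

%% Build an options map that sets every documented option explicitly.
opts(SubscriberPid) when is_pid(SubscriberPid) ->
    #{subscriber_pid => SubscriberPid,  %% receives {events, [...]} messages
      start_from => 0,                  %% documented default
      pool_size => 1}.                  %% documented default

%% Subscribe the given process to a single event type.
subscribe_orders(SubscriberPid) ->
    case evoq_subscriptions:subscribe(
           my_store, event_type, <<"order_placed_v1">>,
           <<"my_projection">>, opts(SubscriberPid)) of
        {ok, SubId} ->
            {ok, SubId};
        {error, Reason} ->
            %% Hypothetical wrapper so callers can tell where the error arose.
            {error, {subscribe_failed, Reason}}
    end.
```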

unsubscribe(StoreId, SubscriptionId)

-spec unsubscribe(atom(), binary()) -> ok | {error, term()}.

Unsubscribe from events.
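
Note that unsubscribe/2 takes the subscription ID returned by subscribe/5, not the subscription name. A minimal sketch of a shutdown helper follows; the module name unsubscribe_sketch is hypothetical, and treating an undefined ID as "never subscribed" is a convention assumed for this example.

```erlang
-module(unsubscribe_sketch).
-export([cleanup/2]).

%% Nothing to release if the subscribe call never succeeded.
cleanup(_StoreId, undefined) ->
    ok;
%% Otherwise unsubscribe; log a failure instead of crashing during shutdown.
cleanup(StoreId, SubId) when is_binary(SubId) ->
    case evoq_subscriptions:unsubscribe(StoreId, SubId) of
        ok ->
            ok;
        {error, Reason} ->
            logger:warning("unsubscribe ~p failed: ~p", [SubId, Reason]),
            ok
    end.
```

A gen_server holding the ID in its state could call cleanup(my_store, SubId) from terminate/2.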