reckon_db_emitter_group (reckon_db v1.2.3)

Emitter group management for reckon-db

Uses pg (process groups) for managing emitter workers. Emitters are responsible for broadcasting events to subscribers.

This module provides:

- Process group management for emitter workers
- Random emitter selection for load distribution
- Topic generation for pub/sub
- Emitter name generation for registration

Summary

Functions

broadcast/3: Broadcast an event to a random emitter in the group

emitter_name/2: Generate the base emitter name for a subscription

emitter_name/3: Generate a numbered emitter name for a subscription

group_key/2: Generate the group key for a subscription's emitters

join/3: Join one or more processes to the emitter group

leave/3: Remove one or more processes from the emitter group

members/2: Get all member processes in the emitter group

persist_emitters/3: Persist emitter names to persistent_term for fast retrieval

retrieve_emitters/2: Retrieve previously persisted emitter names

topic/2: Generate the topic name for a subscription

Types

event/0

-type event() ::
          #event{event_id :: binary(),
                 event_type :: binary(),
                 stream_id :: binary(),
                 version :: non_neg_integer(),
                 data :: map() | binary(),
                 metadata :: map(),
                 tags :: [binary()] | undefined,
                 timestamp :: integer(),
                 epoch_us :: integer(),
                 data_content_type :: binary(),
                 metadata_content_type :: binary()}.

store_id/0

-type store_id() :: atom().

subscription_id/0

-type subscription_id() :: binary().

Functions

broadcast(StoreId, SubscriptionId, Event)

-spec broadcast(store_id(), subscription_id(), event()) -> ok | {error, no_emitters}.

Broadcast an event to a random emitter in the group

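If the selected emitter runs on the local node, it is sent a forward_to_local message for optimized local delivery; otherwise it is sent a broadcast message. Per the spec, the call returns {error, no_emitters} instead of ok when no emitter is available.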
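
The selection and dispatch logic described above can be sketched roughly as follows. This is an illustration under stated assumptions, not the reckon_db implementation: the module name emitter_pick_sketch, the function names, and the message tuple shapes {forward_to_local, Topic, Event} and {broadcast, Topic, Event} are hypothetical. Only the message names forward_to_local and broadcast and the {error, no_emitters} result come from the documentation.

```erlang
-module(emitter_pick_sketch).
-export([pick/1, dispatch/3, demo/0]).

%% Pick one pid uniformly at random from a list of emitters.
%% An empty list mirrors the {error, no_emitters} result in the broadcast/3 spec.
-spec pick([pid()]) -> {ok, pid()} | {error, no_emitters}.
pick([]) ->
    {error, no_emitters};
pick(Pids) ->
    {ok, lists:nth(rand:uniform(length(Pids)), Pids)}.

%% Send Event to a randomly chosen emitter.
%% ASSUMPTION: the tuple shapes below are hypothetical; the docs only
%% name the two message kinds.
-spec dispatch([pid()], binary(), term()) -> ok | {error, no_emitters}.
dispatch(Pids, Topic, Event) ->
    case pick(Pids) of
        {error, no_emitters} = Err ->
            Err;
        {ok, Pid} when node(Pid) =:= node() ->
            %% Emitter lives on this node: take the local fast path.
            Pid ! {forward_to_local, Topic, Event},
            ok;
        {ok, Pid} ->
            %% Emitter lives on another node.
            Pid ! {broadcast, Topic, Event},
            ok
    end.

%% Small self-check: the calling process acts as the only (local) emitter.
demo() ->
    {error, no_emitters} = dispatch([], <<"topic">>, event),
    ok = dispatch([self()], <<"topic">>, event),
    receive
        {forward_to_local, <<"topic">>, event} -> ok
    after 1000 ->
        timeout
    end.
```

Picking a single random member, rather than sending to all of them, is what spreads events across the emitter pool.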

emitter_name(StoreId, SubscriptionId)

-spec emitter_name(store_id(), subscription_id()) -> atom().

Generate the base emitter name for a subscription

emitter_name(StoreId, SubscriptionId, Number)

-spec emitter_name(store_id(), subscription_id(), pos_integer()) -> atom().

Generate a numbered emitter name for a subscription

group_key(StoreId, SubscriptionId)

-spec group_key(store_id(), subscription_id()) -> {atom(), subscription_id(), emitters}.

Generate the group key for a subscription's emitters

join(StoreId, SubscriptionId, PidOrPids)

-spec join(store_id(), subscription_id(), pid() | [pid()]) -> ok.

Join one or more processes to the emitter group

leave(StoreId, SubscriptionId, PidOrPids)

-spec leave(store_id(), subscription_id(), pid() | [pid()]) -> ok.

Remove one or more processes from the emitter group

members(StoreId, SubscriptionId)

-spec members(store_id(), subscription_id()) -> [pid()].

Get all member processes in the emitter group
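
Since the module is described as being built on pg, the join, leave, and members operations presumably map onto the corresponding pg calls. The sketch below shows that lifecycle directly with OTP's pg module (OTP 23 or later). The scope name emitter_sketch_scope and the module name pg_group_sketch are hypothetical, and the group key only follows the shape given in the group_key/2 spec; treating its first element as the store ID is an assumption.

```erlang
-module(pg_group_sketch).
-export([demo/0]).

%% ASSUMPTION: hypothetical scope name; the scope reckon_db uses is not
%% stated in this documentation.
-define(SCOPE, emitter_sketch_scope).

demo() ->
    ok = ensure_scope(),
    %% Same shape as the group_key/2 return type: {atom(), binary(), emitters}.
    Group = {my_store, <<"orders">>, emitters},
    Worker = spawn(fun() -> receive stop -> ok end end),

    %% Join several local processes in one call.
    ok = pg:join(?SCOPE, Group, [self(), Worker]),
    Joined = lists:sort(pg:get_members(?SCOPE, Group)),
    Joined = lists:sort([self(), Worker]),

    %% Remove a single process; the other one stays in the group.
    ok = pg:leave(?SCOPE, Group, Worker),
    Remaining = pg:get_members(?SCOPE, Group),
    Remaining = [self()],

    %% Clean up so the sketch can be run repeatedly.
    ok = pg:leave(?SCOPE, Group, self()),
    [] = pg:get_members(?SCOPE, Group),
    Worker ! stop,
    ok.

%% Start the pg scope unless it is already running.
ensure_scope() ->
    case pg:start(?SCOPE) of
        {ok, _Pid} -> ok;
        {error, {already_started, _Pid}} -> ok
    end.
```

pg also removes a process from its groups automatically when that process exits, so an explicit leave is mainly useful for workers that stay alive but should stop receiving events.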

persist_emitters(StoreId, SubscriptionId, PoolSize)

-spec persist_emitters(store_id(), subscription_id(), pos_integer()) -> [atom()].

Persist emitter names to persistent_term for fast retrieval

retrieve_emitters(StoreId, SubscriptionId)

-spec retrieve_emitters(store_id(), subscription_id()) -> [atom()].

Retrieve previously persisted emitter names
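
The persist and retrieve pair can be pictured as a thin wrapper around persistent_term. The following is a minimal sketch under assumptions: the key shape, the name format my_store_orders_emitter_N, the module name emitter_names_sketch, and the empty-list default when nothing was persisted are all hypothetical and may differ from what reckon_db does.

```erlang
-module(emitter_names_sketch).
-export([persist/3, retrieve/2, demo/0]).

%% ASSUMPTION: hypothetical persistent_term key shape.
key(StoreId, SubscriptionId) ->
    {?MODULE, StoreId, SubscriptionId}.

%% ASSUMPTION: hypothetical name format, e.g. my_store_orders_emitter_1.
name(StoreId, SubscriptionId, N) ->
    Chars = io_lib:format("~s_~s_emitter_~B", [StoreId, SubscriptionId, N]),
    list_to_atom(lists:flatten(Chars)).

%% Build PoolSize numbered names, store them once, and return them.
-spec persist(atom(), binary(), pos_integer()) -> [atom()].
persist(StoreId, SubscriptionId, PoolSize) ->
    Names = [name(StoreId, SubscriptionId, N) || N <- lists:seq(1, PoolSize)],
    ok = persistent_term:put(key(StoreId, SubscriptionId), Names),
    Names.

%% Read the stored names; returns [] if nothing was persisted (an assumption).
-spec retrieve(atom(), binary()) -> [atom()].
retrieve(StoreId, SubscriptionId) ->
    persistent_term:get(key(StoreId, SubscriptionId), []).

demo() ->
    Names = persist(my_store, <<"orders">>, 3),
    Names = retrieve(my_store, <<"orders">>),
    [my_store_orders_emitter_1, my_store_orders_emitter_2,
     my_store_orders_emitter_3] = Names,
    ok.
```

persistent_term suits this pattern because reads are very cheap and the list of names is written rarely; updating or erasing an existing term is comparatively expensive, so it is best done only when the pool is set up or resized.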

topic(StoreId, SubscriptionId)

-spec topic(store_id(), subscription_id()) -> binary().

Generate the topic name for a subscription

Special case: passing the binary <<"$all">> as the SubscriptionId creates a topic for all events.