Electric.Shapes.ConsumerRegistry (electric v1.4.13)
View SourceSummary
Functions
Calls many GenServers asynchronously with per-handle messages and waits for their responses before returning.
Dynamically (re-)enable consumer suspension on all running consumers.
Types
@type shape_handle() :: Electric.shape_handle()
@type stack_id() :: Electric.stack_id()
@type t() :: %Electric.Shapes.ConsumerRegistry{ stack_id: stack_id(), table: :ets.table() }
Functions
@spec active_consumer_count(stack_id()) :: non_neg_integer()
@spec broadcast([{shape_handle(), term(), pid() | nil}]) :: %{ required(shape_handle()) => term() }
Calls many GenServers asynchronously with per-handle messages and waits for their responses before returning.
Returns a map of shape_handle => event for handles that need to be retried
(because their consumers suspended).
There is no timeout so if the GenServers do not respond or die, this function will block indefinitely.
@spec enable_suspend(stack_id(), pos_integer(), pos_integer()) :: consumer_count :: non_neg_integer()
Dynamically (re-)enable consumer suspension on all running consumers.
This allows for dynamically re-configuring consumer suspension even if it was disabled, because the configuration message will have the side-effect of waking all consumers from hibernation.
The jitter_period value allows for spreading the suspension of existing
consumers over a large time period to avoid a sudden rush of consumer
shutdowns after hibernate_after ms.
To re-enable consumer suspend:
# set the hibernation timeout to 1 minute but phase the suspension of
# existing consumers over a 20 minute period
Electric.Shapes.ConsumerRegistry.enable_suspend(stack_id, 60_000, 60_000 * 20)Disabling suspension is as easy as:
Electric.StackConfig.put(stack_id, :shape_enable_suspend?, false)
@spec publish(%{required(shape_handle()) => term()}, t()) :: :ok
@spec register_consumer(pid(), shape_handle(), stack_id()) :: {:ok, non_neg_integer()}
@spec register_consumer(pid(), shape_handle(), t()) :: {:ok, non_neg_integer()}
@spec register_consumer(pid(), shape_handle(), :ets.table()) :: {:ok, non_neg_integer()}
@spec remove_consumer(shape_handle(), t()) :: :ok
@spec remove_consumer(shape_handle(), stack_id()) :: :ok
@spec whereis(stack_ref(), shape_handle()) :: pid() | nil