Electric.Shapes.ConsumerRegistry (electric v1.4.13)

View Source

Summary

Types

shape_handle()

@type shape_handle() :: Electric.shape_handle()

stack_id()

@type stack_id() :: Electric.stack_id()

stack_ref()

@type stack_ref() :: stack_id() | [{:stack_id, stack_id()}] | %{stack_id: stack_id()}

t()

@type t() :: %Electric.Shapes.ConsumerRegistry{
  stack_id: stack_id(),
  table: :ets.table()
}

Functions

active_consumer_count(stack_id)

@spec active_consumer_count(stack_id()) :: non_neg_integer()

broadcast(handle_event_pids)

@spec broadcast([{shape_handle(), term(), pid() | nil}]) :: %{
  required(shape_handle()) => term()
}

Calls many GenServers asynchronously with per-handle messages and waits for their responses before returning.

Returns a map of shape_handle => event for handles that need to be retried (because their consumers suspended).

There is no timeout so if the GenServers do not respond or die, this function will block indefinitely.

enable_suspend(stack_id, hibernate_after, jitter_period)

@spec enable_suspend(stack_id(), pos_integer(), pos_integer()) ::
  consumer_count :: non_neg_integer()

Dynamically (re-)enable consumer suspension on all running consumers.

This allows for dynamically re-configuring consumer suspension even if it was disabled, because the configuration message will have the side-effect of waking all consumers from hibernation.

The jitter_period value allows for spreading the suspension of existing consumers over a large time period to avoid a sudden rush of consumer shutdowns after hibernate_after ms.

To re-enable consumer suspend:

# set the hibernation timeout to 1 minute but phase the suspension of
# existing consumers over a 20 minute period
Electric.Shapes.ConsumerRegistry.enable_suspend(stack_id, 60_000, 60_000 * 20)

Disabling suspension is as easy as:

Electric.StackConfig.put(stack_id, :shape_enable_suspend?, false)

name(stack_id, shape_handle)

new(stack_id, opts \\ [])

publish(events_by_handle, registry_state)

@spec publish(%{required(shape_handle()) => term()}, t()) :: :ok

register_consumer(pid, shape_handle, stack_id)

@spec register_consumer(pid(), shape_handle(), stack_id()) :: {:ok, non_neg_integer()}
@spec register_consumer(pid(), shape_handle(), t()) :: {:ok, non_neg_integer()}
@spec register_consumer(pid(), shape_handle(), :ets.table()) ::
  {:ok, non_neg_integer()}

register_name(arg, pid)

remove_consumer(shape_handle, stack_id)

@spec remove_consumer(shape_handle(), t()) :: :ok
@spec remove_consumer(shape_handle(), stack_id()) :: :ok

unregister_name(arg)

whereis(stack_ref, shape_handle)

@spec whereis(stack_ref(), shape_handle()) :: pid() | nil

whereis_name(arg)