Electric.Shapes.ConsumerRegistry (electric v1.6.2)

Types

shape_handle()

@type shape_handle() :: Electric.shape_handle()

stack_id()

@type stack_id() :: Electric.stack_id()

stack_ref()

@type stack_ref() :: stack_id() | [{:stack_id, stack_id()}] | %{stack_id: stack_id()}
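The three accepted forms of stack_ref() can be reduced to a bare stack_id with a few function clauses. The sketch below is only an illustration: the module StackRefSketch and its stack_id/1 function are hypothetical and not part of Electric, and it assumes that stack_id() is a binary.

```elixir
defmodule StackRefSketch do
  # Hypothetical helper, not part of Electric.
  # Assumption: stack_id() is a binary (string).

  # Form 1: the bare stack_id.
  def stack_id(stack_id) when is_binary(stack_id), do: stack_id

  # Form 2: a map carrying a :stack_id key (a struct with that field also matches).
  def stack_id(%{stack_id: stack_id}) when is_binary(stack_id), do: stack_id

  # Form 3: a keyword list carrying a :stack_id entry.
  def stack_id(opts) when is_list(opts), do: Keyword.fetch!(opts, :stack_id)
end

from_id = StackRefSketch.stack_id("my-stack")
from_opts = StackRefSketch.stack_id(stack_id: "my-stack")
from_map = StackRefSketch.stack_id(%{stack_id: "my-stack"})
```

All three calls resolve to the same identifier, which is why a function such as whereis/2 can accept any of the forms.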

t()

@type t() :: %Electric.Shapes.ConsumerRegistry{
  stack_id: stack_id(),
  table: :ets.table()
}

Functions

active_consumer_count(stack_id)

@spec active_consumer_count(stack_id()) :: non_neg_integer()

broadcast(handle_event_pids)

@spec broadcast([{shape_handle(), term(), pid() | nil}]) ::
  {%{required(shape_handle()) => term()}, %{required(shape_handle()) => term()}}

Calls many GenServers asynchronously with per-handle messages and waits for their responses before returning.

Returns a tuple {suspended, crashed} where:

  • suspended is a map of shape_handle => event for handles whose consumers suspended (these should be retried by the caller)
  • crashed is a map of shape_handle => exit_reason for handles whose consumers crashed (these should NOT be retried)

There is no timeout, so if a GenServer neither responds nor dies, this function will block indefinitely.
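As an illustration of the {suspended, crashed} contract described above, a caller might turn the result into a new batch for the suspended handles and a list of handles to give up on. This is a minimal sketch under stated assumptions: BroadcastResultSketch, plan_retry/2 and the lookup_pid argument are hypothetical names, not part of Electric, and how a suspended consumer gets restarted before the retry is outside the scope of the sketch.

```elixir
defmodule BroadcastResultSketch do
  # Hypothetical helper, not part of Electric.
  #
  # `lookup_pid` is a caller-supplied function (an assumption of this sketch)
  # that maps a shape handle to the pid that should receive the retried event.
  def plan_retry({suspended, crashed}, lookup_pid) when is_function(lookup_pid, 1) do
    # Suspended handles keep their original event and are queued for a retry,
    # in the {shape_handle, event, pid} shape that broadcast/1 accepts.
    retry_batch =
      for {handle, event} <- suspended do
        {handle, event, lookup_pid.(handle)}
      end

    # Crashed handles are reported but never retried.
    {retry_batch, Map.keys(crashed)}
  end
end

# Example result: one suspended consumer and one crashed consumer.
result = {%{"shape-a" => :event_1}, %{"shape-b" => :killed}}

{retry_batch, failed_handles} =
  BroadcastResultSketch.plan_retry(result, fn _handle -> self() end)
```

Here retry_batch holds a single entry for "shape-a" and failed_handles holds "shape-b". The retry batch has the same element shape as the argument to broadcast/1, so it could be passed to another call once the caller has a live pid for each handle.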

enable_suspend(stack_id, hibernate_after, jitter_period)

@spec enable_suspend(stack_id(), pos_integer(), pos_integer()) ::
  consumer_count :: non_neg_integer()

Dynamically (re-)enable consumer suspension on all running consumers.

This makes it possible to reconfigure consumer suspension at runtime, even if it was previously disabled, because the configuration message has the side effect of waking all consumers from hibernation.

The jitter_period value spreads the suspension of existing consumers over a longer period, to avoid a sudden rush of consumer shutdowns once hibernate_after ms have elapsed.
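The effect of jitter can be sketched as follows. This is not Electric's implementation: JitterSketch and suspend_after/2 are hypothetical names, and the sketch assumes that each consumer adds a uniformly random delay between 0 and jitter_period ms to hibernate_after.

```elixir
defmodule JitterSketch do
  # Hypothetical illustration, not part of Electric.
  # Assumption: each consumer waits hibernate_after ms plus a uniformly
  # random extra delay in 0..jitter_period ms before suspending.
  def suspend_after(hibernate_after, jitter_period)
      when is_integer(hibernate_after) and hibernate_after > 0 and
             is_integer(jitter_period) and jitter_period > 0 do
    # :rand.uniform(n) returns an integer in 1..n, so shift it down to 0..jitter_period.
    hibernate_after + :rand.uniform(jitter_period + 1) - 1
  end
end

# Five consumers configured with a 1 minute timeout and 20 minutes of jitter.
delays = for _ <- 1..5, do: JitterSketch.suspend_after(60_000, 60_000 * 20)
```

Under this assumption every delay falls between 60_000 ms and 1_260_000 ms, so consumers that were woken at the same moment would not all suspend together.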

To re-enable consumer suspension:

# set the hibernation timeout to 1 minute but phase the suspension of
# existing consumers over a 20 minute period
Electric.Shapes.ConsumerRegistry.enable_suspend(stack_id, 60_000, 60_000 * 20)

Disabling suspension is as easy as:

Electric.StackConfig.put(stack_id, :shape_enable_suspend?, false)

name(stack_id, shape_handle)

new(stack_id, opts \\ [])

publish(events_by_handle, registry_state)

@spec publish(%{required(shape_handle()) => term()}, t()) :: %{
  required(shape_handle()) => term()
}

register_consumer(pid, shape_handle, stack_id)

@spec register_consumer(pid(), shape_handle(), stack_id()) :: {:ok, non_neg_integer()}
@spec register_consumer(pid(), shape_handle(), t()) :: {:ok, non_neg_integer()}
@spec register_consumer(pid(), shape_handle(), :ets.table()) ::
  {:ok, non_neg_integer()}

register_name(arg, pid)

remove_consumer(shape_handle, stack_id)

@spec remove_consumer(shape_handle(), t()) :: :ok
@spec remove_consumer(shape_handle(), stack_id()) :: :ok

unregister_name(arg)

whereis(stack_ref, shape_handle)

@spec whereis(stack_ref(), shape_handle()) :: pid() | nil

whereis_name(arg)