Electric.Replication.ShapeLogCollector (electric v1.4.13)

The ShapeLogCollector is responsible for collecting and processing shape log operations and managing shape registrations.

It consists of two main components: the processor and the RequestBatcher.

The processor handles shape log operations and manages updates to the shape matching index. When a transaction arrives from Postgres, it must be stored in a shape's log if and only if its txid is >= the xmin of that shape's snapshot.
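
The txid rule above can be illustrated with a small predicate. This is only a sketch, not Electric's implementation: the module name `SnapshotFilterSketch`, both function names, and the representation of transactions as maps with an integer `:txid` are assumptions made for illustration, and txid wraparound is ignored.

```elixir
defmodule SnapshotFilterSketch do
  @moduledoc false

  # Hypothetical helper: true when a transaction belongs in a shape's log,
  # i.e. when its txid is >= the xmin of that shape's snapshot.
  @spec store_in_log?(non_neg_integer(), non_neg_integer()) :: boolean()
  def store_in_log?(txid, snapshot_xmin)
      when is_integer(txid) and is_integer(snapshot_xmin) do
    txid >= snapshot_xmin
  end

  # Keeps only the transactions that satisfy the rule above.
  # Transactions are assumed to be maps carrying a :txid key.
  @spec filter_transactions([map()], non_neg_integer()) :: [map()]
  def filter_transactions(txns, snapshot_xmin) do
    Enum.filter(txns, fn %{txid: txid} -> store_in_log?(txid, snapshot_xmin) end)
  end
end
```

With a snapshot xmin of 100, a transaction with txid 99 is skipped, while transactions with txid 100 and 101 are kept.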

The RequestBatcher batches the registration and deregistration of shapes to avoid overwhelming the processor with frequent updates.
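
The batching idea can be sketched as a plain accumulator that collects registrations and deregistrations and hands them over together. This is a simplified illustration under stated assumptions: the module `RegistrationBatchSketch` and its functions are hypothetical, and the real RequestBatcher may track requests, callers, and ordering differently.

```elixir
defmodule RegistrationBatchSketch do
  @moduledoc false

  # Pending registrations are kept as a map of handle => shape,
  # pending deregistrations as a set of handles.
  defstruct to_add: %{}, to_remove: MapSet.new()

  # Starts an empty batch.
  def new, do: %__MODULE__{}

  # Queues a shape for registration.
  def add(%__MODULE__{} = batch, handle, shape) do
    %{batch | to_add: Map.put(batch.to_add, handle, shape)}
  end

  # Queues a shape handle for deregistration.
  def remove(%__MODULE__{} = batch, handle) do
    %{batch | to_remove: MapSet.put(batch.to_remove, handle)}
  end

  # Returns everything accumulated so far as a single update,
  # together with a fresh, empty batch for subsequent requests.
  def flush(%__MODULE__{} = batch) do
    {{batch.to_add, batch.to_remove}, new()}
  end
end
```

In this sketch, any number of `add/3` and `remove/2` calls between two flushes results in one `{shapes_to_add, shapes_to_remove}` pair, so the receiving process sees a single update instead of one message per request.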

Summary

Functions

Returns the set of currently active shapes being tracked in the shape matching filters.

Adds a shape to the shape matching index in the ShapeLogCollector used for matching and sending replication stream operations.

Returns a specification to start this module under a supervisor.

Handles replication log events.

Handles batched shape registration updates from the RequestBatcher.

Callback implementation for GenServer.init/1.

Marks the collector as ready to process operations from the replication stream.

Utility for tests, monitors the SLC process.

Notifies the ShapeLogCollector that a shape's data has been flushed up to a certain offset, used to mark the overall flush progress.

Removes a shape from the shape matching index in the ShapeLogCollector. This call succeeds before the shape is actually removed from the index.

Set process flags on the given ShapeLogCollector process.

Functions

activate_mocked_functions_from_test_process()

active_shapes(stack_id)

@spec active_shapes(Electric.stack_id()) :: MapSet.t(Electric.shape_handle())

Returns the set of currently active shapes being tracked in the shape matching filters, as a MapSet of shape handles.

add_shape(stack_id, shape_handle, shape, operation)

Adds a shape to the shape matching index in the ShapeLogCollector used for matching and sending replication stream operations.

child_spec(init_arg)

Returns a specification to start this module under a supervisor.

See Supervisor.

get_process_flags(stack_id)

handle_continue(atom, state)

Callback implementation for GenServer.handle_continue/2.

handle_event(event, stack_id)

Handles replication log events.

Should be called with operations received from the replication stream.

Uses GenServer.call/3 to make event processing synchronous.

The call has a timeout of :infinity because timeouts are handled at the storage layer. This function does not assume any aggregate maximum time for the shape consumers to commit the new transaction to disk. Instead, the storage backend is responsible for determining how long a write should reasonably take, and it should raise if a write does not complete within that time.
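
The synchronous pattern described above can be sketched with a minimal GenServer. The module `SyncCollectorSketch`, its state, and the `:events` request are assumptions for illustration only; the sketch shows the general `GenServer.call/3` with `:infinity` technique, not the collector's actual code.

```elixir
defmodule SyncCollectorSketch do
  use GenServer

  # Client API

  def start_link(opts \\ []) do
    GenServer.start_link(__MODULE__, :ok, opts)
  end

  # Blocks the caller until the server has finished handling the event.
  # With :infinity the caller never times out on its own, so any time
  # limit has to be enforced by whatever does the actual work.
  def handle_event(server, event) do
    GenServer.call(server, {:handle_event, event}, :infinity)
  end

  # Server callbacks

  @impl true
  def init(:ok), do: {:ok, []}

  @impl true
  def handle_call({:handle_event, event}, _from, events) do
    # A real implementation would dispatch the event to consumers here
    # and reply only once they are done.
    {:reply, :ok, [event | events]}
  end

  def handle_call(:events, _from, events) do
    # Test helper: returns the handled events in arrival order.
    {:reply, Enum.reverse(events), events}
  end
end
```

Because the caller waits for the reply, events are handled strictly one at a time and in order, which is the property a synchronous call provides over a cast.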

handle_shape_registration_updates(stack_id, shapes_to_add, shapes_to_remove)

Handles batched shape registration updates from the RequestBatcher.

init(opts)

Callback implementation for GenServer.init/1.

mark_as_ready(stack_id)

@spec mark_as_ready(Electric.stack_id()) :: :ok

Marks the collector as ready to process operations from the replication stream.

This is typically called after the initial shape registrations have been processed.

monitor(stack_id)

Utility for tests, monitors the SLC process.

name(stack_id)

notify_flushed(stack_id, shape_handle, offset)

Notifies the ShapeLogCollector that a shape's data has been flushed up to a certain offset, used to mark the overall flush progress.

Should be called by consumer processes after they flush data.

remove_shape(stack_id, shape_handle)

Removes a shape from the shape matching index in the ShapeLogCollector. This call succeeds before the shape is actually removed from the index.

set_process_flags(stack_id, flags)

Set process flags on the given ShapeLogCollector process.

Accepts a list of flags to set, see Process.flag/2 for valid settings.

Does not crash if given an invalid flag or value. Instead, it returns the list of invalid flags.

iex> ShapeLogCollector.set_process_flags("my-stack-id", min_heap_size: 1024 * 1024, min_bin_vheap_size: 1024 * 1024)
{:ok, settings: [min_heap_size: 1024 * 1024, min_bin_vheap_size: 1024 * 1024], invalid: []}
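
The non-crashing behaviour can be approximated by rescuing the ArgumentError that Process.flag/2 raises for an invalid flag or value. The following is a sketch under stated assumptions: `ProcessFlagsSketch.set_flags/1` is a hypothetical helper that applies flags to the calling process, and reporting invalid entries by flag name is an assumption, since the original does not show what a non-empty `invalid` list contains.

```elixir
defmodule ProcessFlagsSketch do
  @moduledoc false

  # Applies each {flag, value} pair to the calling process.
  # Process.flag/2 raises ArgumentError for an unknown flag or a bad value;
  # the error is rescued so the flag is reported instead of crashing.
  def set_flags(flags) when is_list(flags) do
    {settings, invalid} =
      Enum.reduce(flags, {[], []}, fn {flag, value}, {ok, bad} ->
        try do
          Process.flag(flag, value)
          {[{flag, value} | ok], bad}
        rescue
          ArgumentError -> {ok, [flag | bad]}
        end
      end)

    # Same return shape as the example above: a tuple whose second
    # element is a keyword list with :settings and :invalid.
    {:ok, settings: Enum.reverse(settings), invalid: Enum.reverse(invalid)}
  end
end
```

For instance, calling `ProcessFlagsSketch.set_flags(min_heap_size: 1024, not_a_real_flag: 1)` applies the first flag and reports the second one under `:invalid` rather than raising.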

start_link(opts)