Electric.Shapes.Consumer.MoveIns (electric v1.4.13)


Summary

Types

t()

Information needed to reason about move-in handling and correct stream processing.

Functions

add_waiting(state, name, moved_values)

Add information about a new move-in that we're waiting for to the state. The snapshot is initially nil and is set later, when the query begins.

change_already_visible?(arg1, xid, arg3)

Check if a change is already visible in one of the completed move-ins.

change_to_filtering(state, name, key_set)

Change a move-in from "waiting" to "filtering", marking it as complete, and return a best-effort visibility boundary.

gc_touch_tracker(state)

Garbage collect touches that are visible in all pending snapshots. A touch is visible if its xid is before the minimum xmin of all waiting snapshots.

get_and_clear_maximum_resolved_snapshot(state)

Get the stored maximum resolved snapshot and clear it, or return nil if none is stored. Returns {snapshot | nil, updated_state}.

is_minimum_snapshot?(move_ins, snapshot)

Check if the given snapshot is the minimum among all concurrent waiting move-ins (excluding the current one being resolved, and only considering those with known snapshots).

remove_completed(state, transaction)

Remove completed move-ins from the state.

set_snapshot(state, name, snapshot)

Set the snapshot for a waiting move-in when it becomes known.

should_skip_query_row?(touch_tracker, snapshot, key)

Check if a query result row should be skipped because a fresher version exists in the stream. Skip if: touch exists AND touch xid is NOT visible in query snapshot.

store_maximum_resolved_snapshot(state, snapshot)

Store or update the maximum resolved snapshot. If there's already a stored snapshot, keep the maximum of the two.

track_touch(state, xid, arg3)

Track a touch for a non-delete change. Returns the state with an updated touch_tracker.

Types

in_flight_values()

@type in_flight_values() :: %{required(term()) => MapSet.t()}

move_in_name()

@type move_in_name() :: String.t()

pg_snapshot()

t()

@type t() :: %Electric.Shapes.Consumer.MoveIns{
  filtering_move_ins: [{pg_snapshot(), keys :: [String.t()]}],
  in_flight_values: in_flight_values(),
  maximum_resolved_snapshot: nil | pg_snapshot(),
  minimum_unresolved_snapshot: nil | pg_snapshot(),
  move_in_buffering_snapshot: nil | pg_snapshot(),
  moved_out_tags: %{required(move_in_name()) => MapSet.t(String.t())},
  touch_tracker: %{required(String.t()) => pos_integer()},
  waiting_move_ins: %{
    required(move_in_name()) => {pg_snapshot() | nil, {term(), MapSet.t()}}
  }
}

Information needed to reason about move-in handling and correct stream processing.

  • waiting_move_ins: Information about move-ins we're waiting for. That means a move-in was triggered, but
                    query results are not yet available. The map value holds the pg snapshot (nil until it is known) and the
                    actual values that were moved in and thus should be skipped in where clause evaluation until the
                    results are appended to the log.
  • filtering_move_ins: Information about move-ins we're filtering. That means a move-in has resolved and was
                      added to the shape log, and we need to skip changes that are already visible there.
  • touch_tracker: A map of keys to xids of transactions that have touched them. This is used to skip changes
                 inside move-in query results that are already visible in the shape log.
  • move_in_buffering_snapshot: A snapshot that is a union of all the "waiting" move-in snapshots. This is used to
                              reduce a check whether something is visible in any of the "waiting" move-in snapshots
                              down to a single check instead of checking each snapshot individually.
  • in_flight_values: A precalculated map of all moved-in values that caused a move-in and thus should be skipped in
                    where clause evaluation until the results are appended to the log.
  • moved_out_tags: A map of move-in names to sets of tags that were moved out while the move-in was happening and thus
                  should be skipped when appending move-in results to the log.
  • maximum_resolved_snapshot: Stores the maximum snapshot of resolved move-ins that weren't immediately appended as
                             snapshot-end control messages, to be appended when the last concurrent move-in resolves.
  • minimum_unresolved_snapshot: Stores the minimum snapshot of unresolved move-ins.

Functions

add_waiting(state, name, moved_values)

@spec add_waiting(t(), move_in_name(), {term(), MapSet.t()}) :: t()

Add information about a new move-in that we're waiting for to the state. The snapshot is initially nil and is set later, when the query begins.

change_already_visible?(arg1, xid, arg3)

@spec change_already_visible?(
  t(),
  Electric.Postgres.Xid.anyxid(),
  Electric.Replication.Changes.change()
) :: boolean()

Check if a change is already visible in one of the completed move-ins.

A visible change means it needs to be skipped to avoid duplicates.
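The check can be pictured with a minimal sketch. It assumes a snapshot is a `{xmin, xmax, xip_list}` tuple in the style of Postgres's `pg_current_snapshot()`, ignores xid wraparound, and takes the change's key directly instead of a change struct. The module and function names are hypothetical and are not part of Electric's API.

```elixir
defmodule VisibilitySketch do
  # Assumption: snapshot is {xmin, xmax, xip_list}; xid wraparound is ignored.
  # An xid is visible if it finished before the snapshot was taken.
  def visible_in_snapshot?(xid, {xmin, xmax, xip_list}) do
    xid < xmin or (xid < xmax and xid not in xip_list)
  end

  # `filtering` mirrors the shape of the `filtering_move_ins` field:
  # a list of {snapshot, keys} pairs for move-ins already in the log.
  def already_visible?(filtering, xid, key) do
    Enum.any?(filtering, fn {snapshot, keys} ->
      key in keys and visible_in_snapshot?(xid, snapshot)
    end)
  end
end
```

In this sketch a change is skipped only when its key was returned by a move-in query and its transaction was already visible to that query's snapshot.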

change_to_filtering(state, name, key_set)

@spec change_to_filtering(t(), move_in_name(), MapSet.t(String.t())) ::
  {visibility_boundary :: nil | pg_snapshot(), t()}

Change a move-in from "waiting" to "filtering", marking it as complete, and return a best-effort visibility boundary.

change_visible_in_unresolved_move_ins_for_values?(move_ins, referenced_values, xid)

gc_touch_tracker(state)

@spec gc_touch_tracker(t()) :: t()

Garbage collect touches that are visible in all pending snapshots. A touch is visible if its xid is before the minimum xmin of all waiting snapshots.
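As a rough illustration of the rule above, the following sketch drops every touch whose xid is below the smallest xmin among the given snapshots. It assumes `{xmin, xmax, xip_list}` snapshot tuples and operates on a bare touch map rather than the full state. Leaving the tracker untouched when no snapshots are known is a conservative choice of this sketch, not documented behaviour. All names are hypothetical.

```elixir
defmodule TouchGcSketch do
  # With no known snapshots, this sketch keeps everything (an assumption).
  def gc(touch_tracker, []), do: touch_tracker

  def gc(touch_tracker, snapshots) do
    # Smallest xmin across all waiting snapshots.
    min_xmin =
      snapshots
      |> Enum.map(fn {xmin, _xmax, _xip_list} -> xmin end)
      |> Enum.min()

    # A touch below min_xmin is visible to every pending query, so it is dropped.
    Map.reject(touch_tracker, fn {_key, xid} -> xid < min_xmin end)
  end
end
```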

get_and_clear_maximum_resolved_snapshot(state)

@spec get_and_clear_maximum_resolved_snapshot(t()) :: {pg_snapshot() | nil, t()}

Get the stored maximum resolved snapshot and clear it, or return nil if none is stored. Returns {snapshot | nil, updated_state}.

is_minimum_snapshot?(move_ins, snapshot)

@spec is_minimum_snapshot?(t(), pg_snapshot()) :: boolean()

Check if the given snapshot is the minimum among all concurrent waiting move-ins (excluding the current one being resolved, and only considering those with known snapshots).

move_out_happened(state, new_tags)

new()

remove_completed(state, transaction)

@spec remove_completed(t(), Electric.Replication.Changes.Transaction.t()) :: t()

Remove completed move-ins from the state.

A move-in is considered "completed" (i.e. no longer included in the filtering logic) once we see any transaction that is after the end of the move-in snapshot.

Filtering is generally applied only to transactions that are already visible in the snapshot, and those can only have xid < xmax.
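A minimal sketch of this pruning step follows. It assumes `{xmin, xmax, xip_list}` snapshot tuples, reads "after the end of the snapshot" as `xid >= xmax`, and takes the transaction's xid directly instead of a transaction struct. The module name and argument shapes are hypothetical.

```elixir
defmodule RemoveCompletedSketch do
  # `filtering` mirrors `filtering_move_ins`: a list of {snapshot, keys}.
  # Once a transaction at or past xmax shows up, no later transaction can be
  # visible in that snapshot, so the entry no longer needs filtering.
  def remove_completed(filtering, txn_xid) do
    Enum.reject(filtering, fn {{_xmin, xmax, _xip_list}, _keys} ->
      txn_xid >= xmax
    end)
  end
end
```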

set_snapshot(state, name, snapshot)

@spec set_snapshot(t(), move_in_name(), pg_snapshot()) :: t()

Set the snapshot for a waiting move-in when it becomes known.
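The two-step lifecycle of a waiting entry (added with a nil snapshot, then filled in) can be sketched on a plain map shaped like the `waiting_move_ins` field. The module name is hypothetical, the functions take the map rather than the full state, and the moved-values tuple used in the usage example is made up for illustration.

```elixir
defmodule WaitingSketch do
  # A new move-in starts with an unknown (nil) snapshot.
  def add_waiting(waiting, name, moved_values) do
    Map.put(waiting, name, {nil, moved_values})
  end

  # Fill in the snapshot once the move-in query has begun.
  # Map.update!/3 raises if the move-in name was never registered.
  def set_snapshot(waiting, name, snapshot) do
    Map.update!(waiting, name, fn {_old_snapshot, moved_values} ->
      {snapshot, moved_values}
    end)
  end
end
```

For example, `WaitingSketch.add_waiting(%{}, "move-in-1", {["parent_id"], MapSet.new([1, 2])})` yields an entry whose first element is nil until `set_snapshot/3` is called for the same name.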

should_skip_query_row?(touch_tracker, snapshot, key)

@spec should_skip_query_row?(
  %{required(String.t()) => pos_integer()},
  pg_snapshot(),
  String.t()
) ::
  boolean()

Check if a query result row should be skipped because a fresher version exists in the stream. Skip if: touch exists AND touch xid is NOT visible in query snapshot.
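The skip rule can be sketched as follows, assuming `{xmin, xmax, xip_list}` snapshot tuples and ignoring xid wraparound. The module name and the local visibility helper are hypothetical stand-ins rather than Electric's own functions.

```elixir
defmodule SkipRowSketch do
  # Assumed visibility rule for a {xmin, xmax, xip_list} snapshot.
  def visible?(xid, {xmin, xmax, xip_list}) do
    xid < xmin or (xid < xmax and xid not in xip_list)
  end

  # Skip the query row only if the key was touched by a transaction that the
  # query snapshot could not see; the stream then holds the fresher version.
  def should_skip?(touch_tracker, snapshot, key) do
    case Map.fetch(touch_tracker, key) do
      {:ok, touch_xid} -> not visible?(touch_xid, snapshot)
      :error -> false
    end
  end
end
```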

store_maximum_resolved_snapshot(state, snapshot)

@spec store_maximum_resolved_snapshot(t(), pg_snapshot()) :: t()

Store or update the maximum resolved snapshot. If there's already a stored snapshot, keep the maximum of the two.
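A minimal sketch of the store-then-clear pattern, together with the behaviour described for get_and_clear_maximum_resolved_snapshot/1, is shown below. It uses a plain map in place of the state struct and compares snapshots with Elixir's built-in term ordering via `max/2`. That ordering is purely an assumption of this sketch; how the module actually orders snapshots is not stated here. The module name is hypothetical.

```elixir
defmodule MaxSnapshotSketch do
  # Nothing stored yet: keep the incoming snapshot.
  def store(%{maximum_resolved_snapshot: nil} = state, snapshot) do
    %{state | maximum_resolved_snapshot: snapshot}
  end

  # Something stored: keep the larger of the two.
  # Assumption: Elixir term ordering stands in for the real comparison.
  def store(%{maximum_resolved_snapshot: current} = state, snapshot) do
    %{state | maximum_resolved_snapshot: max(current, snapshot)}
  end

  # Return the stored snapshot (or nil) and reset the field.
  def get_and_clear(%{maximum_resolved_snapshot: snapshot} = state) do
    {snapshot, %{state | maximum_resolved_snapshot: nil}}
  end
end
```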

track_touch(state, xid, arg3)

@spec track_touch(t(), pos_integer(), Electric.Replication.Changes.change()) :: t()

Track a touch for a non-delete change. Returns the state with an updated touch_tracker.
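The following sketch illustrates the idea with plain maps. The `%{type: ..., key: ...}` change shape and the module name are hypothetical stand-ins for Electric's change structs, and the state is reduced to the one field that matters here.

```elixir
defmodule TrackTouchSketch do
  # Deletes are not tracked: the state is returned unchanged.
  def track_touch(state, _xid, %{type: :delete}), do: state

  # Any other change records the xid of the transaction that touched the key,
  # overwriting an earlier touch for the same key.
  def track_touch(%{touch_tracker: tracker} = state, xid, %{key: key}) do
    %{state | touch_tracker: Map.put(tracker, key, xid)}
  end
end
```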