Electric.Shapes.Consumer.MoveIns (electric v1.4.13)
Summary
Types
t(): Information needed to reason about move-in handling and correct stream processing.
Functions
add_waiting/3: Add information about a new move-in to the state for which we're waiting. Snapshot is initially nil and will be set later when the query begins.
change_already_visible?/3: Check if a change is already visible in one of the completed move-ins.
change_to_filtering/3: Change a move-in from "waiting" to "filtering", marking it as complete, and return a best-effort visibility boundary.
Garbage collect touches that are visible in all pending snapshots. A touch is visible if its xid is before the minimum xmin of all waiting snapshots.
get_and_clear_maximum_resolved_snapshot/1: Get the stored maximum resolved snapshot and clear it, or return nil if none is stored. Returns {snapshot | nil, updated_state}.
is_minimum_snapshot?/2: Check if the given snapshot is the minimum among all concurrent waiting move-ins (excluding the current one being resolved, and only considering those with known snapshots).
remove_completed/2: Remove completed move-ins from the state.
set_snapshot/3: Set the snapshot for a waiting move-in when it becomes known.
should_skip_query_row?/3: Check if a query result row should be skipped because a fresher version exists in the stream. Skip if: touch exists AND touch xid is NOT visible in query snapshot.
store_maximum_resolved_snapshot/2: Store or update the maximum resolved snapshot. If there's already a stored snapshot, keep the maximum of the two.
track_touch/3: Track a touch for a non-delete change. Returns the state with an updated touch_tracker.
Types
@type move_in_name() :: String.t()
@type pg_snapshot() :: Electric.Postgres.SnapshotQuery.pg_snapshot()
@type t() :: %Electric.Shapes.Consumer.MoveIns{
  filtering_move_ins: [{pg_snapshot(), keys :: [String.t()]}],
  in_flight_values: in_flight_values(),
  maximum_resolved_snapshot: nil | pg_snapshot(),
  minimum_unresolved_snapshot: nil | pg_snapshot(),
  move_in_buffering_snapshot: nil | pg_snapshot(),
  moved_out_tags: %{required(move_in_name()) => MapSet.t(String.t())},
  touch_tracker: %{required(String.t()) => pos_integer()},
  waiting_move_ins: %{
    required(move_in_name()) => {pg_snapshot() | nil, {term(), MapSet.t()}}
  }
}
Information needed to reason about move-in handling and correct stream processing.
waiting_move_ins: Information about move-ins we're waiting for. That means a move-in was triggered, but query results are not yet available. The map value has pg snapshot and actual values that were moved in and thus should be skipped in where clause evaluation until the results are appended to the log.
filtering_move_ins: Information about move-ins we're filtering. That means a move-in has resolved and was added to the shape log, and we need to skip changes that are already visible there.
touch_tracker: A map of keys to xids of transactions that have touched them. This is used to skip changes inside move-in query results that are already visible in the shape log.
move_in_buffering_snapshot: A snapshot that is a union of all the "waiting" move-in snapshots. This is used to reduce a check whether something is visible in any of the "waiting" move-in snapshots down to a single check instead of checking each snapshot individually.
in_flight_values: A precalculated map of all moved-in values that caused a move-in and thus should be skipped in where clause evaluation until the results are appended to the log.
moved_out_tags: A map of move-in names to sets of tags that were moved out while the move-in was happening and thus should be skipped when appending move-in results to the log.
maximum_resolved_snapshot: Stores the maximum snapshot of resolved move-ins that weren't immediately appended as snapshot-end control messages, to be appended when the last concurrent move-in resolves.
minimum_unresolved_snapshot: Stores the minimum snapshot of unresolved move-ins.
Functions
@spec add_waiting(t(), move_in_name(), {term(), MapSet.t()}) :: t()
Add information about a new move-in to the state for which we're waiting. Snapshot is initially nil and will be set later when the query begins.
@spec change_already_visible?(t(), Electric.Postgres.Xid.anyxid(), Electric.Replication.Changes.change()) :: boolean()
Check if a change is already visible in one of the completed move-ins.
A visible change means it needs to be skipped to avoid duplicates.
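The check above can be sketched as follows. This is a minimal illustration, not the module's implementation: it assumes a snapshot is a `{xmin, xmax, xip_list}` tuple (the shape PostgreSQL reports for a snapshot), that each filtering move-in pairs a snapshot with a plain list of keys, and that xid wraparound can be ignored. The module name `VisibilitySketch` and both function bodies are hypothetical.

```elixir
defmodule VisibilitySketch do
  # Assumption: a snapshot is `{xmin, xmax, xip_list}`. An xid is visible if it
  # committed before the snapshot was taken: it is below xmin, or it is below
  # xmax and was not in progress. Xid wraparound is ignored in this sketch.
  def visible?(xid, {xmin, xmax, xip_list}) do
    xid < xmin or (xid < xmax and xid not in xip_list)
  end

  # A change is "already visible" if some completed move-in both covers its key
  # and has a snapshot in which the change's transaction is visible.
  def change_already_visible?(filtering_move_ins, xid, key) do
    Enum.any?(filtering_move_ins, fn {snapshot, keys} ->
      visible?(xid, snapshot) and key in keys
    end)
  end
end

filtering = [{{100, 105, [102]}, ["k1", "k2"]}]
# xid 104 is below xmax and not in progress, and "k1" is covered: skip the change.
IO.inspect(VisibilitySketch.change_already_visible?(filtering, 104, "k1"))
# xid 102 was still in progress when the snapshot was taken: keep the change.
IO.inspect(VisibilitySketch.change_already_visible?(filtering, 102, "k1"))
```

Under these assumptions the first call prints `true` and the second prints `false`.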
@spec change_to_filtering(t(), move_in_name(), MapSet.t(String.t())) :: {visibility_boundary :: nil | pg_snapshot(), t()}
Change a move-in from "waiting" to "filtering", marking it as complete, and return a best-effort visibility boundary.
Garbage collect touches that are visible in all pending snapshots. A touch is visible if its xid is before the minimum xmin of all waiting snapshots.
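A rough sketch of this garbage collection rule, assuming snapshots are `{xmin, xmax, xip_list}` tuples and that waiting move-ins whose snapshot is still nil are ignored. The module name `TouchGcSketch` is hypothetical, and leaving the tracker untouched when no snapshot is known is a conservative choice made for this sketch, not something the documentation specifies.

```elixir
defmodule TouchGcSketch do
  # touch_tracker: %{key => xid}; waiting_snapshots: list of snapshot tuples or nil.
  def gc(touch_tracker, waiting_snapshots) do
    known = Enum.reject(waiting_snapshots, &is_nil/1)

    case known do
      # Assumption: with no known snapshot there is nothing to compare against,
      # so the sketch keeps every touch.
      [] ->
        touch_tracker

      _ ->
        min_xmin = known |> Enum.map(&elem(&1, 0)) |> Enum.min()
        # Keep only touches that some pending snapshot might not see yet.
        for {key, xid} <- touch_tracker, xid >= min_xmin, into: %{}, do: {key, xid}
    end
  end
end

tracker = %{"a" => 90, "b" => 100, "c" => 120}
IO.inspect(TouchGcSketch.gc(tracker, [{100, 110, []}, nil, {95, 130, [97]}]))
```

Here the minimum xmin is 95, so only the touch on "a" (xid 90) is dropped.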
@spec get_and_clear_maximum_resolved_snapshot(t()) :: {pg_snapshot() | nil, t()}
Get the stored maximum resolved snapshot and clear it, or return nil if none is stored. Returns {snapshot | nil, updated_state}.
@spec is_minimum_snapshot?(t(), pg_snapshot()) :: boolean()
Check if the given snapshot is the minimum among all concurrent waiting move-ins (excluding the current one being resolved, and only considering those with known snapshots).
@spec remove_completed(t(), Electric.Replication.Changes.Transaction.t()) :: t()
Remove completed move-ins from the state.
A move-in is considered "completed" (i.e. no longer included in the filtering logic) once we see any transaction that is after the end of the move-in snapshot.
Filtering applies only to transactions that are already visible in the snapshot, and those can only have xid < xmax.
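The rule can be sketched like this, assuming `{xmin, xmax, xip_list}` snapshots and reading "after the end of the move-in snapshot" as xid >= xmax. The module name `RemoveCompletedSketch` is hypothetical, and the sketch takes a bare xid rather than a transaction struct.

```elixir
defmodule RemoveCompletedSketch do
  # filtering_move_ins: list of `{snapshot, keys}` pairs.
  # Once a transaction at or past a snapshot's xmax is seen, no later change can
  # be visible in that snapshot, so the move-in no longer needs filtering.
  def remove_completed(filtering_move_ins, txn_xid) do
    Enum.reject(filtering_move_ins, fn {{_xmin, xmax, _xip_list}, _keys} ->
      txn_xid >= xmax
    end)
  end
end

filtering = [{{100, 105, []}, ["a"]}, {{100, 120, []}, ["b"]}]
# A transaction with xid 110 is past the first snapshot but not the second.
IO.inspect(RemoveCompletedSketch.remove_completed(filtering, 110))
```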
@spec set_snapshot(t(), move_in_name(), pg_snapshot()) :: t()
Set the snapshot for a waiting move-in when it becomes known.
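The waiting lifecycle (register a move-in with a nil snapshot, then fill the snapshot in) can be sketched on the `waiting_move_ins` map alone. The module name `WaitingSketch` is hypothetical, and the sketch deliberately leaves out whatever other state fields the real functions maintain.

```elixir
defmodule WaitingSketch do
  # waiting: %{move_in_name => {snapshot | nil, moved_values}}

  # Register a move-in whose query has not started yet: no snapshot is known.
  def add_waiting(waiting, name, moved_values) do
    Map.put(waiting, name, {nil, moved_values})
  end

  # Fill in the snapshot once the query begins; the moved values are kept.
  def set_snapshot(waiting, name, snapshot) do
    Map.update!(waiting, name, fn {_old_snapshot, moved_values} ->
      {snapshot, moved_values}
    end)
  end
end

waiting =
  %{}
  |> WaitingSketch.add_waiting("move_in_1", {:parent_id, MapSet.new([1, 2])})
  |> WaitingSketch.set_snapshot("move_in_1", {100, 105, []})

IO.inspect(waiting)
```

`Map.update!/3` raises if the name was never registered, which in this sketch surfaces a `set_snapshot` call for an unknown move-in instead of silently creating an entry.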
@spec should_skip_query_row?(%{required(String.t()) => pos_integer()}, pg_snapshot(), String.t()) :: boolean()
Check if a query result row should be skipped because a fresher version exists in the stream. Skip if: touch exists AND touch xid is NOT visible in query snapshot.
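The skip condition can be sketched as below, again assuming `{xmin, xmax, xip_list}` snapshots and ignoring xid wraparound. The module name `SkipRowSketch` and its `visible?/2` helper are hypothetical.

```elixir
defmodule SkipRowSketch do
  # Assumption: standard snapshot visibility, ignoring xid wraparound.
  def visible?(xid, {xmin, xmax, xip_list}) do
    xid < xmin or (xid < xmax and xid not in xip_list)
  end

  # Skip the query row only when the key was touched by a transaction that the
  # query snapshot could not see, meaning the stream holds a fresher version.
  def should_skip_query_row?(touch_tracker, snapshot, key) do
    case Map.fetch(touch_tracker, key) do
      {:ok, touch_xid} -> not visible?(touch_xid, snapshot)
      :error -> false
    end
  end
end

tracker = %{"k1" => 107, "k2" => 99}
snapshot = {100, 105, [102]}
# "k1" was touched by xid 107, which is past xmax: the query row is stale.
IO.inspect(SkipRowSketch.should_skip_query_row?(tracker, snapshot, "k1"))
# "k2" was touched by xid 99, which the query already saw: keep the row.
IO.inspect(SkipRowSketch.should_skip_query_row?(tracker, snapshot, "k2"))
```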
@spec store_maximum_resolved_snapshot(t(), pg_snapshot()) :: t()
Store or update the maximum resolved snapshot. If there's already a stored snapshot, keep the maximum of the two.
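A small sketch of the store and get-and-clear pair, using a plain map in place of the state struct. How two snapshots are ordered is not stated here; the sketch assumes `{xmin, xmax, xip_list}` tuples compared with Erlang term ordering, which may differ from the real comparison. The module name `ResolvedSnapshotSketch` is hypothetical.

```elixir
defmodule ResolvedSnapshotSketch do
  # Nothing stored yet: keep the new snapshot as is.
  def store(%{maximum_resolved_snapshot: nil} = state, snapshot),
    do: %{state | maximum_resolved_snapshot: snapshot}

  # Assumption: snapshots are ordered by Erlang term ordering on the tuple.
  def store(%{maximum_resolved_snapshot: stored} = state, snapshot),
    do: %{state | maximum_resolved_snapshot: max(stored, snapshot)}

  # Return the stored snapshot (or nil) together with the cleared state.
  def get_and_clear(%{maximum_resolved_snapshot: snapshot} = state),
    do: {snapshot, %{state | maximum_resolved_snapshot: nil}}
end

state =
  %{maximum_resolved_snapshot: nil}
  |> ResolvedSnapshotSketch.store({100, 110, []})
  |> ResolvedSnapshotSketch.store({90, 95, []})

IO.inspect(ResolvedSnapshotSketch.get_and_clear(state))
```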
@spec track_touch(t(), pos_integer(), Electric.Replication.Changes.change()) :: t()
Track a touch for a non-delete change. Returns the state with an updated touch_tracker.