Electric.Replication.ShapeLogCollector.FlushTracker (electric v1.2.4)
Summary
Functions
Create a new flush tracker to figure out when the flush boundary moves up across all writers
Types
@type shape_id() :: term()
@type t() :: %Electric.Replication.ShapeLogCollector.FlushTracker{
        last_flushed: %{
          optional(shape_id()) =>
            {last_sent :: Electric.Replication.LogOffset.t(),
             last_flushed :: Electric.Replication.LogOffset.t()}
        },
        last_global_flushed_offset: Electric.Replication.LogOffset.t(),
        last_seen_offset: Electric.Replication.LogOffset.t(),
        min_incomplete_flush_tree:
          :gb_trees.tree(Electric.Replication.LogOffset.t_tuple(), MapSet.t(shape_id())),
        notify_fn: (non_neg_integer() -> any())
      }
Functions
@spec handle_flush_notification(t(), shape_id(), Electric.Replication.LogOffset.t()) :: t()
@spec handle_transaction(
        t(),
        Electric.Replication.Changes.Transaction.t(),
        Enumerable.t(shape_id())
      ) :: t()
Create a new flush tracker to figure out when the flush boundary moves up across all writers
When doing a flush across N shapes, writes might be delayed on different cadences depending on the amount of data we're writing, and it might eventually be worth breaking lock-step writes. We need to tell Postgres accurately (enough) when we've actually flushed the data it sent us.
The main problem is that we're not only flushing on different cadences, but also that each shape might not see every operation, so our flush acknowledgement has to take a complicated minimum across all shapes depending on what they are seeing. What's more, we want to align the acknowledged WAL positions to transaction boundaries, because that's how PG is sending the data.
It's important to note that because shapes don't see all operations, they don't necessarily see the last-in-transaction operation, while the sender doesn't know upfront how many operations will be sent. Because of that, it's up to the writer to acknowledge the intermediate flushes, but also to align the last-seen operation to the transaction offset so that the sender can be sure the writer has caught up.
Tracked state:
- `last_global_flushed_offset`
- `last_seen_offset`
- Pending writes Mapping: `Shape => {last_sent, last_flushed}` (illustrated below)
  - Shapes where `last_sent == last_flushed` can be considered caught-up, and can be discarded from the mapping
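As an illustration of the Mapping's semantics (offsets shown here as `{tx_offset, op_offset}` tuples rather than the `Electric.Replication.LogOffset` structs the real struct holds):

```elixir
# Illustrative only; real entries hold LogOffset structs, not tuples.
pending_writes = %{
  "shape-a" => {{42, 7}, {42, 7}},  # last_sent == last_flushed: caught up, may be dropped
  "shape-b" => {{42, 7}, {41, 3}}   # last_flushed lags behind last_sent: still pending
}

# Only entries whose two offsets differ need to stay in the mapping:
Enum.filter(pending_writes, fn {_shape, {last_sent, last_flushed}} -> last_sent != last_flushed end)
#=> [{"shape-b", {{42, 7}, {41, 3}}}]
```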
Algorithm:
On incoming transaction: expressed via `handle_transaction/3`, sketched after this list

- Update `last_seen_offset` to the max offset of the transaction/block we received
- Determine affected shapes
- For each shape:
  - If Mapping already has the shape, update `last_sent` to the max offset of the transaction
  - If Mapping doesn't have the shape, add it with `{last_sent, prev_log_offset}` where `prev_log_offset` is an artificial offset with its `tx_offset` set to one less than the incoming transaction. This is a safe upper bound to use, as the shape must have flushed all relevant data before this transaction, and thus even if the previous transaction did not affect this shape we can consider it "flushed" by the shape.
- If Mapping is empty after this update, then we're up-to-date and should consider this transaction immediately flushed. Set `last_global_flushed_offset` to equal `last_seen_offset` and notify appropriately. See step 2 of writer flush process.
- Wait for the writers to send the flushed offset
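A minimal, self-contained sketch of the steps above. The module name, the tuple-based state `{mapping, last_seen_offset, last_global_flushed_offset}`, and the `{tx_offset, op_offset}` offsets are illustrative assumptions, not the actual implementation:

```elixir
defmodule IncomingTransactionSketch do
  @moduledoc false

  def handle_transaction({mapping, _last_seen, last_global_flushed}, tx_last_offset, affected_shapes, notify_fn) do
    {tx_offset, _op} = tx_last_offset

    # Safe upper bound for shapes we weren't already tracking: everything up to
    # the previous transaction can be treated as already flushed by that shape.
    prev_log_offset = {tx_offset - 1, 0}

    mapping =
      Enum.reduce(affected_shapes, mapping, fn shape, acc ->
        case acc do
          %{^shape => {_last_sent, shape_last_flushed}} ->
            # Already pending: only bump what we've sent to this shape.
            Map.put(acc, shape, {tx_last_offset, shape_last_flushed})

          _ ->
            Map.put(acc, shape, {tx_last_offset, prev_log_offset})
        end
      end)

    if map_size(mapping) == 0 do
      # No shape has pending writes: this transaction counts as flushed right away.
      notify_fn.(tx_offset)
      {mapping, tx_last_offset, tx_last_offset}
    else
      {mapping, tx_last_offset, last_global_flushed}
    end
  end
end
```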
On writer flush (i.e. when a writer notifies the central process of a flushed write with a new `last_flushed`): expressed via `handle_flush_notification/3`, sketched after this list

- Update the mapping for the shape:
  - If `last_sent` equals the new flush position, then we're caught up. Delete this shape from the mapping
  - Otherwise, replace `last_flushed` with this new value
- If Mapping is empty after the update, we're globally caught up - set `last_global_flushed_offset` to equal `last_seen_offset`
- Otherwise:
  - Determine the new global flushed offset:
    `last_global_flushed_offset = max(last_global_flushed_offset, min(for {_, {_, last_flushed}} <- Mapping, do: last_flushed))`
    We take the maximum between the already last flushed offset and the lowest flushed offset across shapes that have not caught up. Because this `min` is expected to be computed very often, we use a lookup structure to get it in a fast manner
- On `last_global_flushed_offset` update, notify the replication client with the actual transaction LSN:
  - If flushes are caught up (i.e. Mapping is empty), then notify with LSN = `tx_offset` of the last flushed offset
  - Otherwise, it's complicated to determine which transactions have been flushed completely without keeping track of all intermediate points, so notify with LSN = `tx_offset - 1`, essentially lagging the flush by one transaction just in case.
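The flush-notification side in the same illustrative style. Note that the real struct keeps the per-shape minimum in a `:gb_trees` structure (`min_incomplete_flush_tree`, see the type above) so the `min` is cheap, whereas this sketch just scans the mapping:

```elixir
defmodule FlushNotificationSketch do
  @moduledoc false

  # Same illustrative state shape as above: {mapping, last_seen_offset, last_global_flushed_offset}.
  def handle_flush_notification({mapping, last_seen, last_global_flushed}, shape, new_flushed, notify_fn) do
    mapping =
      case Map.get(mapping, shape) do
        # Unknown / already caught-up shape: nothing to update.
        nil -> mapping
        # Writer caught up with everything we sent it: drop the entry.
        {last_sent, _} when last_sent == new_flushed -> Map.delete(mapping, shape)
        # Intermediate flush: record the new flush position.
        {last_sent, _} -> Map.put(mapping, shape, {last_sent, new_flushed})
      end

    new_global =
      if map_size(mapping) == 0 do
        # Everyone is caught up: jump straight to the last offset we've seen.
        last_seen
      else
        # Lowest flush position across shapes with pending writes, never going
        # backwards from what we already acknowledged.
        pending_min =
          mapping
          |> Enum.map(fn {_shape, {_sent, flushed}} -> flushed end)
          |> Enum.min()

        max(last_global_flushed, pending_min)
      end

    if new_global != last_global_flushed do
      {tx_offset, _op} = new_global
      # Empty mapping means whole transactions are flushed; otherwise lag the
      # acknowledged LSN by one transaction to stay on a safe boundary.
      notify_fn.(if map_size(mapping) == 0, do: tx_offset, else: tx_offset - 1)
    end

    {mapping, last_seen, new_global}
  end
end
```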
Aligning the writer's flushed offset with the transaction boundary (see the sketch after this list):

- On incoming transaction, store a mapping from the last offset that's meant to be written by this writer to the last offset of the txn
- On a flush, the writer should remove from the mapping all elements that are less-than-or-equal to the last flushed offset, and then:
  - If the last removed element from the mapping is equal to the flushed offset, use the transaction's last offset instead to notify the sender
  - Otherwise, use the actual last flushed offset to notify the sender.
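This alignment lives on the writer side rather than in this module; as a hedged sketch under the same assumptions, keeping the boundaries as a list of `{writer_last_offset, txn_last_offset}` pairs sorted by the first element:

```elixir
defmodule WriterAlignmentSketch do
  @moduledoc false

  # On incoming transaction: remember which of our own offsets closes the txn,
  # and what the transaction's true last offset is. Offsets are illustrative tuples.
  def record_transaction(boundaries, writer_last_offset, txn_last_offset) do
    boundaries ++ [{writer_last_offset, txn_last_offset}]
  end

  # On flush: drop every boundary at or below the flushed offset, then decide
  # which offset to report back to the sender.
  def on_flush(boundaries, flushed_offset, notify_fn) do
    {completed, remaining} =
      Enum.split_while(boundaries, fn {writer_offset, _txn_offset} ->
        writer_offset <= flushed_offset
      end)

    case List.last(completed) do
      # The flush landed exactly on our last write for a transaction: report the
      # transaction's last offset so the sender knows we've fully caught up to it.
      {^flushed_offset, txn_last_offset} -> notify_fn.(txn_last_offset)
      # Intermediate flush (or nothing completed): report the actual flushed offset.
      _ -> notify_fn.(flushed_offset)
    end

    remaining
  end
end
```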