Electric.Replication.ShapeLogCollector.FlushTracker (electric v1.2.4)

Summary

Types

shape_id()

@type shape_id() :: term()

t()

@type t() :: %Electric.Replication.ShapeLogCollector.FlushTracker{
  last_flushed: %{
    optional(shape_id()) =>
      {last_sent :: Electric.Replication.LogOffset.t(),
       last_flushed :: Electric.Replication.LogOffset.t()}
  },
  last_global_flushed_offset: Electric.Replication.LogOffset.t(),
  last_seen_offset: Electric.Replication.LogOffset.t(),
  min_incomplete_flush_tree:
    :gb_trees.tree(
      Electric.Replication.LogOffset.t_tuple(),
      MapSet.t(shape_id())
    ),
  notify_fn: (non_neg_integer() -> any())
}

Functions

empty?(flush_tracker)

handle_flush_notification(state, shape_id, last_flushed_offset)

@spec handle_flush_notification(t(), shape_id(), Electric.Replication.LogOffset.t()) ::
  t()

handle_shape_removed(state, shape_id)

handle_transaction(state, transaction, affected_shapes)

@spec handle_transaction(
  t(),
  Electric.Replication.Changes.Transaction.t(),
  Enumerable.t(shape_id())
) :: t()

new(opts \\ [])

Create a new flush tracker to figure out when the flush boundary moves up across all writers.
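
For orientation, a minimal usage sketch. The options accepted by new/1 aren't listed here, so the notify_fn option name is an assumption mirroring the struct field of the same name:

alias Electric.Replication.ShapeLogCollector.FlushTracker

# notify_fn receives the LSN (a non-negative integer) that is now safe to
# acknowledge back to Postgres (assumed option name, mirrors the struct field)
tracker = FlushTracker.new(notify_fn: fn lsn -> IO.puts("flushed up to #{lsn}") end)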

When flushing across N shapes, each flush may be delayed on a different cadence depending on the amount of data we’re writing. It may also eventually be worth breaking up lock-step writes. We need to tell Postgres accurately (enough) when we’ve actually flushed the data it sent us.

The main problem is that not only are we flushing on different cadences, but each shape might also not see every operation, so our flush acknowledgement has to take a complicated minimum across all shapes depending on what each of them sees. What’s more, we want to align the acknowledged WAL positions to transaction boundaries, because that’s how PG sends the data.

It’s important to note that because shapes don’t see all operations, they don’t necessarily see the last operation in a transaction, while the sender doesn’t know upfront how many operations will be sent. Because of that, it’s up to the writer both to acknowledge intermediate flushes and to align the last-seen operation to the transaction offset, so that the sender can be sure the writer has caught up.

Tracked state:

  • last_global_flushed_offset
  • last_seen_offset
  • Pending writes Mapping: Shape => {last_sent, last_flushed}
    • Shapes where last_sent == last_flushed can be considered caught up and can be discarded from the mapping (see the snapshot sketch after this list)
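
A hypothetical snapshot of the pending-writes Mapping, with offsets shown as {tx_offset, op_offset} tuples purely for illustration:

%{
  # sent up to {12, 4} but only flushed up to {11, 0}: this shape holds
  # the global flush boundary back
  "shape-a" => {{12, 4}, {11, 0}},
  # last_sent == last_flushed: caught up, so this entry would already
  # have been discarded from the mapping
  "shape-b" => {{12, 4}, {12, 4}}
}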

Algorithm:

On incoming transaction, expressed via handle_transaction/3 (see the sketch after this list):

  1. Update last_seen_offset to the max offset of the transaction/block we received
  2. Determine affected shapes
  3. For each affected shape:
     • If Mapping already has the shape, update its last_sent to the max offset of the transaction
     • If Mapping doesn’t have the shape, add it with {last_sent, prev_log_offset}, where prev_log_offset is an artificial offset whose tx_offset is one less than the incoming transaction’s. This is a safe upper bound: the shape must have flushed all relevant data before this transaction, so even if the previous transaction did not affect this shape we can consider it "flushed" by the shape.
  4. If Mapping is empty after this update, then we’re up to date and should consider this transaction immediately flushed. Set last_global_flushed_offset to equal last_seen_offset and notify appropriately. See step 2 of the writer flush process.
  5. Wait for the writers to send the flushed offset
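
A minimal sketch of the incoming-transaction step under the assumptions above. max_offset/1 and prev_tx_offset/1 are hypothetical helpers, LogOffset is assumed to expose a tx_offset field, and maintenance of min_incomplete_flush_tree is elided:

def handle_transaction(state, txn, affected_shapes) do
  # step 1: remember the furthest offset we've seen (hypothetical helper)
  last_seen = max_offset(txn)
  state = %{state | last_seen_offset: last_seen}

  # an artificial offset one transaction earlier: a safe "already flushed"
  # bound for shapes that aren't tracked yet (hypothetical helper)
  prev = prev_tx_offset(last_seen)

  # step 3: upsert every affected shape into the pending-writes mapping
  mapping =
    Enum.reduce(affected_shapes, state.last_flushed, fn shape_id, acc ->
      Map.update(acc, shape_id, {last_seen, prev}, fn {_sent, flushed} ->
        {last_seen, flushed}
      end)
    end)

  state = %{state | last_flushed: mapping}

  # step 4: no pending writers means this transaction is immediately flushed
  if map_size(mapping) == 0 do
    state.notify_fn.(last_seen.tx_offset)
    %{state | last_global_flushed_offset: last_seen}
  else
    state
  end
end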

On writer flush (i.e. when a writer notifies the central process of a flushed write) with a new last_flushed, expressed via handle_flush_notification/3 (see the sketch after this list):

  1. Update the mapping for the shape:
     • If last_sent equals the new flush position, then we’re caught up. Delete this shape from the mapping
     • Otherwise, replace last_flushed with this new value
  2. If Mapping is empty after the update, we’re globally caught up - set last_global_flushed_offset to equal last_seen_offset
  3. Otherwise, determine the new global flushed offset: last_global_flushed_offset = max(last_global_flushed_offset, min(for {_, {_, last_flushed}} <- Mapping, do: last_flushed)). We take the maximum of the previous last_global_flushed_offset and the lowest flushed offset across the shapes that have not caught up. Because this min is computed very often, we use a lookup structure (min_incomplete_flush_tree) to get it quickly
  4. On a last_global_flushed_offset update, notify the replication client with the actual transaction LSN:
     • If flushes are caught up (i.e. Mapping is empty), notify with LSN = tx_offset of the last flushed offset
     • Otherwise, it’s complicated to determine which transactions have been flushed completely without keeping track of all intermediate points, so notify with LSN = tx_offset - 1, essentially lagging the flush by one transaction just in case
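
A sketch of the flush-notification step following the list above. The min is recomputed naively here; the real module keeps min_incomplete_flush_tree so that :gb_trees.smallest/1 can return it without an O(n) scan. Comparisons go through {tx_offset, op_offset} tuples, since comparing the structs directly would not order them correctly:

def handle_flush_notification(state, shape_id, flushed) do
  # step 1: update the mapping for this shape
  mapping =
    case Map.fetch(state.last_flushed, shape_id) do
      # caught up: the flush reached everything we sent, drop the shape
      {:ok, {last_sent, _}} when last_sent == flushed ->
        Map.delete(state.last_flushed, shape_id)

      # still behind: record the new flush position
      {:ok, {last_sent, _}} ->
        Map.put(state.last_flushed, shape_id, {last_sent, flushed})

      :error ->
        state.last_flushed
    end

  if map_size(mapping) == 0 do
    # step 2: globally caught up - notify with the exact tx_offset
    state.notify_fn.(state.last_seen_offset.tx_offset)
    %{state | last_flushed: mapping, last_global_flushed_offset: state.last_seen_offset}
  else
    # step 3: lowest flushed offset across shapes that have not caught up
    min_flushed =
      mapping
      |> Map.values()
      |> Enum.map(fn {_sent, shape_flushed} -> shape_flushed end)
      |> Enum.min_by(fn o -> {o.tx_offset, o.op_offset} end)

    new_global =
      Enum.max_by(
        [state.last_global_flushed_offset, min_flushed],
        fn o -> {o.tx_offset, o.op_offset} end
      )

    # step 4: on an update, lag the acknowledged LSN by one transaction,
    # since the newest transaction may only be partially flushed
    if new_global != state.last_global_flushed_offset do
      state.notify_fn.(new_global.tx_offset - 1)
    end

    %{state | last_flushed: mapping, last_global_flushed_offset: new_global}
  end
end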

Aligning the writer’s flushed offset with the transaction boundary (see the sketch after this list):

  1. On an incoming transaction, store a mapping from the last offset meant to be written by this writer to the last offset of the transaction
  2. On a flush, the writer should remove from the mapping all elements that are less than or equal to the last flushed offset, and then:
     • If the last removed element is equal to the flushed offset, use the transaction’s last offset to notify the sender
     • Otherwise, use the actual last flushed offset to notify the sender
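
A sketch of this writer-side alignment. boundaries is assumed to be an ascending list of {writer_last_offset, txn_last_offset} pairs with offsets as {tx_offset, op_offset} tuples (so Erlang term ordering compares them correctly); all names here are hypothetical:

def align_flush(boundaries, flushed) do
  # step 2: drop every boundary at or below the flushed offset
  {covered, rest} =
    Enum.split_while(boundaries, fn {writer_last, _txn_last} ->
      writer_last <= flushed
    end)

  notify_offset =
    case List.last(covered) do
      # the flush landed exactly on this writer's last write for a
      # transaction: align the acknowledgement to the transaction boundary
      {^flushed, txn_last} -> txn_last
      # mid-transaction flush: report the raw flushed offset
      _ -> flushed
    end

  {notify_offset, rest}
end

The returned notify_offset is what the writer would then pass to handle_flush_notification/3, keeping the sender’s view aligned with transaction boundaries.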