# `Electric.Replication.ShapeLogCollector.FlushTracker`
[🔗](https://github.com/electric-sql/electric/tree/%40core/sync-service%401.6.2/packages/sync-service/lib/electric/replication/shape_log_collector/flush_tracker.ex#L1)

# `shape_id`

```elixir
@type shape_id() :: term()
```

# `t`

```elixir
@type t() :: %Electric.Replication.ShapeLogCollector.FlushTracker{
  last_flushed: %{
    optional(shape_id()) =>
      {last_sent :: Electric.Replication.LogOffset.t(),
       last_flushed :: Electric.Replication.LogOffset.t()}
  },
  last_global_flushed_offset: Electric.Replication.LogOffset.t(),
  last_seen_offset: Electric.Replication.LogOffset.t(),
  min_incomplete_flush_tree:
    :gb_trees.tree(
      Electric.Replication.LogOffset.t_tuple(),
      MapSet.t(shape_id())
    ),
  notify_fn: (non_neg_integer() -> any())
}
```

# `empty?`

# `handle_flush_notification`

```elixir
@spec handle_flush_notification(t(), shape_id(), Electric.Replication.LogOffset.t()) ::
  t()
```

# `handle_shape_removed`

# `handle_txn_fragment`

```elixir
@spec handle_txn_fragment(
  t(),
  Electric.Replication.Changes.TransactionFragment.t(),
  Enumerable.t(shape_id())
) :: t()
```

# `new`

Create a new flush tracker to figure out when the flush boundary moves up across all writers.

When flushing across N shapes, each flush may happen on a different cadence depending on the amount of data being written,
and it may eventually be worth breaking lock-step writes. We need to tell Postgres accurately (enough) when we've actually flushed the data it sent us.

The main problem is that not only do we flush on different cadences, but each shape might not see every operation, so our flush acknowledgement
must take a complicated minimum across all shapes depending on what each is seeing. What's more, we want to align the acknowledged WAL positions
to transaction boundaries, because that's how PG is sending the data.

It's important to note that because shapes don't see all operations, they won't necessarily see the last-in-transaction operation, while the
sender doesn't know upfront how many operations will be sent. Because of that, it's up to the writer both to acknowledge the intermediate flushes and
to align the last-seen operation to the transaction offset, so that the sender can be sure the writer has caught up.

### Tracked state:
- `last_global_flushed_offset`
- `last_seen_offset`
- Pending writes Mapping:
  ```
  Shape => {last_sent, last_flushed}
  ```
  - Shapes where `last_sent == last_flushed` can be considered caught-up, and can be discarded from the mapping

### Algorithm:

On incoming transaction (expressed via `handle_txn_fragment/3`):
1. Update `last_seen_offset` to the max offset of the transaction/block we received
2. Determine affected shapes
3. For each shape:
   1. If the Mapping already has the shape, update `last_sent` to the max offset of the transaction
   2. If the Mapping doesn't have the shape, add it with `{last_sent, prev_log_offset}`, where `prev_log_offset` is an
      artificial offset with its `tx_offset` set to one less than the incoming transaction's. This is a safe upper bound
      to use: the shape must have flushed all relevant data before this transaction, so even if the previous
      transaction did not affect this shape we can consider it "flushed" by the shape.
4. If the Mapping is empty after this update, then we're up to date and should consider this transaction immediately flushed.
   Set `last_global_flushed_offset` to equal `last_seen_offset` and notify appropriately. See step 2 of the writer flush process.
5. Wait for the writers to send the flushed offset
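The incoming-transaction steps above can be sketched as follows. This is a minimal illustration, not the module's actual implementation: offsets are plain `{tx_offset, op_offset}` tuples, the state is a bare map, and the real module uses `LogOffset` structs plus a `:gb_trees` index.

```elixir
defmodule FlushTrackerSketch do
  # Sketch of steps 1-4: track last_seen_offset and pending writes per shape.
  def handle_transaction(state, txn_last_offset, affected_shapes) do
    {tx_offset, _} = txn_last_offset
    # Artificial offset one transaction earlier: a safe "already flushed" bound
    prev_log_offset = {tx_offset - 1, 0}

    mapping =
      Enum.reduce(affected_shapes, state.mapping, fn shape, mapping ->
        Map.update(
          mapping,
          shape,
          # Shape not tracked yet: add it with the artificial previous offset
          {txn_last_offset, prev_log_offset},
          # Shape already tracked: only advance last_sent
          fn {_last_sent, last_flushed} -> {txn_last_offset, last_flushed} end
        )
      end)

    state = %{state | last_seen_offset: txn_last_offset, mapping: mapping}

    if map_size(mapping) == 0 do
      # No pending writes at all: the transaction counts as immediately flushed
      %{state | last_global_flushed_offset: txn_last_offset}
    else
      state
    end
  end
end
```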

On writer flush (i.e. when a writer notifies the central process of a flushed write) with `new_last_flushed`, expressed via `handle_flush_notification/3`:
1. Update the mapping for the shape:
   1. If `last_sent` equals the new flush position, then we're caught up; delete this shape from the mapping
   2. Otherwise, replace `last_flushed` with the new value
2. If Mapping is empty after the update, we're globally caught up - set `last_global_flushed_offset` to equal `last_seen_offset`
3. Otherwise, determine the new global flushed offset:
   `last_global_flushed_offset = max(last_global_flushed_offset, min(for {_, {_, last_flushed}} <- Mapping, do: last_flushed))`
   We take the maximum of the current `last_global_flushed_offset` and the lowest flushed offset across shapes that
   have not caught up. Because this `min` is expected to be computed very often, we use a lookup structure to get it quickly.
4. On a `last_global_flushed_offset` update, notify the replication client with the actual transaction LSN:
   1. If flushes are caught up (i.e. the Mapping is empty), notify with LSN = `tx_offset` of the last flushed offset
   2. Otherwise, it's complicated to determine which transactions have been flushed completely without keeping track of
      all intermediate points, so notify with LSN = `tx_offset - 1`, essentially lagging the flush by one transaction just in case.
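The writer-flush steps can be sketched like this. Again a simplified illustration: offsets are `{tx_offset, op_offset}` tuples, and the `min` is computed with a plain `Enum.min/1` where the real module keeps a `:gb_trees` index for speed.

```elixir
defmodule FlushNotifySketch do
  # Sketch of steps 1-3: update the pending-writes mapping, then recompute
  # the global flushed offset as max(current, min over pending shapes).
  def handle_flush_notification(state, shape, new_last_flushed) do
    mapping =
      case state.mapping do
        %{^shape => {last_sent, _}} when last_sent == new_last_flushed ->
          # Caught up: drop the shape from the mapping entirely
          Map.delete(state.mapping, shape)

        %{^shape => {last_sent, _}} ->
          Map.put(state.mapping, shape, {last_sent, new_last_flushed})

        _ ->
          state.mapping
      end

    new_global =
      if map_size(mapping) == 0 do
        # Globally caught up: everything seen so far is flushed
        state.last_seen_offset
      else
        mapping
        |> Enum.map(fn {_shape, {_sent, flushed}} -> flushed end)
        |> Enum.min()
        |> max(state.last_global_flushed_offset)
      end

    %{state | mapping: mapping, last_global_flushed_offset: new_global}
  end

  # Sketch of step 4: LSN to report, lagging one transaction unless caught up
  def notify_lsn(%{mapping: mapping, last_global_flushed_offset: {tx, _}}) do
    if map_size(mapping) == 0, do: tx, else: tx - 1
  end
end
```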

Aligning the writer's flushed offset with the transaction boundary:
1. On an incoming transaction, store a mapping from the last offset meant to be written by this writer to the last offset of the txn
2. On a flush, the writer should remove from the mapping all elements that are less than or equal to the last flushed offset, and then:
   1. If the last removed element equals the flushed offset, use the transaction's last offset instead to notify the sender
   2. Otherwise, use the actual last flushed offset to notify the sender.
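A sketch of the writer-side alignment, under the same simplifying assumptions (tuple offsets, plain maps); `boundaries` and `align_flush` are illustrative names, not the actual writer API:

```elixir
defmodule WriterAlignSketch do
  # boundaries: last offset this writer was given for a txn => txn's true
  # last offset. Returns {offset_to_report, remaining_boundaries}.
  def align_flush(boundaries, flushed_offset) do
    # Drop every boundary at or below the flushed offset (step 2)
    {done, pending} =
      Enum.split_with(boundaries, fn {sent, _txn_last} ->
        sent <= flushed_offset
      end)

    done = Map.new(done)

    reported =
      case Map.fetch(done, flushed_offset) do
        # The flush landed exactly on the last op this writer saw for a txn:
        # report the transaction's true last offset instead (step 2.1)
        {:ok, txn_last} -> txn_last
        # Mid-transaction flush: report the actual offset (step 2.2)
        :error -> flushed_offset
      end

    {reported, Map.new(pending)}
  end
end
```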

---

*Consult [api-reference.md](api-reference.md) for complete listing*
