Fly.Postgres.LSN.Tracker (Fly Postgres v0.3.4)

Tracks the current PostgreSQL LSN (Log Sequence Number). It also tracks requests to be notified once replication has applied the requested :insert LSN locally.

GenServers and process responsibility

The GenServer process does nothing beyond creating and owning the ETS tables that hold the tracking information.

The module contains functions for writing data to and reading data from the ETS tables.

The Fly.Postgres.LSN.Reader GenServer is responsible for interacting with the database. When it sees replication events, it calls the Tracker functions to write to the cache and run the notification code, so that code executes in the Reader process, not in the Tracker. This is deliberate: any crash or failure happens in the Reader, and the caches holding notification requests and current replication values are not lost.
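The ownership split described above can be illustrated with a minimal, self-contained sketch. It is not the library's code: `TableOwnerSketch` and the table name `:owner_sketch_table` are hypothetical, and a plain integer stands in for an LSN. It only shows that a public ETS table outlives a crashing writer as long as the owning process stays alive.

```elixir
defmodule TableOwnerSketch do
  use GenServer

  # Hypothetical module: its only job is to create and own the table.
  def start_link(table), do: GenServer.start_link(__MODULE__, table)

  @impl true
  def init(table) do
    # A public, named table lets other processes read and write it directly.
    ^table = :ets.new(table, [:named_table, :public, :set])
    {:ok, table}
  end
end

{:ok, owner} = TableOwnerSketch.start_link(:owner_sketch_table)

# A separate process writes to the table and then exits abnormally.
{pid, ref} =
  spawn_monitor(fn ->
    :ets.insert(:owner_sketch_table, {:last_replay, 42})
    exit(:boom)
  end)

# Wait until the writer is confirmed dead.
receive do
  {:DOWN, ^ref, :process, ^pid, :boom} -> :ok
end

# The owner is still alive, so the table and its contents are intact.
true = Process.alive?(owner)
[{:last_replay, 42}] = :ets.lookup(:owner_sketch_table, :last_replay)
```

Because the writer is neither linked to nor the owner of the table, its failure leaves the cached value in place.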

LSN values

The tracked LSN value indicates which portions of the database log have been replicated locally. This lets us determine whether a specific transaction chunk has been replicated, and therefore whether the expected data is present.

The client process doesn't interact directly with the Tracker GenServer. Instead, it calls request_notification/2 or request_and_await_notification/2, and the requesting process is notified once the data replication has been seen.
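The request-then-notify flow can be sketched with plain ETS and message passing. This is an illustration under stated assumptions, not the library's implementation: `NotifySketch`, its function names, the table names, and the `{:lsn_replicated, lsn}` message shape are all hypothetical, and LSNs are modeled as plain integers.

```elixir
defmodule NotifySketch do
  # Hypothetical names throughout; LSNs are plain integers in this sketch.
  def new_tables do
    requests = :ets.new(:requests_sketch, [:public, :bag])
    cache = :ets.new(:cache_sketch, [:public, :set])
    {requests, cache}
  end

  # Record that the calling process wants to hear about `lsn`.
  def request_notification(requests, lsn) do
    :ets.insert(requests, {self(), lsn})
    :ok
  end

  # Block until the notification arrives or the timeout (in ms) expires.
  def await_notification(lsn, timeout \\ 5_000) do
    receive do
      {:lsn_replicated, ^lsn} -> :ready
    after
      timeout -> {:error, :timeout}
    end
  end

  # Called by whichever process observes replication progress.
  def record_replay({requests, cache}, replayed_lsn) do
    :ets.insert(cache, {:last_replay, replayed_lsn})

    # Notify and remove every request that the new replay value satisfies.
    for {pid, lsn} = entry <- :ets.tab2list(requests), lsn <= replayed_lsn do
      send(pid, {:lsn_replicated, lsn})
      :ets.delete_object(requests, entry)
    end

    :ok
  end
end

{requests, _cache} = tables = NotifySketch.new_tables()
:ok = NotifySketch.request_notification(requests, 100)

# Replication has not yet reached 100, so waiting times out.
:ok = NotifySketch.record_replay(tables, 90)
{:error, :timeout} = NotifySketch.await_notification(100, 50)

# Once replay passes 100, the waiting process is notified.
:ok = NotifySketch.record_replay(tables, 120)
:ready = NotifySketch.await_notification(100, 50)
```

The requester never sends a message to the table owner; it writes its request to a table and waits for a message from the process that observes replication.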

Functions

await_notification(lsn, opts \\ [])

Specs

await_notification(Fly.Postgres.LSN.t(), opts :: keyword()) ::
  :ready | {:error, :timeout}

Blocks until the response message for a request_notification/2 call is received. The timeout defaults to 5s, after which it stops waiting and returns {:error, :timeout}.

Options

  • :replication_timeout - How long to wait for replication to complete, in milliseconds.
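A minimal usage sketch of the two-step flow, assuming the calling process already holds an LSN value (how it is obtained is outside this page). `ReplicationWaitSketch` and `wait_for/2` are hypothetical names; only request_notification/2, await_notification/2, and the :replication_timeout option are taken from this documentation. Calling `wait_for/2` requires the Fly Postgres library and a running tracker.

```elixir
defmodule ReplicationWaitSketch do
  # Hypothetical wrapper; only the Tracker calls below come from the docs.
  alias Fly.Postgres.LSN.Tracker

  def wait_for(lsn, timeout_ms \\ 10_000) do
    # Register interest first, then block until notified or timed out.
    :ok = Tracker.request_notification(lsn)

    case Tracker.await_notification(lsn, replication_timeout: timeout_ms) do
      :ready -> :ok
      {:error, :timeout} -> {:error, :replication_timeout}
    end
  end
end
```

When nothing needs to happen between the request and the wait, request_and_await_notification/2 covers both steps in one call.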

Returns a specification to start this module under a supervisor.

See Supervisor.

get_ets_table_name(base_table_name, opts \\ [])

Specs

get_ets_table_name(atom(), opts :: keyword()) :: atom()

Get the ETS table name. It is derived from the table prefix name and the base name of the tracker (as there can be multiple trackers).

get_last_replay(opts \\ [])

Specs

get_last_replay(opts :: keyword()) :: nil | Fly.Postgres.LSN.t()

Get the latest cached LSN replay value. On the first run, the cache is empty and nil is returned.

Options

  • :tracker - The tracker name to get the latest LSN replay value for. Defaults to the default tracker name. Required when using multiple trackers.

get_lsn_cache_table(opts \\ [])

Specs

get_lsn_cache_table(opts :: keyword()) :: atom()

Get the LSN cached ETS table name for the specified tracker.

Specs

get_name(atom()) :: atom()

Get the name of the tracker instance that is derived from the base tracking name.

Specs

get_repo(opts :: keyword()) :: nil | module()

Get the Ecto.Repo used by the tracker.

Options

  • :tracker - The name of the tracker whose repo to return. Defaults to the default tracker name. Required when multiple trackers are used.

get_request_tracking_table(opts \\ [])

Specs

get_request_tracking_table(opts :: keyword()) :: atom()

Get the notification request tracking ETS table name for the specified tracker.

Callback implementation for GenServer.init/1.

replicated?(lsn, opts \\ [])

Specs

replicated?(Fly.Postgres.LSN.t(), opts :: keyword()) :: boolean()

Returns whether the LSN value has been replicated, by comparing it against the cached value.
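The cache behavior documented on this page (nil is never written, an empty cache reads as nil, and replication is checked against the cached value) can be sketched as follows. This is an assumption-laden illustration, not the library's code: `LsnCacheSketch` and its function names are hypothetical, LSNs are plain integers, and treating an empty cache as "not replicated" is this sketch's own choice.

```elixir
defmodule LsnCacheSketch do
  # Hypothetical helper names; LSNs are modeled as plain integers.
  def new_table, do: :ets.new(:lsn_cache_sketch_table, [:public, :set])

  # A nil value is skipped, mirroring the documented "don't record nil" rule.
  def write(_table, nil), do: :ok

  def write(table, lsn) when is_integer(lsn) do
    :ets.insert(table, {:last_replay, lsn})
    :ok
  end

  # Returns nil when nothing has been cached yet.
  def last_replay(table) do
    case :ets.lookup(table, :last_replay) do
      [{:last_replay, lsn}] -> lsn
      [] -> nil
    end
  end

  # Assumption of this sketch: with an empty cache, nothing counts as replicated.
  def replicated?(table, lsn) do
    case last_replay(table) do
      nil -> false
      cached -> lsn <= cached
    end
  end
end

table = LsnCacheSketch.new_table()
nil = LsnCacheSketch.last_replay(table)

:ok = LsnCacheSketch.write(table, 100)
:ok = LsnCacheSketch.write(table, nil)

# The nil write did not overwrite the cached value.
100 = LsnCacheSketch.last_replay(table)
true = LsnCacheSketch.replicated?(table, 90)
false = LsnCacheSketch.replicated?(table, 110)
```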

request_and_await_notification(error_or_lsn, opts \\ [])

Specs

request_and_await_notification(
  :wal_lookup_failure | Fly.Postgres.LSN.t(),
  opts :: keyword()
) :: :ready | {:error, :timeout}

Request to be notified when the desired level of data replication has completed, then wait for it. Returns {:error, :timeout} if replication takes longer than the timeout.

Options

  • :tracker - The name of the tracker to wait on for replication tracking.
  • :replication_timeout - How long to wait for replication to complete, in milliseconds.

request_notification(lsn, opts \\ [])

Specs

request_notification(Fly.Postgres.LSN.t(), opts :: keyword()) :: :ok

Request notification for when the database replication includes the LSN the process cares about. This enables a process to block while it awaits replication of its data and to be notified as soon as that replication is detected.

Adds an entry to the ETS table that tracks notification requests.

Start the Tracker that receives work requests.

write_lsn_to_cache(lsn, lsn_table)

Specs

write_lsn_to_cache(nil | Fly.Postgres.LSN.t(), lsn_table :: atom()) :: :ok

Write the latest LSN value to the cache. A nil LSN value is not recorded.