View Source Electric.ShapeCache.Storage behaviour (electric v1.0.1)

Summary

Callbacks

Append log items from one transaction to the log.

Clean up snapshots/logs for a shape handle

Initialise shape-specific opts from the shared, global, configuration

Retrieve all stored shapes

Get the last exclusive offset of the chunk starting from the given offset.

Get the current pg_snapshot and offset for the shape storage.

Get stream of the log for a shape since a given offset

Get the total disk usage for all shapes

Run any initial setup tasks

Make a new snapshot for a shape handle based on the meta information about the table and a stream of plain string rows

Store the shape definition

Validate and initialise storage base configuration from application configuration

Check if snapshot for a given shape handle already exists

Start any processes required to run the storage backend

Clean up snapshots/logs for a shape handle by deleting whole directory.

Types

@type compiled_opts() :: term()
@type log_item() ::
  {Electric.Replication.LogOffset.t(), key :: String.t(),
   operation_type :: operation_type(), Electric.Shapes.Querying.json_iodata()}
  | {:chunk_boundary | Electric.Replication.LogOffset.t()}
@type offset() :: Electric.Replication.LogOffset.t()
@type operation_type() :: :insert | :update | :delete
@type pg_snapshot() :: %{
  xmin: pos_integer(),
  xmax: pos_integer(),
  xip_list: [pos_integer()],
  filter_txns?: boolean()
}
@type row() :: list()
@type shape_handle() :: Electric.ShapeCacheBehaviour.shape_handle()
@type shape_opts() :: term()
@type shape_storage() :: {module(), shape_opts()}
@type storage() :: {module(), compiled_opts()}
@type xmin() :: Electric.ShapeCacheBehaviour.xmin()

Callbacks

Link to this callback

append_to_log!(t, shape_opts)

View Source
@callback append_to_log!(Enumerable.t(log_item()), shape_opts()) :: :ok | no_return()

Append log items from one transaction to the log.

Each storage implementation is responsible for handling transient errors using some retry strategy.

If the backend fails to write within the expected time, or some other error occurs, then this should raise.

@callback cleanup!(shape_opts()) :: :ok

Clean up snapshots/logs for a shape handle

Link to this callback

for_shape(shape_handle, compiled_opts)

View Source
@callback for_shape(shape_handle(), compiled_opts()) :: shape_opts()

Initialise shape-specific opts from the shared, global, configuration

Link to this callback

get_all_stored_shapes(compiled_opts)

View Source
@callback get_all_stored_shapes(compiled_opts()) ::
  {:ok, %{required(shape_handle()) => Electric.Shapes.Shape.t()}}
  | {:error, term()}

Retrieve all stored shapes

Link to this callback

get_chunk_end_log_offset(t, shape_opts)

View Source
@callback get_chunk_end_log_offset(Electric.Replication.LogOffset.t(), shape_opts()) ::
  Electric.Replication.LogOffset.t() | nil

Get the last exclusive offset of the chunk starting from the given offset.

If chunk has not finished accumulating, nil is returned.

If chunk has finished accumulating, the last offset of the chunk is returned.

Link to this callback

get_current_position(shape_opts)

View Source
@callback get_current_position(shape_opts()) ::
  {:ok, offset(), pg_snapshot() | nil} | {:error, term()}

Get the current pg_snapshot and offset for the shape storage.

If the instance is new, then it MUST return {LogOffset.first(), nil}.

Link to this callback

get_log_stream(offset, max_offset, shape_opts)

View Source
@callback get_log_stream(
  offset :: Electric.Replication.LogOffset.t(),
  max_offset :: Electric.Replication.LogOffset.t(),
  shape_opts()
) :: log()

Get stream of the log for a shape since a given offset

Link to this callback

get_total_disk_usage(compiled_opts)

View Source
@callback get_total_disk_usage(compiled_opts()) :: non_neg_integer()

Get the total disk usage for all shapes

@callback initialise(shape_opts()) :: :ok

Run any initial setup tasks

Link to this callback

make_new_snapshot!(json_result_stream, shape_opts)

View Source
@callback make_new_snapshot!(
  Electric.Shapes.Querying.json_result_stream(),
  shape_opts()
) :: :ok

Make a new snapshot for a shape handle based on the meta information about the table and a stream of plain string rows

Should raise an error if making the snapshot had failed for any reason.

Link to this callback

mark_snapshot_as_started(shape_opts)

View Source
@callback mark_snapshot_as_started(shape_opts()) :: :ok
Link to this callback

set_pg_snapshot(pg_snapshot, shape_opts)

View Source
@callback set_pg_snapshot(pg_snapshot(), shape_opts()) :: :ok
Link to this callback

set_shape_definition(t, shape_opts)

View Source
@callback set_shape_definition(Electric.Shapes.Shape.t(), shape_opts()) :: :ok

Store the shape definition

@callback shared_opts(term()) :: compiled_opts()

Validate and initialise storage base configuration from application configuration

Link to this callback

snapshot_started?(shape_opts)

View Source
@callback snapshot_started?(shape_opts()) :: boolean()

Check if snapshot for a given shape handle already exists

@callback start_link(shape_opts()) :: GenServer.on_start()

Start any processes required to run the storage backend

Link to this callback

unsafe_cleanup!(shape_opts)

View Source
@callback unsafe_cleanup!(shape_opts()) :: :ok

Clean up snapshots/logs for a shape handle by deleting whole directory.

Does not require any extra storage processes to be running, but should only be used if the shape is known to not be in use to avoid concurrency issues.

Functions