Electric.ShapeCache.Storage behaviour (electric v0.9.5)
Summary
Callbacks
Append log items from one transaction to the log.
Clean up snapshots/logs for a shape handle
Initialise shape-specific opts from the shared, global, configuration
Retrieve all stored shapes
Get the last exclusive offset of the chunk starting from the given offset.
Get the current xmin and offset for the shape storage.
Get stream of the log for a shape since a given offset
Get the full snapshot for a given shape, also returning the offset this snapshot includes
Get the total disk usage for all shapes
Run any initial setup tasks
Make a new snapshot for a shape handle based on the meta information about the table and a stream of plain string rows
Store the shape definition
Validate and initialise storage base configuration from application configuration
Check if snapshot for a given shape handle already exists
Start any processes required to run the storage backend
Clean up snapshots/logs for a shape handle by deleting the whole directory.
Types
@type compiled_opts() :: term()
@type log() :: Enumerable.t(Electric.Shapes.Querying.json_iodata())
@type log_item() :: {Electric.Replication.LogOffset.t(), Electric.Shapes.Querying.json_iodata()} | {:chunk_boundary | Electric.Replication.LogOffset.t()}
@type offset() :: Electric.Replication.LogOffset.t()
@type row() :: list()
@type shape_handle() :: Electric.ShapeCacheBehaviour.shape_handle()
@type shape_opts() :: term()
@type shape_storage() :: {module(), shape_opts()}
@type storage() :: {module(), compiled_opts()}
@type xmin() :: Electric.ShapeCacheBehaviour.xmin()
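The storage() and shape_storage() types both pair a backend module with its options, so callers can stay backend-agnostic by unpacking the tuple and delegating. The sketch below illustrates that pattern only; StorageDispatchSketch and InMemoryBackend are hypothetical names, and the backend implements just two callback-shaped functions rather than the full behaviour.

```elixir
defmodule StorageDispatchSketch do
  # Hypothetical backend: stores nothing, just carries its options around.
  defmodule InMemoryBackend do
    # Same shape as for_shape/2: derive per-shape opts from the shared opts.
    def for_shape(shape_handle, compiled_opts) do
      Map.put(compiled_opts, :shape_handle, shape_handle)
    end

    # Same shape as snapshot_started?/1.
    def snapshot_started?(shape_opts), do: Map.get(shape_opts, :snapshot_started, false)
  end

  # Turn a storage() tuple into a shape_storage() tuple for one shape.
  def for_shape(shape_handle, {mod, compiled_opts}) do
    {mod, mod.for_shape(shape_handle, compiled_opts)}
  end

  # Callers never name the backend; they unpack the tuple and delegate.
  def snapshot_started?({mod, shape_opts}), do: mod.snapshot_started?(shape_opts)
end

storage = {StorageDispatchSketch.InMemoryBackend, %{base: "demo"}}
shape_storage = StorageDispatchSketch.for_shape("shape-1", storage)
```

Swapping the backend then only requires changing the module in the tuple, provided the replacement implements the same callbacks.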
Callbacks
@callback append_to_log!(Enumerable.t(log_item()), shape_opts()) :: :ok | no_return()
Append log items from one transaction to the log.
Each storage implementation is responsible for handling transient errors using some retry strategy.
If the backend fails to write within the expected time, or some other error occurs, then this should raise.
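One way a backend might honour this contract is a bounded retry loop that raises once its attempts are exhausted. This is a minimal sketch under stated assumptions: AppendRetrySketch, write_fun, and the attempt and backoff defaults are all hypothetical, and the behaviour does not prescribe any particular retry strategy.

```elixir
defmodule AppendRetrySketch do
  # Run `write_fun` up to `attempts` times, sleeping `backoff_ms` between tries.
  # `write_fun` is a hypothetical zero-arity function returning :ok or {:error, reason}.
  def append_with_retries!(write_fun, attempts \\ 3, backoff_ms \\ 10)

  def append_with_retries!(write_fun, attempts, backoff_ms) when attempts > 0 do
    case write_fun.() do
      :ok ->
        :ok

      # Transient failure with attempts left: wait, then try again.
      {:error, _reason} when attempts > 1 ->
        Process.sleep(backoff_ms)
        append_with_retries!(write_fun, attempts - 1, backoff_ms)

      # Last attempt failed: raise, as the callback contract requires.
      {:error, reason} ->
        raise "append failed after retries: #{inspect(reason)}"
    end
  end
end

# Simulate a write that fails twice before succeeding.
{:ok, counter} = Agent.start_link(fn -> 0 end)

flaky_write = fn ->
  n = Agent.get_and_update(counter, fn n -> {n + 1, n + 1} end)
  if n < 3, do: {:error, :busy}, else: :ok
end

result = AppendRetrySketch.append_with_retries!(flaky_write)
```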
@callback cleanup!(shape_opts()) :: :ok
Clean up snapshots/logs for a shape handle
@callback for_shape(shape_handle(), compiled_opts()) :: shape_opts()
Initialise shape-specific opts from the shared, global, configuration
@callback get_all_stored_shapes(compiled_opts()) :: {:ok, %{required(shape_handle()) => Electric.Shapes.Shape.t()}} | {:error, term()}
Retrieve all stored shapes
@callback get_chunk_end_log_offset(Electric.Replication.LogOffset.t(), shape_opts()) :: Electric.Replication.LogOffset.t() | nil
Get the last exclusive offset of the chunk starting from the given offset.
If the chunk has not finished accumulating, nil is returned.
If the chunk has finished accumulating, the last offset of the chunk is returned.
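The nil-versus-offset rule can be illustrated with a lookup over recorded chunk boundaries. In this sketch, offsets are modelled as plain {tx_offset, op_offset} tuples instead of Electric.Replication.LogOffset structs. ChunkBoundarySketch and the sorted boundary list are assumptions made for illustration, not the way any real backend stores its chunks.

```elixir
defmodule ChunkBoundarySketch do
  # `boundaries` is a sorted list of hypothetical {tx_offset, op_offset} tuples,
  # one per completed chunk. Same-size tuples compare element by element.
  def chunk_end_offset(offset, boundaries) do
    # The first recorded boundary after `offset` closes the chunk that starts
    # there; Enum.find/2 returns nil when no such boundary exists yet.
    Enum.find(boundaries, fn boundary -> boundary > offset end)
  end
end

boundaries = [{10, 4}, {25, 0}]

finished = ChunkBoundarySketch.chunk_end_offset({0, 0}, boundaries)
still_accumulating = ChunkBoundarySketch.chunk_end_offset({25, 0}, boundaries)
```

Here the chunk starting at {0, 0} ends at the first recorded boundary, while the chunk starting at {25, 0} has no boundary after it and is therefore still accumulating.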
@callback get_current_position(shape_opts()) :: {:ok, offset(), xmin() | nil} | {:error, term()}
Get the current xmin and offset for the shape storage.
If the instance is new, then it MUST return LogOffset.first() as the offset and nil as the xmin, i.e. {:ok, LogOffset.first(), nil} to match the callback spec.
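A sketch of the new-instance rule, using the {:ok, offset, xmin} shape from the callback spec. PositionSketch, the map keys, and the {0, 0} stand-in for LogOffset.first() are hypothetical; the real first offset is whatever Electric.Replication.LogOffset defines.

```elixir
defmodule PositionSketch do
  # Stand-in for LogOffset.first(); an assumption made for this sketch only.
  @first_offset {0, 0}

  # `stored` is a hypothetical map of whatever the backend has persisted so far.
  def get_current_position(stored) do
    # A new instance has persisted nothing, so both lookups fall back to defaults.
    offset = Map.get(stored, :latest_offset, @first_offset)
    xmin = Map.get(stored, :snapshot_xmin)
    {:ok, offset, xmin}
  end
end

fresh = PositionSketch.get_current_position(%{})
resumed = PositionSketch.get_current_position(%{latest_offset: {7, 2}, snapshot_xmin: 100})
```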
@callback get_log_stream( offset :: Electric.Replication.LogOffset.t(), max_offset :: Electric.Replication.LogOffset.t(), shape_opts() ) :: log()
Get stream of the log for a shape since a given offset
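A lazy stream over an ordered log can be sketched as follows. The exact bounds are an assumption here: the sketch treats offset as exclusive and max_offset as inclusive, which the text above does not state. LogStreamSketch and the tuple offsets are likewise hypothetical.

```elixir
defmodule LogStreamSketch do
  # `log` is a list of {offset, json} pairs sorted by offset, where each offset
  # is a hypothetical {tx_offset, op_offset} tuple.
  def get_log_stream(log, offset, max_offset) do
    log
    # Skip everything the client has already seen (assumed exclusive bound).
    |> Stream.drop_while(fn {item_offset, _json} -> item_offset <= offset end)
    # Stop once past the requested maximum (assumed inclusive bound).
    |> Stream.take_while(fn {item_offset, _json} -> item_offset <= max_offset end)
    # Emit only the JSON payloads, matching the log() type.
    |> Stream.map(fn {_item_offset, json} -> json end)
  end
end

log = [{{1, 0}, "a"}, {{1, 1}, "b"}, {{2, 0}, "c"}, {{3, 0}, "d"}]
items = log |> LogStreamSketch.get_log_stream({1, 0}, {2, 0}) |> Enum.to_list()
```

Because the pipeline is built from Stream functions, nothing is read until the caller enumerates the result.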
@callback get_snapshot(shape_opts()) :: {offset :: Electric.Replication.LogOffset.t(), log()}
Get the full snapshot for a given shape, also returning the offset this snapshot includes
@callback get_total_disk_usage(compiled_opts()) :: non_neg_integer()
Get the total disk usage for all shapes
@callback initialise(shape_opts()) :: :ok
Run any initial setup tasks
@callback make_new_snapshot!( Electric.Shapes.Querying.json_result_stream(), shape_opts() ) :: :ok
Make a new snapshot for a shape handle based on the meta information about the table and a stream of plain string rows.
Should raise an error if making the snapshot fails for any reason.
@callback mark_snapshot_as_started(shape_opts()) :: :ok
@callback set_shape_definition(Electric.Shapes.Shape.t(), shape_opts()) :: :ok
Store the shape definition
@callback set_snapshot_xmin(xmin(), shape_opts()) :: :ok
@callback snapshot_started?(shape_opts()) :: boolean()
Check if snapshot for a given shape handle already exists
@callback start_link(shape_opts()) :: GenServer.on_start()
Start any processes required to run the storage backend
@callback unsafe_cleanup!(shape_opts()) :: :ok
Clean up snapshots/logs for a shape handle by deleting the whole directory.
Does not require any extra storage processes to be running, but should only be used if the shape is known not to be in use, to avoid concurrency issues.
Functions
@spec child_spec(shape_storage()) :: Supervisor.child_spec()