View Source Electric.ShapeCache.Storage behaviour (electric v0.7.6)
Summary
Callbacks
Append log items from one transaction to the log.
Clean up snapshots/logs for a shape id
Initialise shape-specific opts from the shared, global, configuration
Retrieve all stored shapes
Get the last exclusive offset of the chunk starting from the given offset.
Get the current xmin and offset for the shape storage.
Get stream of the log for a shape since a given offset
Get the full snapshot for a given shape, also returning the offset this snapshot includes
Run any initial setup tasks
Make a new snapshot for a shape ID based on the meta information about the table and a stream of plain string rows
Store the shape definition
Validate and initialise storage base configuration from application configuration
Check if snapshot for a given shape id already exists
Start any processes required to run the storage backend
Types
@type compiled_opts() :: term()
@type log() :: Enumerable.t(Electric.Shapes.Querying.json_iodata())
@type log_item() :: {Electric.Replication.LogOffset.t(), Electric.Shapes.Querying.json_iodata()} | {:chunk_boundary | Electric.Replication.LogOffset.t()}
@type log_state() :: %{current_chunk_byte_size: non_neg_integer()}
@type offset() :: Electric.Replication.LogOffset.t()
@type row() :: list()
@type shape_id() :: Electric.ShapeCacheBehaviour.shape_id()
@type shape_opts() :: term()
@type shape_storage() :: {module(), shape_opts()}
@type storage() :: {module(), compiled_opts()}
@type xmin() :: Electric.ShapeCacheBehaviour.xmin()
Callbacks
@callback append_to_log!(Enumerable.t(log_item()), shape_opts()) :: :ok | no_return()
Append log items from one transaction to the log.
Each storage implementation is responsible for handling transient errors using some retry strategy.
If the backend fails to write within the expected time, or some other error occurs, then this should raise.
@callback cleanup!(shape_opts()) :: :ok
Clean up snapshots/logs for a shape id
@callback for_shape(shape_id(), compiled_opts()) :: shape_opts()
Initialise shape-specific opts from the shared, global, configuration
@callback get_all_stored_shapes(compiled_opts()) :: {:ok, %{required(shape_id()) => Electric.Shapes.Shape.t()}} | {:error, term()}
Retrieve all stored shapes
@callback get_chunk_end_log_offset(Electric.Replication.LogOffset.t(), shape_opts()) :: Electric.Replication.LogOffset.t() | nil
Get the last exclusive offset of the chunk starting from the given offset.
If chunk has not finished accumulating, nil
is returned.
If chunk has finished accumulating, the last offset of the chunk is returned.
@callback get_current_position(shape_opts()) :: {:ok, offset(), xmin() | nil} | {:error, term()}
Get the current xmin and offset for the shape storage.
If the instance is new, then it MUST return {LogOffset.first(), nil}
.
@callback get_log_stream( offset :: Electric.Replication.LogOffset.t(), max_offset :: Electric.Replication.LogOffset.t(), shape_opts() ) :: log()
Get stream of the log for a shape since a given offset
@callback get_snapshot(shape_opts()) :: {offset :: Electric.Replication.LogOffset.t(), log()}
Get the full snapshot for a given shape, also returning the offset this snapshot includes
@callback initialise(shape_opts()) :: :ok
Run any initial setup tasks
@callback make_new_snapshot!( Electric.Shapes.Querying.json_result_stream(), shape_opts() ) :: :ok
Make a new snapshot for a shape ID based on the meta information about the table and a stream of plain string rows
Should raise an error if making the snapshot had failed for any reason.
@callback mark_snapshot_as_started(shape_opts()) :: :ok
@callback set_shape_definition(Electric.Shapes.Shape.t(), shape_opts()) :: :ok
Store the shape definition
@callback set_snapshot_xmin(xmin(), shape_opts()) :: :ok
@callback snapshot_started?(shape_opts()) :: boolean()
Check if snapshot for a given shape id already exists
@callback start_link(shape_opts()) :: GenServer.on_start()
Start any processes required to run the storage backend
Functions
@spec child_spec(shape_storage()) :: Supervisor.child_spec()