Electric.ShapeCache.Storage behaviour (electric v0.9.5)

Summary

Callbacks

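append_to_log!(t, shape_opts)

Append log items from one transaction to the log.

cleanup!(shape_opts)

Clean up snapshots/logs for a shape handle

for_shape(shape_handle, compiled_opts)

Initialise shape-specific opts from the shared, global configuration

get_all_stored_shapes(compiled_opts)

Retrieve all stored shapes

get_chunk_end_log_offset(t, shape_opts)

Get the last exclusive offset of the chunk starting from the given offset.

get_current_position(shape_opts)

Get the current xmin and offset for the shape storage.

get_log_stream(offset, max_offset, shape_opts)

Get a stream of the log for a shape since a given offset

get_snapshot(shape_opts)

Get the full snapshot for a given shape, also returning the offset this snapshot includes

get_total_disk_usage(compiled_opts)

Get the total disk usage for all shapes

initialise(shape_opts)

Run any initial setup tasks

make_new_snapshot!(json_result_stream, shape_opts)

Make a new snapshot for a shape handle based on the meta information about the table and a stream of plain string rows

mark_snapshot_as_started(shape_opts)

set_shape_definition(t, shape_opts)

Store the shape definition

set_snapshot_xmin(xmin, shape_opts)

shared_opts(term)

Validate and initialise storage base configuration from application configuration

snapshot_started?(shape_opts)

Check if a snapshot for a given shape handle already exists

start_link(shape_opts)

Start any processes required to run the storage backend

unsafe_cleanup!(shape_opts)

Clean up snapshots/logs for a shape handle by deleting the whole directory.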

Types

Callbacks

append_to_log!(t, shape_opts)

@callback append_to_log!(Enumerable.t(log_item()), shape_opts()) :: :ok | no_return()

Append log items from one transaction to the log.

Each storage implementation is responsible for handling transient errors using some retry strategy.

If the backend fails to write within the expected time, or some other error occurs, then this should raise.
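
The retry-then-raise contract described above could be sketched as follows. This is a minimal illustration, not Electric's implementation: `RetrySketch`, `with_retries/3`, and its default attempt count and delay are hypothetical names and values chosen for the example.

```elixir
defmodule RetrySketch do
  # Hypothetical helper, not part of Electric: calls `write_fun` up to
  # `attempts` times, sleeping `delay_ms` between tries, and re-raises
  # the last error once the attempts are exhausted.
  def with_retries(write_fun, attempts \\ 3, delay_ms \\ 10)
      when is_function(write_fun, 0) and attempts > 0 do
    try do
      write_fun.()
    rescue
      error ->
        if attempts > 1 do
          Process.sleep(delay_ms)
          with_retries(write_fun, attempts - 1, delay_ms)
        else
          # Out of attempts: surface the failure to the caller.
          reraise error, __STACKTRACE__
        end
    end
  end
end
```

A storage backend might wrap its low-level write in such a helper inside `append_to_log!/2`, so that transient failures are retried and persistent ones raise.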

cleanup!(shape_opts)

@callback cleanup!(shape_opts()) :: :ok

Clean up snapshots/logs for a shape handle

for_shape(shape_handle, compiled_opts)

@callback for_shape(shape_handle(), compiled_opts()) :: shape_opts()

Initialise shape-specific opts from the shared, global configuration

get_all_stored_shapes(compiled_opts)

@callback get_all_stored_shapes(compiled_opts()) ::
  {:ok, %{required(shape_handle()) => Electric.Shapes.Shape.t()}}
  | {:error, term()}

Retrieve all stored shapes

get_chunk_end_log_offset(t, shape_opts)

@callback get_chunk_end_log_offset(Electric.Replication.LogOffset.t(), shape_opts()) ::
  Electric.Replication.LogOffset.t() | nil

Get the last exclusive offset of the chunk starting from the given offset.

If the chunk has not finished accumulating, nil is returned.

If the chunk has finished accumulating, the last offset of the chunk is returned.
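
One way to read the nil-versus-offset behaviour is as a lookup in an index of closed chunk boundaries. The sketch below assumes that reading. `ChunkIndexSketch` is a hypothetical module, and offsets are modelled as `{tx_offset, op_offset}` tuples standing in for `Electric.Replication.LogOffset.t()`.

```elixir
defmodule ChunkIndexSketch do
  # Offsets are modelled as {tx_offset, op_offset} integer tuples, a
  # stand-in for Electric.Replication.LogOffset.t(). Same-size tuples
  # compare element by element under Erlang term ordering.

  # `chunk_ends` is a sorted list of offsets at which chunks were closed.
  # Returns the first boundary after `offset`, or nil when no boundary
  # has been recorded yet (the chunk is still accumulating).
  def chunk_end(chunk_ends, offset) do
    Enum.find(chunk_ends, fn chunk_end -> chunk_end > offset end)
  end
end
```

With boundaries `[{1, 4}, {3, 0}]`, a request starting at `{1, 4}` resolves to `{3, 0}`, while a request starting at `{3, 0}` yields `nil` because the next chunk has not been closed.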

get_current_position(shape_opts)

@callback get_current_position(shape_opts()) ::
  {:ok, offset(), xmin() | nil} | {:error, term()}

Get the current xmin and offset for the shape storage.

If the instance is new, then it MUST return {:ok, LogOffset.first(), nil}.
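
A minimal sketch of the new-instance rule, with the result wrapped in the `:ok` tuple that the typespec requires. `PositionSketch`, the `state` map and its keys are hypothetical, and `{0, 0}` is only a placeholder for whatever `LogOffset.first()` actually returns.

```elixir
defmodule PositionSketch do
  # Placeholder for LogOffset.first(); the real value is defined by
  # Electric.Replication.LogOffset and is not reproduced here.
  @first_offset {0, 0}

  # `state` is a hypothetical map holding whatever has been persisted.
  # A new instance has persisted nothing, so both lookups fall back to
  # their defaults: the first offset and a nil xmin.
  def get_current_position(state) when is_map(state) do
    offset = Map.get(state, :latest_offset, @first_offset)
    xmin = Map.get(state, :snapshot_xmin)
    {:ok, offset, xmin}
  end
end
```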

get_log_stream(offset, max_offset, shape_opts)

@callback get_log_stream(
  offset :: Electric.Replication.LogOffset.t(),
  max_offset :: Electric.Replication.LogOffset.t(),
  shape_opts()
) :: log()

Get a stream of the log for a shape since a given offset

get_snapshot(shape_opts)

@callback get_snapshot(shape_opts()) ::
  {offset :: Electric.Replication.LogOffset.t(), log()}

Get the full snapshot for a given shape, also returning the offset this snapshot includes

get_total_disk_usage(compiled_opts)

@callback get_total_disk_usage(compiled_opts()) :: non_neg_integer()

Get the total disk usage for all shapes

initialise(shape_opts)

@callback initialise(shape_opts()) :: :ok

Run any initial setup tasks

make_new_snapshot!(json_result_stream, shape_opts)

@callback make_new_snapshot!(
  Electric.Shapes.Querying.json_result_stream(),
  shape_opts()
) :: :ok

Make a new snapshot for a shape handle based on the meta information about the table and a stream of plain string rows

Should raise an error if making the snapshot fails for any reason.
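
As an illustration of the raise-on-failure contract, a file-backed implementation might look roughly like this. `SnapshotSketch` and the `:snapshot_path` key are hypothetical. The sketch assumes each stream element is an already-encoded row (binary or iodata) and writes one row per line.

```elixir
defmodule SnapshotSketch do
  # Hypothetical file-backed sketch. Each row is written on its own line.
  # File.stream!/1 is used as a collectable, so a failure to open the
  # file surfaces as a raised exception rather than an error tuple.
  def make_new_snapshot!(row_stream, %{snapshot_path: path}) do
    row_stream
    |> Stream.map(&[&1, ?\n])
    |> Stream.into(File.stream!(path))
    |> Stream.run()

    :ok
  end
end
```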

mark_snapshot_as_started(shape_opts)

@callback mark_snapshot_as_started(shape_opts()) :: :ok

set_shape_definition(t, shape_opts)

@callback set_shape_definition(Electric.Shapes.Shape.t(), shape_opts()) :: :ok

Store the shape definition

set_snapshot_xmin(xmin, shape_opts)

@callback set_snapshot_xmin(xmin(), shape_opts()) :: :ok

shared_opts(term)

@callback shared_opts(term()) :: compiled_opts()

Validate and initialise storage base configuration from application configuration
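
The relationship between `shared_opts/1` and `for_shape/2` can be pictured as a two-step derivation: global configuration is validated once, then specialised per shape handle. The sketch below is one possible shape for that flow. `OptsSketch`, the `:storage_dir` option and the map keys are hypothetical and not taken from Electric.

```elixir
defmodule OptsSketch do
  # Validates the application-level configuration once.
  # Keyword.fetch!/2 raises KeyError if the (hypothetical)
  # :storage_dir option is missing.
  def shared_opts(opts) when is_list(opts) do
    %{base_dir: Keyword.fetch!(opts, :storage_dir)}
  end

  # Derives per-shape opts from the compiled, shared opts.
  def for_shape(shape_handle, compiled_opts) when is_map(compiled_opts) do
    Map.put(compiled_opts, :shape_handle, shape_handle)
  end
end
```

The per-shape map produced by `for_shape/2` is what the remaining callbacks would receive as `shape_opts`.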

snapshot_started?(shape_opts)

@callback snapshot_started?(shape_opts()) :: boolean()

Check if a snapshot for a given shape handle already exists

start_link(shape_opts)

@callback start_link(shape_opts()) :: GenServer.on_start()

Start any processes required to run the storage backend

unsafe_cleanup!(shape_opts)

@callback unsafe_cleanup!(shape_opts()) :: :ok

Clean up snapshots/logs for a shape handle by deleting the whole directory.

Does not require any extra storage processes to be running, but should only be used if the shape is known to not be in use to avoid concurrency issues.
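
For a file-based backend, directory-level cleanup could be as small as the sketch below. It assumes a hypothetical layout of one directory per shape handle under a base directory. `CleanupSketch` and the map keys are illustrative only.

```elixir
defmodule CleanupSketch do
  # Hypothetical layout: one directory per shape handle under `base_dir`.
  # File.rm_rf!/1 removes the directory recursively and raises on
  # failure. It needs no running storage process, which is why the
  # caller must ensure the shape is not in use.
  def unsafe_cleanup!(%{base_dir: base_dir, shape_handle: handle}) do
    File.rm_rf!(Path.join(base_dir, handle))
    :ok
  end
end
```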

Functions