Runbox.StateStore.Storage behaviour (runbox v21.2.0)

Behaviour for generic state store storage.

This behaviour defines the callbacks that handle savepoint persistence.

Summary

Callbacks

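handle_delete_old_savepoints(storage_state, run_id, keep_n)
Deletes old savepoints of the given run, keeping the keep_n latest savepoints.

handle_delete_run(storage_state, run_id)
Deletes storage of run identified by run_id.

handle_get_latest_savepoint(storage_state, run_id)
Returns latest persisted savepoint of given run.

handle_get_latest_savepoint_timestamp(storage_state, run_id)
Returns latest persisted savepoint timestamp of given run.

handle_init_storage(storage_state, run_id)
Initializes state store for given run.

handle_initialized?(storage_state, run_id)
Predicate returning true when state store of given run has been initialized.

handle_save_savepoint(storage_state, run_id, epoch_ms, list)
Persists run state (savepoint) of given run.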

Functions

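delete_old_savepoints(run_id, keep_n \\ 1)
Deletes old savepoints of the given run, keeping the keep_n latest savepoints.

delete_run(run_id)
Deletes storage of run identified by run_id.

get_config()

get_latest_savepoint(run_id)
Returns latest persisted savepoint of given run.

init_config(config)
Initializes the state store config.

init_storage(run_id)
Initializes state store for given run.

initialized?(run_id)
Predicate returning true when state store of given run has been initialized.

last_persisted_timestamp(run_id)
Returns latest persisted savepoint timestamp of given run.

save_run_state(run_id, timestamp, entities)
Persists run state (savepoint) of given run.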

Types

@type config() :: {module(), term()}
@type run_id() :: String.t()

Callbacks

handle_delete_old_savepoints(storage_state, run_id, keep_n)

@callback handle_delete_old_savepoints(
  storage_state :: term(),
  run_id(),
  keep_n :: integer()
) :: :ok

Deletes old savepoints of the given run, keeping the keep_n latest savepoints.

Arguments

  • storage_state - behaviour implementation state
  • run_id
  • keep_n - count of persisted savepoints which should remain undeleted
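
The keep_n semantics can be illustrated with a small sketch. It assumes that savepoints are identified by their epoch_ms timestamps and that "latest" means the highest timestamps; RetentionSketch and split_savepoints/2 are hypothetical names used only for illustration and are not part of Runbox.

```elixir
defmodule RetentionSketch do
  # Hypothetical helper: splits savepoint timestamps into the ones to keep
  # (the keep_n newest) and the ones a storage implementation would delete.
  def split_savepoints(timestamps, keep_n) when is_integer(keep_n) and keep_n >= 0 do
    timestamps
    |> Enum.sort(:desc)
    |> Enum.split(keep_n)
  end
end

{kept, deleted} = RetentionSketch.split_savepoints([100, 300, 200, 400], 2)
# kept    == [400, 300]
# deleted == [200, 100]
```

A real implementation would apply the same split to whatever it uses to index savepoints and then remove the second group from its backing store.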

handle_delete_run(storage_state, run_id)

@callback handle_delete_run(storage_state :: term(), run_id :: run_id()) ::
  :ok | {:error, reason :: term()}

Deletes storage of run identified by run_id.

Arguments

  • storage_state - behaviour implementation state
  • run_id

handle_get_latest_savepoint(storage_state, run_id)

@callback handle_get_latest_savepoint(storage_state :: term(), run_id()) ::
  {:ok, Runbox.StateStore.ScheduleUtils.epoch_ms(),
   [{Runbox.StateStore.Entity.id(), Runbox.StateStore.Entity.state()}]}
  | {:error, term()}

Returns latest persisted savepoint of given run.

Arguments

  • storage_state - behaviour implementation state
  • run_id

handle_get_latest_savepoint_timestamp(storage_state, run_id)

@callback handle_get_latest_savepoint_timestamp(storage_state :: term(), run_id()) ::
  {:ok, Runbox.StateStore.ScheduleUtils.epoch_ms()} | {:error, reason :: term()}

Returns latest persisted savepoint timestamp of given run.

Arguments

  • storage_state - behaviour implementation state
  • run_id

handle_init_storage(storage_state, run_id)

@callback handle_init_storage(storage_state :: term(), run_id()) :: :ok

Initializes state store for given run.

Arguments

  • storage_state - behaviour implementation state
  • run_id

handle_initialized?(storage_state, run_id)

@callback handle_initialized?(storage_state :: term(), run_id()) :: boolean()

Predicate returning true when state store of given run has been initialized.

Arguments

  • storage_state - behaviour implementation state
  • run_id

handle_save_savepoint(storage_state, run_id, epoch_ms, list)

@callback handle_save_savepoint(
  storage_state :: term(),
  run_id(),
  Runbox.StateStore.ScheduleUtils.epoch_ms(),
  [{Runbox.StateStore.Entity.id(), Runbox.StateStore.Entity.state()}]
) :: :ok

Persists run state (savepoint) of given run.

Arguments

  • storage_state - behaviour implementation state
  • run_id
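  • timestamp - timestamp of the savepoint, as Runbox.StateStore.ScheduleUtils.epoch_ms() (shown as epoch_ms in the signature)
  • entities - savepoint state, represented as a list of {entity_id, entity_state} tuples (shown as list in the signature)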
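
To show how the callbacks fit together, here is a minimal in-memory sketch of an implementation. It is an illustration under stated assumptions, not the storage shipped with Runbox: the module name InMemoryStorage, the start_link/0 helper, and the choice of an Agent pid as storage_state are all hypothetical, and returning {:error, :no_savepoint} for a run without savepoints is borrowed from the get_latest_savepoint/1 spec rather than mandated by the callback specs.

```elixir
defmodule InMemoryStorage do
  # Hypothetical implementation. storage_state is the pid of an Agent holding
  # %{run_id => [{epoch_ms, entities}]}, with the newest savepoint first.

  # Declare the behaviour only when runbox is available, so the sketch also
  # compiles on its own.
  if Code.ensure_loaded?(Runbox.StateStore.Storage) do
    @behaviour Runbox.StateStore.Storage
  end

  # Hypothetical helper that creates the storage_state.
  def start_link, do: Agent.start_link(fn -> %{} end)

  def handle_init_storage(agent, run_id) do
    Agent.update(agent, &Map.put_new(&1, run_id, []))
  end

  def handle_initialized?(agent, run_id) do
    Agent.get(agent, &Map.has_key?(&1, run_id))
  end

  def handle_save_savepoint(agent, run_id, epoch_ms, entities) do
    Agent.update(agent, fn runs ->
      Map.update(runs, run_id, [{epoch_ms, entities}], fn savepoints ->
        # Keep the list ordered newest first.
        Enum.sort_by([{epoch_ms, entities} | savepoints], &elem(&1, 0), :desc)
      end)
    end)
  end

  def handle_get_latest_savepoint(agent, run_id) do
    case Agent.get(agent, &Map.get(&1, run_id, [])) do
      [{epoch_ms, entities} | _older] -> {:ok, epoch_ms, entities}
      [] -> {:error, :no_savepoint}
    end
  end

  def handle_get_latest_savepoint_timestamp(agent, run_id) do
    with {:ok, epoch_ms, _entities} <- handle_get_latest_savepoint(agent, run_id) do
      {:ok, epoch_ms}
    end
  end

  def handle_delete_old_savepoints(agent, run_id, keep_n) do
    Agent.update(agent, fn runs ->
      case runs do
        %{^run_id => savepoints} ->
          %{runs | run_id => Enum.take(savepoints, max(keep_n, 0))}

        _unknown_run ->
          runs
      end
    end)
  end

  def handle_delete_run(agent, run_id) do
    Agent.update(agent, &Map.delete(&1, run_id))
  end
end

# Usage with made-up run and entity ids:
{:ok, agent} = InMemoryStorage.start_link()
:ok = InMemoryStorage.handle_init_storage(agent, "run-1")
:ok = InMemoryStorage.handle_save_savepoint(agent, "run-1", 100, [{"entity-a", :old}])
:ok = InMemoryStorage.handle_save_savepoint(agent, "run-1", 200, [{"entity-a", :new}])
{:ok, 200, [{"entity-a", :new}]} = InMemoryStorage.handle_get_latest_savepoint(agent, "run-1")
:ok = InMemoryStorage.handle_delete_old_savepoints(agent, "run-1", 1)
```

A persistent implementation would replace the Agent with a database or file store, but the return shapes required by the callback specs stay the same.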

Functions

delete_old_savepoints(run_id, keep_n \\ 1)

@spec delete_old_savepoints(run_id(), keep_n :: integer()) :: :ok

Deletes old savepoints of the given run, keeping the keep_n latest savepoints (one by default).

Arguments

  • run_id
  • keep_n - count of persisted savepoints which should remain undeleted

delete_run(run_id)

@spec delete_run(run_id()) :: :ok | {:error, reason :: term()}

Deletes storage of run identified by run_id.

Arguments

  • run_id

get_config()

@spec get_config() :: config()

get_latest_savepoint(run_id)

@spec get_latest_savepoint(run_id()) ::
  {:ok, Runbox.StateStore.ScheduleUtils.epoch_ms(),
   [{Runbox.StateStore.Entity.id(), Runbox.StateStore.Entity.state()}]}
  | {:error, :no_savepoint}

Returns latest persisted savepoint of given run.

Arguments

  • run_id
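
A caller typically has to handle both return shapes from the spec above. The sketch below takes the result as an argument so it runs without Runbox; RestoreSketch, restore/1, and the :resume and :start_fresh tags are hypothetical names, and string entity ids are an assumption made for the example.

```elixir
defmodule RestoreSketch do
  # Hypothetical consumer of a get_latest_savepoint/1 result.
  # A savepoint exists: index the entity states by entity id.
  def restore({:ok, epoch_ms, entities}) do
    {:resume, epoch_ms, Map.new(entities)}
  end

  # No savepoint has been persisted for the run yet.
  def restore({:error, :no_savepoint}), do: :start_fresh
end

RestoreSketch.restore({:ok, 1_700_000_000_000, [{"entity-a", %{count: 1}}]})
# → {:resume, 1_700_000_000_000, %{"entity-a" => %{count: 1}}}

RestoreSketch.restore({:error, :no_savepoint})
# → :start_fresh
```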

init_config(config)

@spec init_config(config()) :: :ok

Initializes the state store config.

The StateStore configuration is stored in an ETS table because the current API defines both functions scoped to a single running run and global StateStore functions.
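
One way such an ETS-backed configuration could work is sketched below. This is an assumption about the general technique, not the actual Runbox source: the module ConfigSketch, the table name, the :config key, and the StubStorage module are all hypothetical. The sketch stores the {module, storage_state} tuple once and lets any process look it up and dispatch to the configured callback module.

```elixir
defmodule ConfigSketch do
  # Hypothetical table name; the real table used by Runbox is not shown here.
  @table :state_store_config_sketch

  def init_config({module, _storage_state} = config) when is_atom(module) do
    # Create the named table on first use so any process can read the config.
    if :ets.whereis(@table) == :undefined do
      :ets.new(@table, [:named_table, :public, :set, read_concurrency: true])
    end

    true = :ets.insert(@table, {:config, config})
    :ok
  end

  def get_config do
    [{:config, config}] = :ets.lookup(@table, :config)
    config
  end

  # Dispatches to the configured implementation, passing its state first.
  def initialized?(run_id) do
    {module, storage_state} = get_config()
    module.handle_initialized?(storage_state, run_id)
  end
end

defmodule StubStorage do
  # Hypothetical stand-in: its state is simply the list of initialized run ids.
  def handle_initialized?(initialized_runs, run_id), do: run_id in initialized_runs
end

:ok = ConfigSketch.init_config({StubStorage, ["run-1"]})
ConfigSketch.initialized?("run-1")
# → true
ConfigSketch.initialized?("run-2")
# → false
```

Because the table is owned by the process that creates it, a real system would create it from a long-lived process such as a supervisor child.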

init_storage(run_id)

@spec init_storage(run_id()) :: :ok

Initializes state store for given run.

Arguments

  • run_id

initialized?(run_id)

@spec initialized?(run_id()) :: boolean()

Predicate returning true when state store of given run has been initialized.

Arguments

  • run_id

last_persisted_timestamp(run_id)

@spec last_persisted_timestamp(run_id()) ::
  {:ok, Runbox.StateStore.ScheduleUtils.epoch_ms()} | {:error, term()}

Returns latest persisted savepoint timestamp of given run.

Arguments

  • run_id

save_run_state(run_id, timestamp, entities)

Persists run state (savepoint) of given run.

Arguments

  • run_id
  • timestamp - timestamp of given savepoint
  • entities - savepoint state, represented as a list of {entity_id, entity_state} tuples