Runbox.StateStore (runbox v21.2.0)
The state store is an interface for run state persistence.
The state store gathers the state of all stateful components of a run into scheduled sets called savepoints. These savepoints are managed by Runbox.StateStore.RunState.
The workflow of the state store is as follows:
- StateStore initialization
- Initialize new run state via Runbox.StateStore.init_run_state/3
- Load already existing run state via Runbox.StateStore.load_run_state/1
- Save all registered stateful components via Runbox.StateStore.Entity.update_state/3
- Confirm that all messages of a certain time were processed with Runbox.StateStore.Entity.ack_processed_time/4
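The first steps of this workflow can be sketched as follows. This is a minimal illustration, not the library's prescribed usage: the module name StateStoreWorkflowSketch and both helper functions are hypothetical, and only the calls to start_link/2, init_run_state/3 and load_run_state/2 follow the specs documented on this page. The shape of the schedule and of the entity definitions is left to the caller, since it is not described here.

```elixir
defmodule StateStoreWorkflowSketch do
  # Hypothetical helper module. Only the Runbox.StateStore calls below
  # are taken from the documented specs; everything else is an assumption.
  alias Runbox.StateStore

  # Starts a state store for a brand-new run and initializes its run state.
  # `start_from` is the run start timestamp in epoch milliseconds and
  # `entity_defs` is a list of {entity_id, entity_state} tuples.
  def start_new_run(run_id, schedule, start_from, entity_defs) do
    with {:ok, pid} <- StateStore.start_link(run_id, schedule),
         {:ok, entities} <- StateStore.init_run_state(pid, start_from, entity_defs) do
      {:ok, pid, entities}
    end
  end

  # Starts a state store for an existing run and loads its persisted state.
  # Any non-matching result, such as {:error, :no_savepoint}, is returned
  # to the caller unchanged by `with`.
  def resume_run(run_id, schedule, update_fn \\ &Function.identity/1) do
    with {:ok, pid} <- StateStore.start_link(run_id, schedule),
         {:ok, timestamp, entities} <- StateStore.load_run_state(pid, update_fn) do
      {:ok, pid, timestamp, entities}
    end
  end
end
```

Saving entity state and acknowledging processed time are omitted from the sketch because the argument lists of Entity.update_state/3 and Entity.ack_processed_time/4 are not shown on this page.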
Summary
Functions
Returns a specification to start this module under a supervisor.
Initializes state store for new run with specified timestamp and entity definitions.
Returns true when state store was initialized.
Loads run state for existing run.
Saves entity state to all savepoints responsible for given timestamp.
Starts state store for run with given run_id.
Types
@type run_id() :: String.t()
Functions
Returns a specification to start this module under a supervisor.
See Supervisor.
@spec init_run_state(pid(), Runbox.StateStore.ScheduleUtils.epoch_ms(), [ {Runbox.StateStore.Entity.id(), Runbox.StateStore.Entity.state()} ]) :: {:ok, [Runbox.StateStore.Entity.t()]}
Initializes state store for new run with specified timestamp and entity definitions.
Arguments
- state_store_pid - pid of StateStore
- timestamp - run start_from, this timestamp is used to calculate savepoint timestamps
- entity_defs - state entity definitions defined as a list of {entity_id, entity_state}
Returns true when state store was initialized.
Arguments
- state_store_pid - pid of StateStore
@spec load_run_state(pid(), (entity_defs -> entity_defs)) :: {:ok, Runbox.StateStore.ScheduleUtils.epoch_ms(), [Runbox.StateStore.Entity.t()]} | {:error, term()} | {:error, :no_savepoint} when entity_defs: [ {Runbox.StateStore.Entity.id(), Runbox.StateStore.Entity.state()} ]
Loads run state for existing run.
The run state is loaded for an already initialized run that has persisted at least one savepoint.
Arguments
- state_store_pid - pid of StateStore
- update_fn - function which takes a keyword list [{Entity.id(), Entity.state()}] and can return an updated keyword list. Useful to upgrade state store contents between versions.
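As an illustration of what an update_fn might look like, the sketch below upgrades one entity's state from an old shape to a new one and passes every other entry through untouched. The module name EntityDefsUpgradeSketch, the entity ids :counter and :window, and both state shapes are invented for this example and are not part of Runbox.

```elixir
defmodule EntityDefsUpgradeSketch do
  # Hypothetical migration: the :counter entity used to store a bare
  # integer, and the newer version expects a map. Ids and state shapes
  # are assumptions made for illustration only.
  def upgrade(entity_defs) do
    Enum.map(entity_defs, fn
      {:counter, count} when is_integer(count) -> {:counter, %{count: count}}
      other -> other
    end)
  end
end

old_defs = [counter: 3, window: %{events: []}]
new_defs = EntityDefsUpgradeSketch.upgrade(old_defs)
# new_defs is [counter: %{count: 3}, window: %{events: []}]
```

Such a function could then be passed as the second argument, for example load_run_state(state_store_pid, &EntityDefsUpgradeSketch.upgrade/1). Because already-upgraded entries no longer match the integer guard, applying the function twice gives the same result as applying it once.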
@spec save(Runbox.StateStore.Entity.t(), Runbox.StateStore.ScheduleUtils.epoch_ms()) :: :ok
Saves entity state to all savepoints responsible for given timestamp.
Arguments
- entity - contains state to be saved
- timestamp - timestamp associated with entity state
@spec start_link(run_id(), Runbox.StateStore.ScheduleUtils.schedule()) :: GenServer.on_start()
Starts state store for run with given run_id.