Runbox.StateStore (runbox v21.2.0)

The state store is an interface for run state persistence.

The state store gathers the state of all stateful components of a run into scheduled sets called savepoints. These savepoints are managed by Runbox.StateStore.RunState.

The state store workflow is as follows:

  1. Initialize the StateStore
  2. Initialize a new run state via Runbox.StateStore.init_run_state/3
  3. Load an already existing run state via Runbox.StateStore.load_run_state/1
  4. Save all registered stateful components via Runbox.StateStore.Entity.update_state/3
  5. Confirm that all messages for a given time were processed with Runbox.StateStore.Entity.ack_processed_time/4

Types

@type run_id() :: String.t()

Functions

Returns a specification to start this module under a supervisor.

See Supervisor.

init_run_state(state_store_pid, timestamp, entity_defs)

Initializes the state store for a new run with the specified timestamp and entity definitions.

Arguments

  • state_store_pid - pid of StateStore
  • timestamp - the run's start_from value; this timestamp is used to calculate savepoint timestamps
  • entity_defs - state entity definitions, given as a list of {entity_id, entity_state} tuples

initialized?(state_store_pid)

@spec initialized?(pid()) :: boolean()

Returns true when the state store has been initialized.

Arguments

  • state_store_pid - pid of StateStore

load_run_state(state_store_pid, update_fn \\ & &1)

@spec load_run_state(pid(), (entity_defs -> entity_defs)) ::
  {:ok, Runbox.StateStore.ScheduleUtils.epoch_ms(),
   [Runbox.StateStore.Entity.t()]}
  | {:error, term()}
  | {:error, :no_savepoint}
when entity_defs: [
       {Runbox.StateStore.Entity.id(), Runbox.StateStore.Entity.state()}
     ]

Loads the run state for an existing run.

Run state is loaded for an already initialized run that has persisted at least one savepoint.

Arguments

  • state_store_pid - pid of StateStore
  • update_fn - a function that takes a keyword list [{Entity.id(), Entity.state()}] and can return an updated keyword list. It is useful for upgrading state store contents between versions. Defaults to the identity function (& &1).
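
As an illustration of update_fn, the following is a minimal sketch of a state upgrade function. The module name UpgradeExample, the :version key, and the assumption that entity states are maps are all hypothetical and not part of Runbox; only the input and output shape (a list of {entity_id, entity_state} tuples) comes from the spec above.

```elixir
defmodule UpgradeExample do
  # Hypothetical migration: stamp every map-based entity state with a
  # :version key if it does not have one yet. Entries whose state is not
  # a map are passed through unchanged.
  def upgrade(entity_defs) do
    Enum.map(entity_defs, fn
      {id, %{} = state} -> {id, Map.put_new(state, :version, 2)}
      other -> other
    end)
  end
end
```

Such a function could then be passed as the second argument, for example Runbox.StateStore.load_run_state(state_store_pid, &UpgradeExample.upgrade/1). Keeping the upgrade as a pure function of the entity list makes it easy to test without a running state store.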

Saves the entity state to all savepoints responsible for the given timestamp.

Arguments

  • entity - contains state to be saved
  • timestamp - timestamp associated with the entity state

start_link(run_id, schedule)

Starts the state store for the run with the given run_id.