View Source Runbox.StateStore.Entity (runbox v21.2.0)

Module defines struct which acts as state store envelope for any stateful runtime component.

Summary

Functions

Confirm that all messages with lower timestamps were processed.

Returns entity id

Returns entity schedule

Returns entity state

Returns entity timestamp

Set entity state at the specified point in time.

Types

@type id() :: String.t() | atom()
@type state() :: term()
@type t() :: t(state())
@type t(state) :: %Runbox.StateStore.Entity{
  id: id(),
  last_savepoint: Runbox.StateStore.ScheduleUtils.epoch_ms(),
  next_savepoint: Runbox.StateStore.ScheduleUtils.epoch_ms(),
  schedule: Runbox.StateStore.ScheduleUtils.schedule(),
  state: state,
  state_store_pid: pid(),
  timestamp: Runbox.StateStore.ScheduleUtils.epoch_ms()
}

Functions

Link to this function

ack_processed_time(entity, timestamp, save_mfa, before_save_transform \\ nil)

View Source
@spec ack_processed_time(
  t(),
  Runbox.StateStore.ScheduleUtils.epoch_ms(),
  {module(), atom(), list()},
  before_save_transform :: (t() -> {:ok, t()} | {:error, reason :: any()}) | nil
) :: {:ok, t()} | {:error, term()}

Confirm that all messages with lower timestamps were processed.

When the timestamp is higher than next planned savepoint, the state is persisted into StateStore with StateStore.save/4 and next savepoint target is determined and stored in the entity. The state is persisted only once per every savepoint.

Acking a timestamp means that the current time is timestamp, i.e., all messages with timestamp lower than timestamp were processed, and there may possibly be some messages with timestamp equal to timestamp that were not processed yet.

It is important to call this function before processing the message with timestamp timestamp, so that the state of the passed entity belongs to the timestamp before that message was processed. This is critical for ensuring consistency of savepoints. If this function was called after processing a message, the timestamp attached to a savepoint would be older than its state.

Optionally you can specify an entity transformation that is applied before save. This transformation is only execute before save, so it can be expensive. The transformation returns either {:ok, entity} or {:error, any()} if there's any error during the transformation preventing the entity to be saved.

@spec id(t()) :: id()

Returns entity id

Link to this function

new(state_store_pid, id, schedule, timestamp, state)

View Source

Creates new state store entity

Returns entity schedule

@spec state(t()) :: state()

Returns entity state

Returns entity timestamp

Link to this function

update_state(entity, timestamp, state)

View Source
@spec update_state(t(), Runbox.StateStore.ScheduleUtils.epoch_ms(), state()) :: t()

Set entity state at the specified point in time.