Runbox.Runtime.Stage.UnitRegistry (runbox v1.3.0)

Unit register used in stage based runtime to manage state of units.

UnitRegistry is configured via Toolbox.Scenario.Template.StageBased.subscriptions/0 callback result. Callback returns template subscriptions, each subscription is defined as {message_type, routing_rule}. Unit registry contains one register per configured message_type. Routing rule is used to store unit and later on lookup this unit when needed.

Units are being registered using unit attributes as defined in routing_rule. Unit lookup uses routing_rule to parse message body and the result compares to stored unit attributes.

Link to this section Summary

Functions

Returns unit with given id.

Initialize UnitRegistry non-persisted fields.

Searches for registered unit in the unit registry.

Creates new unit registry.

Returns first reached timeout and removes it from unit registry in one operation.

Registers given unit to the unit registry.

Reregisters unit in the unit registry.

Unregisters given unit from the unit registry.

Updates unit in unit registry.

Link to this section Types

Link to this type

msg_parser()

@type msg_parser() :: (Toolbox.Message.t() -> routing_key())
@type path() :: [String.t() | atom()]
Link to this type

routing_key()

@type routing_key() :: [String.t()]
Link to this type

routing_key_def()

@type routing_key_def() :: {:=, [path()], [path()]} | {:in, [path()], [path()]}
@type t() :: %Runbox.Runtime.Stage.UnitRegistry{
  alt_regs: %{
    required(Toolbox.Message.type()) => %{required(routing_key()) => unit_id()}
  },
  config: [{Toolbox.Message.type(), routing_key_def()}],
  parse_msg_fns: %{required(Toolbox.Message.type()) => msg_parser()},
  register_unit_fns: %{required(Toolbox.Message.type()) => unit_register()},
  timeouts: [
    {Runbox.StateStore.ScheduleUtils.epoch_ms(), unit_id(), Toolbox.Message.t()}
  ],
  units: %{required(unit_id()) => Toolbox.Runtime.Stage.Unit.t()}
}
@type unit_id() :: String.t()
Link to this type

unit_register()

@type unit_register() ::
  (map(), Toolbox.Message.type(), Toolbox.Runtime.Stage.Unit.t() -> map())

Link to this section Functions

Link to this function

get_unit(unit_registry, id)

@spec get_unit(t(), unit_id()) ::
  {:ok, Toolbox.Runtime.Stage.Unit.t()} | {:error, :not_found}

Returns unit with given id.

@spec init(t()) :: t()

Initialize UnitRegistry non-persisted fields.

This needs to be called right before starting new unit registry (done in new/1 automatically) and also after loading UnitRegistry from savepoint. This is needed because not all fields are persisted into a savepoint.

Link to this function

lookup(unit_registry, msg)

@spec lookup(t(), Toolbox.Message.t()) ::
  {:ok, [Toolbox.Runtime.Stage.Unit.t()]} | {:error, :unknown_message_type}

Searches for registered unit in the unit registry.

@spec new([{Toolbox.Message.type(), routing_key_def()}]) :: t()

Creates new unit registry.

Link to this function

pop_reached_timeout(unit_registry, ts)

@spec pop_reached_timeout(t(), Runbox.StateStore.ScheduleUtils.epoch_ms()) ::
  {:ok, Toolbox.Runtime.Stage.Unit.t(), Toolbox.Message.t(), t()}
  | :no_reached_timeout

Returns first reached timeout and removes it from unit registry in one operation.

Link to this function

register(unit_registry, unit)

@spec register(t(), Toolbox.Runtime.Stage.Unit.t()) :: t()

Registers given unit to the unit registry.

Link to this function

register_timeout(unit_registry, unit, ts, msg)

Registers timeout.

Link to this function

reregister(unit_registry, unit)

@spec reregister(t(), Toolbox.Runtime.Stage.Unit.t()) :: t()

Reregisters unit in the unit registry.

Function updates alternative registers using new version of unit attributes. Timeouts will remain untouched.

Link to this function

unregister(unit_registry, unit)

@spec unregister(t(), Toolbox.Runtime.Stage.Unit.t()) :: t()

Unregisters given unit from the unit registry.

Link to this function

update(unit_registry, unit)

@spec update(t(), Toolbox.Runtime.Stage.Unit.t()) :: {:ok, t()} | {:error, :not_found}

Updates unit in unit registry.