Runbox.Runtime.Stage.UnitRegistry (runbox v7.0.1)
Unit registry used by the stage-based runtime to manage the state of units.
UnitRegistry is configured via the result of the Runbox.Scenario.Template.StageBased.subscriptions/0 callback. The callback returns the template subscriptions; each subscription is defined as {message_type, routing_rule}. The unit registry contains one register per configured message_type.
The routing rule is used to store a unit and to look it up later when needed.
Units are registered using the unit attributes named in routing_rule. Unit lookup uses routing_rule to parse the message body and compares the result to the stored unit attributes.
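The registration and lookup flow described above can be pictured with plain maps. The following is a minimal sketch, not Runbox's implementation: the module name RoutingSketch, the representation of a routing rule as a list of keys, and the shape of units and messages as plain maps are all assumptions made for illustration.

```elixir
defmodule RoutingSketch do
  # Hypothetical illustration only; not part of Runbox.
  # A routing rule is assumed to be a list of keys, e.g. ["user_id"].

  # Build one empty register per configured message type.
  def new(config) do
    %{config: Map.new(config), regs: Map.new(config, fn {type, _rule} -> {type, %{}} end)}
  end

  # Store the unit id under the routing key derived from the unit's attributes.
  def register(reg, %{id: id, attributes: attrs}) do
    regs =
      Map.new(reg.regs, fn {type, entries} ->
        key = routing_key(attrs, reg.config[type])
        {type, Map.put(entries, key, id)}
      end)

    %{reg | regs: regs}
  end

  # Derive the routing key from the message body and compare it to the stored keys.
  def lookup(reg, %{type: type, body: body}) do
    case Map.fetch(reg.config, type) do
      {:ok, rule} -> Map.fetch(reg.regs[type], routing_key(body, rule))
      :error -> {:error, :unknown_message_type}
    end
  end

  # A routing key is a list of strings, one per key named in the rule.
  defp routing_key(data, rule), do: Enum.map(rule, &to_string(Map.get(data, &1)))
end

reg =
  [{:login, ["user_id"]}]
  |> RoutingSketch.new()
  |> RoutingSketch.register(%{id: "unit-1", attributes: %{"user_id" => "42"}})

# The message body yields the same routing key as the unit's attributes.
RoutingSketch.lookup(reg, %{type: :login, body: %{"user_id" => "42"}})
```

In this sketch a message whose body produces a different routing key finds nothing, and a message type without a configured rule is reported as unknown.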
Summary
Functions
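get_unit(unit_registry, id)
Returns the unit with the given id.
init(registry)
Initializes the non-persisted fields of UnitRegistry.
lookup(unit_registry, msg)
Searches the unit registry for registered units.
new(config)
Creates a new unit registry.
pop_reached_timeout(unit_registry, ts)
Returns the first reached timeout and removes it from the unit registry in one operation.
register(unit_registry, unit)
Registers the given unit in the unit registry.
register_timeout(unit_registry, unit, ts, msg)
Registers a timeout.
reregister(unit_registry, unit)
Reregisters the unit in the unit registry.
unregister(unit_registry, unit)
Unregisters the given unit from the unit registry.
update(unit_registry, unit)
Updates the unit in the unit registry.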
Types
msg_parser()
@type msg_parser() :: (Toolbox.Message.t() -> routing_key())
path()
routing_key()
@type routing_key() :: [String.t()]
routing_key_def()
t()
@type t() :: %Runbox.Runtime.Stage.UnitRegistry{
  alt_regs: %{
    required(Toolbox.Message.type()) => %{required(routing_key()) => unit_id()}
  },
  config: [{Toolbox.Message.type(), routing_key_def()}],
  parse_msg_fns: %{required(Toolbox.Message.type()) => msg_parser()},
  register_unit_fns: %{required(Toolbox.Message.type()) => unit_register()},
  timeouts: [
    {Runbox.StateStore.ScheduleUtils.epoch_ms(), unit_id(), Toolbox.Message.t()}
  ],
  units: %{required(unit_id()) => Toolbox.Runtime.Stage.Unit.t()}
}
unit_id()
@type unit_id() :: String.t()
unit_register()
@type unit_register() :: (map(), Toolbox.Message.type(), Toolbox.Runtime.Stage.Unit.t() -> map())
Functions
get_unit(unit_registry, id)
@spec get_unit(t(), unit_id()) :: {:ok, Toolbox.Runtime.Stage.Unit.t()} | {:error, :not_found}
Returns the unit with the given id.
init(registry)
Initializes the non-persisted fields of UnitRegistry.
This must be called right before starting a new unit registry (new/1 does this automatically) and also after loading a UnitRegistry from a savepoint, because not all fields are persisted into a savepoint.
lookup(unit_registry, msg)
@spec lookup(t(), Toolbox.Message.t()) :: {:ok, [Toolbox.Runtime.Stage.Unit.t()]} | {:error, :unknown_message_type}
Searches the unit registry for registered units. On success the result is a list of units.
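The two result shapes in the spec above can be handled with a single case expression. This is a minimal sketch under stated assumptions: the LookupSketch module and the units_for/3 helper are hypothetical, treating an unknown message type as "no units" is an illustrative choice rather than documented behavior, and the injectable lookup argument exists only so the helper can be exercised without Runbox loaded.

```elixir
defmodule LookupSketch do
  alias Runbox.Runtime.Stage.UnitRegistry

  # Hypothetical helper: returns the matching units, or [] when the
  # message type has no register. `lookup` defaults to the documented
  # UnitRegistry.lookup/2 and can be swapped out in tests.
  def units_for(registry, msg, lookup \\ &UnitRegistry.lookup/2) do
    case lookup.(registry, msg) do
      {:ok, units} -> units
      {:error, :unknown_message_type} -> []
    end
  end
end
```

Callers that need to distinguish "no matching unit" from "unknown message type" would match on the raw return value instead.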
new(config)
@spec new([{Toolbox.Message.type(), routing_key_def()}]) :: t()
Creates a new unit registry.
pop_reached_timeout(unit_registry, ts)
@spec pop_reached_timeout(t(), Runbox.StateStore.ScheduleUtils.epoch_ms()) :: {:ok, Toolbox.Runtime.Stage.Unit.t(), Toolbox.Message.t(), t()} | :no_reached_timeout
Returns the first reached timeout and removes it from the unit registry in one operation.
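Because each call pops at most one timeout and returns the updated registry, collecting every reached timeout means calling the function repeatedly until it returns :no_reached_timeout. The following is a minimal sketch of such a loop, based only on the spec above: the TimeoutDrainSketch module and drain/4 are hypothetical names, and the injectable pop argument is an assumption added so the loop can be exercised without Runbox loaded.

```elixir
defmodule TimeoutDrainSketch do
  alias Runbox.Runtime.Stage.UnitRegistry

  # Hypothetical helper: pops timeouts until none is reached at `now`.
  # Returns {[{unit, msg}, ...], updated_registry}, with pairs in pop order.
  def drain(registry, now, pop \\ &UnitRegistry.pop_reached_timeout/2, acc \\ []) do
    case pop.(registry, now) do
      {:ok, unit, msg, registry} -> drain(registry, now, pop, [{unit, msg} | acc])
      :no_reached_timeout -> {Enum.reverse(acc), registry}
    end
  end
end
```

Threading the returned registry into the next call is what keeps already popped timeouts from being returned again.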
register(unit_registry, unit)
@spec register(t(), Toolbox.Runtime.Stage.Unit.t()) :: t()
Registers the given unit in the unit registry.
register_timeout(unit_registry, unit, ts, msg)
@spec register_timeout( t(), Toolbox.Runtime.Stage.Unit.t(), Runbox.StateStore.ScheduleUtils.epoch_ms(), Toolbox.Message.t() ) :: t()
Registers a timeout.
reregister(unit_registry, unit)
@spec reregister(t(), Toolbox.Runtime.Stage.Unit.t()) :: t()
Reregisters the unit in the unit registry.
The function updates the alternative registers using the new version of the unit's attributes. Timeouts remain untouched.
unregister(unit_registry, unit)
@spec unregister(t(), Toolbox.Runtime.Stage.Unit.t()) :: t()
Unregisters the given unit from the unit registry.
update(unit_registry, unit)
@spec update(t(), Toolbox.Runtime.Stage.Unit.t()) :: {:ok, t()} | {:error, :not_found}
Updates the unit in the unit registry.