View Source Runbox.Runtime.Stage.Emulator.Template (runbox v21.2.0)
StageBased Template emulator.
This is similar to Runbox.Runtime.Stage.TemplateImpl
but designed specifically for the
in-process emulation via Runbox.Runtime.Stage.Emulator
.
If emulating StageBased scenario, do not use this module directly, rather use
Runbox.Runtime.Stage.Emulator
to emulate the whole scenario.
Summary
Functions
Returns whether this template requires deduplication on the inputs.
Create a struct representing an emulated template.
Handle a message in the template.
Initialize static defined units.
Returns input topics the template module is subscribed to.
Re-initialize a template.
Determines whether a template should process a message.
Sorts a list of templates in a topological order by their subscriptions.
Types
@type scenario_output() :: Runbox.Scenario.OutputAction.oa_params() | Runbox.Runtime.RuntimeInstruction.t()
@opaque t()
@type template_output() :: scenario_output() | Runbox.Message.t()
Functions
Returns whether this template requires deduplication on the inputs.
In a StageBased scenario a deduplication is performed by a Timezip component. So we need to perform deduplication only in places where there would be a Timezip. Other places should happily receive duplicated messages.
Create a struct representing an emulated template.
The struct holds all necessary information about the template, including its current state. This struct is used during the emulation, so you generally start by creating this struct via this function.
@spec handle_message(t(), Runbox.Message.t()) :: {t(), [template_output()]}
Handle a message in the template.
This involves the following:
- use the routing part of the subscriptions to find all units that should handle the message
- handle the message in all these units via the
Runbox.Scenario.Template.StageBased.handle_message/2
callback - if no unit was found trigger
Runbox.Scenario.Template.StageBased.handle_asset_discovery/1
to potentially create a new unit
Note that timeouts are routed directly to the unit that created them, they are not subjected to the same routing as other messages which come from input topics or other templates.
@spec initialize_static_units(t(), start_from_ts :: non_neg_integer()) :: {t(), [scenario_output()]}
Initialize static defined units.
Static units are defined in Runbox.Scenario.Template.StageBased.instances/0
callback. The
units are created and initialized via the Runbox.Scenario.Template.StageBased.init/2
callback.
Returns input topics the template module is subscribed to.
Re-initialize a template.
This step is necessary when a template is reconstructed from a savepoint or after a similar transformation which might corrupt internal template state.
This is required, because this module depends on Runbox.Runtime.Stage.UnitRegistry
which keeps
anonymous functions in its state. Such function references might not survive a persistence (e.g.
into ETF) and need to be reconstructed.
@spec should_handle_message?( t(), Runbox.Message.t(), origin :: (logical_topic :: String.t()) | (template :: module()) | nil ) :: boolean()
Determines whether a template should process a message.
Template should process a message either when the message matches one of its subscriptions or when the message is a planned timeout for a unit of this template. A part of this information is where is the message coming from - a specific logical topic or template.
Sorts a list of templates in a topological order by their subscriptions.
Templates can depend on each other but these dependencies can never form a loop. Therefore, we can sort templates using these dependencies into a topological order. This gives us the templates in the order in which they can process messages. Then a latter template might consume outputs of a former template, but never the other way round.
This also ensures the order is the same as in StageBased sorted component network. That is used to make Timezip emit outputs in a deterministic manner. Since there is no Timezip in the emulation we rely solely on the order of the templates to achieve the same order of outputs.