View Source Runbox.Scenario.Simple behaviour (runbox v13.0.3)
Simple scenario behavior.
Implement this behaviour to create a scenario. Note the scenario must also implement
Runbox.Scenario.Manifest
, although all this can be done in a single module, see example below.
Simple scenario structure
Simple scenario has a single module that carries the whole logic of the scenario. The module
specifies which logical topics it wants to consume (input_topics/0
) and defines how messages from
these topics are processed, updating its state and generating outputs along the way.
Usually you would use a single module for a scenario implementing both Manifest and this behavior.
However, you can separate these two if you wish. Note that the name of the module must always be
prefixed by the module name structure of the Manifest. E.g. if you have manifest module
Scenarios.MyScenario
the module implementing this behaviour can be named
Scenarios.MyScenario.Logic
but not Scenarios.MyScenarioLogic
.
Before the processing is first started an init/1
callback is used to initialize the state.
State is later used in the computation.
Each message in the specified topics is then processed by the handle_message/2
callback. This
callback should contain the main logic of the scenario. Messages arrive sorted into this callback -
you can assume the time is non-decreasing.
At minimum you must implement callbacks input_topics/0
and handle_message/2
, see other
callbacks available in this module and in Runbox.Scenario.Manifest
to see what is possible in a
scenario.
Determinism
Scenarios should be deterministic, meaning that if you run a scenario twice on the same data you get the same results. Scenarios shouldn't depend on any external resources other than topics and scenario configuration. Notably, scenarios shouldn't depend on current time. Scenario should always only work with time from the incoming messages, called external time.
You should avoid directly making side effects in a scenario. Scenario should instead produce
output actions, which are instructions to perform specific side effects. Output actions are
automatically performed by the scenario runtime in a deterministic manner. See
Runbox.Scenario.OutputAction
for the list of available output actions.
There are cases when you want to do some work at a specified point of the external time, for
example to produce some output actions when the external time reaches noon. This can be also
achieved in a deterministic manner via timeouts. You can register a timeout by returning
a corresponding runtime instruction, see Runbox.Runtime.RuntimeInstruction.register_timeout/1
for more details. The message registered in a timeout will be then passed as is to your
handle_message/2
callback exactly at the time you specify in its timestamp
field.
State persistence
State used in the computation is automatically and periodically persisted. This makes it easier to restart the computation. You don't have to recompute all of the data again to get the final state, you just load the last savepoint and recompute from there.
By default the state is persisted in the same structure as is used in the scenario. However, if
you wish you can implement get_state/1
and set_state/1
callbacks to transform the state
before it is saved into a savepoint and after it is loaded from a savepoint.
Example
defmodule Scenarios.ServerTemperature do
use Runbox.Scenario.Manifest
use Runbox.Scenario.Simple
alias Runbox.Scenario.Manifest
alias Runbox.Scenario.OutputAction.Event
alias Runbox.Scenario.OutputAction.UpsertAssetAttributes
alias Runbox.Scenario.Type, as: ScenarioType
alias Runbox.Message
@impl Manifest
def get_info do
%Manifest{
id: "server_temperature",
name: "Server temperature",
description: "Monitors server temperature and saves the data into Reality Network.",
type: ScenarioType.simple()
}
end
@impl Runbox.Scenario.Simple
def init(_) do
{:ok, [], %{last_temps: %{}}}
end
@impl Runbox.Scenario.Simple
def input_topics do
["server_sensor"]
end
@impl Runbox.Scenario.Simple
def handle_message(%Message{type: :temperature} = msg, state) do
last_temp = Map.get(state.last_temps, msg.body.server, 0)
if last_temp != msg.body.temperature do
# temperature changed, update reality network
event = %Event{
type: "temp_change",
template: "Temperature on ${actors.server} changed to ${params.temp}",
actors: %{"server" => %{asset_type: "/assets/servers", asset_id: msg.body.server}},
params: %{"temp" => msg.body.temperature},
origin_messages: [msg.origin]
}
update_asset = %UpsertAssetAttributes{
type: "/assets/servers",
id: msg.body.server,
attributes: %{"temp" => msg.body.temperature}
}
new_state = put_in(state.last_temps[msg.body.server], msg.body.temperature)
{:ok, [event, update_asset], new_state}
else
# temperature didn't change, no need to do anything
{:ok, [], state}
end
end
# ignore other sensor data
def handle_message(_, state) do
{:ok, [], state}
end
end
Summary
Types
External state of the scenario used for persistence.
Scenario outputs.
Internal state of the scenario used for computation.
Callbacks
Convert internal state to external.
Handle a message from input topics.
Initializes the scenario run.
Returns a list of topics this scenario consumes.
Convert external state to internal and bootstrap the scenario run.
Types
@type external_state() :: any()
External state of the scenario used for persistence.
When scenario needs to save its state into a savepoint, it converts the internal state into an
external state. For this purpose a callback get_state/1
is used. Similarly, when a savepoint
is loaded the external state inside is converted into the internal state, which is given to the
scenario so it can start processing messages. A callback set_state/1
is used for that purpose.
@type outputs() :: [ Runbox.Scenario.OutputAction.oa_params() | Runbox.Runtime.RuntimeInstruction.t() ]
Scenario outputs.
An output can be either an output actions or a runtime instructions.
@type state() :: any()
Internal state of the scenario used for computation.
This is what the scenario uses for its computation. This state is bootstrapped in init/1
and
is then used and modified in handle_message/2
.
Callbacks
@callback get_state(state()) :: {:ok, external_state()}
Convert internal state to external.
Called before a state is persisted, the callback should convert the internal state of the to the external state, which is ready for persistence. If the scenario uses any other resources that are not directly stored in the internal state, they should be stored in the external state in this step.
This callback is optional, defaulting to Function.identity/1
. Therefore, you should implement this
callback only when the external state differs from the internal.
@callback handle_message(msg :: Runbox.Message.t(), state()) :: {:ok, outputs(), state()}
Handle a message from input topics.
For each message in the input topics this callback is called. This is where most of the scenario business logic lives.
The computation shouldn't perform any direct side effects. Instead it should rely on output actions and runtime instructions that are returned from the callback. Output actions are performed at the time of the currently handled message.
Since scenario is a stateful processing, you can use the internal state in the computation. You get the previous state as an argument and you can modify the state.
This callback is mandatory, without it the scenario has no meaning.
@callback init(Runbox.Scenario.Simple.Config.t()) :: {:ok, outputs(), state()}
Initializes the scenario run.
When a run is first started, this callback is called to initialize the state. Additionally you can
also generate some output actions or runtime instructions. Output actions are produced in the time
of the start_from
parameter (see Runbox.Scenario.Simple.Config
).
The callback is optional, if not provided the state is initialized to nil
.
@callback input_topics() :: [ input_topic :: String.t() | {topic :: String.t(), type :: :input | :load} ]
Returns a list of topics this scenario consumes.
Each scenario consumes one or more topics specified in this callback.
Topic names in this callback are specified as logical topics. That is the name without
Altworx-specific prefixes, e.g. DATA_1
. It represents the logical data without any tie to the
physical representation. When a run is started physical topics are assigned to each logical topic.
That is the names of actual topics physically available on the Altworx instance. By default the
Altworx-specific prefix is added to the logical name to form a physical topic name, e.g.
N6_rt_DATA_1
, but user can override this and choose any other physical topic name.
Consumed topics can be of two different types:
- input topic - the default type, regular input, data is read from where run was started
- load topic - data is always read from the start irrespective of where run is started from. This is useful for sparse configuration topics, or generally topics from which you always need the complete information before starting processing other data.
A single scenario can subscribe to multiple topics of various types.
This callback is mandatory.
@callback set_state(external_state()) :: {:ok, state()}
Convert external state to internal and bootstrap the scenario run.
Called when loading a savepoint to start a run.
This callback converts the external persisted state into the internal state used for computation. The callback can also be used to start any other resources it uses and bootstrap them with information from external state.
This callback is optional, defaulting to Function.identity/1
. Therefore, you should implement
this callback only when the external state differs from the internal or when you need to bootstrap
some other resources.