Runbox.Scenario.Simple behaviour (runbox v13.0.3)

Simple scenario behaviour.

Implement this behaviour to create a scenario. The scenario must also implement Runbox.Scenario.Manifest; both can be implemented in a single module, as the example below shows.

Simple scenario structure

A simple scenario has a single module that carries the whole logic of the scenario. The module specifies which logical topics it consumes (input_topics/0) and defines how messages from these topics are processed, updating its state and generating outputs along the way.

Usually a single module implements both Manifest and this behaviour, but you can separate the two if you wish. In that case the name of the module implementing this behaviour must be prefixed by the full module name of the Manifest module. For example, with the manifest module Scenarios.MyScenario, the module implementing this behaviour can be named Scenarios.MyScenario.Logic but not Scenarios.MyScenarioLogic.
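The naming rule compares whole module name segments, not string prefixes. The following is a minimal sketch of that rule only. NamingCheck and under_manifest?/2 are hypothetical names that are not part of runbox, and how runbox itself checks the names is not specified here.

```elixir
defmodule NamingCheck do
  # Hypothetical helper, not part of runbox: returns true when `logic`
  # is the manifest module itself or is nested under it.
  def under_manifest?(logic, manifest) do
    List.starts_with?(Module.split(logic), Module.split(manifest))
  end
end

# Nested under the manifest module: allowed.
NamingCheck.under_manifest?(Scenarios.MyScenario.Logic, Scenarios.MyScenario)
# => true

# Shares only a string prefix with the manifest module: not allowed.
NamingCheck.under_manifest?(Scenarios.MyScenarioLogic, Scenarios.MyScenario)
# => false
```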

Before processing first starts, the init/1 callback is called to initialize the state. The state is then used in the computation.

Each message in the specified topics is then processed by the handle_message/2 callback, which should contain the main logic of the scenario. Messages arrive at this callback sorted by time, so you can assume that message timestamps are non-decreasing.

At minimum you must implement the input_topics/0 and handle_message/2 callbacks. See the other callbacks in this module and in Runbox.Scenario.Manifest for what else is possible in a scenario.
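As a rough sketch of that minimum, the module below defines only the two mandatory callbacks and counts messages per type. To stay runnable without the runbox dependency it omits the `use` lines and `@impl` annotations and matches messages as plain maps (which also match structs). The module name is hypothetical.

```elixir
defmodule Scenarios.MessageCounter do
  # In a real scenario, add `use Runbox.Scenario.Manifest` and
  # `use Runbox.Scenario.Simple` here and mark the callbacks with @impl.

  # Logical topics this scenario consumes.
  def input_topics do
    ["server_sensor"]
  end

  # init/1 is not defined, so the first message arrives with a nil state.
  def handle_message(msg, nil), do: handle_message(msg, %{})

  def handle_message(%{type: type}, counts) do
    # No outputs are produced, only the state is updated.
    {:ok, [], Map.update(counts, type, 1, &(&1 + 1))}
  end
end
```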

Determinism

Scenarios should be deterministic: running a scenario twice on the same data gives the same results. Scenarios shouldn't depend on any external resources other than topics and scenario configuration. Notably, scenarios shouldn't depend on the current time. A scenario should only ever work with the time from the incoming messages, called external time.
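To illustrate working only with external time, here is a small sketch that counts gaps longer than a minute between consecutive messages. It assumes integer millisecond timestamps in the message's timestamp field. The module name and the state layout are hypothetical, and the `use` lines are omitted so the sketch runs without runbox.

```elixir
defmodule Scenarios.GapDetector do
  # Assumption: message timestamps are integers in milliseconds.
  @max_gap_ms 60_000

  def init(_config), do: {:ok, [], %{last_seen: nil, gaps: 0}}

  def handle_message(%{timestamp: ts}, %{last_seen: last} = state) do
    # The gap is measured between message timestamps (external time).
    # Calling System.system_time/1 or DateTime.utc_now/0 here would make
    # the result depend on when the scenario happens to run.
    gap? = is_integer(last) and ts - last > @max_gap_ms
    gaps = if gap?, do: state.gaps + 1, else: state.gaps
    {:ok, [], %{state | last_seen: ts, gaps: gaps}}
  end
end
```

Replaying the same messages always produces the same gap count, regardless of the wall-clock time of the run.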

You should avoid making side effects directly in a scenario. A scenario should instead produce output actions, which are instructions to perform specific side effects. Output actions are automatically performed by the scenario runtime in a deterministic manner. See Runbox.Scenario.OutputAction for the list of available output actions.

There are cases when you want to do some work at a specified point of the external time, for example to produce some output actions when the external time reaches noon. This can also be achieved in a deterministic manner via timeouts. You can register a timeout by returning a corresponding runtime instruction, see Runbox.Runtime.RuntimeInstruction.register_timeout/1 for more details. The message registered in a timeout will then be passed as is to your handle_message/2 callback at exactly the time you specify in its timestamp field.
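The noon example could be prepared along the following lines. This sketch only computes the timeout message from the external time. It assumes integer millisecond timestamps and UTC, uses a plain map as a stand-in for Runbox.Message, and mentions register_timeout/1 only in a comment because its exact argument shape is not described here. The module and function names are hypothetical.

```elixir
defmodule Scenarios.NoonTimeout do
  # Assumption: timestamps are integer milliseconds since the Unix epoch (UTC).
  @day_ms 24 * 60 * 60 * 1000
  @noon_ms 12 * 60 * 60 * 1000

  # First UTC noon strictly after the given external time.
  def next_noon(ts_ms) do
    day_start = ts_ms - Integer.mod(ts_ms, @day_ms)
    noon = day_start + @noon_ms
    if noon > ts_ms, do: noon, else: noon + @day_ms
  end

  # Message to be delivered back to handle_message/2 at noon. In a real
  # scenario this would be a %Runbox.Message{} wrapped by
  # Runbox.Runtime.RuntimeInstruction.register_timeout/1 and returned
  # among the outputs of init/1 or handle_message/2.
  def noon_message(ts_ms) do
    %{type: :noon, timestamp: next_noon(ts_ms), body: %{}}
  end
end
```

Because the timeout is derived from a message timestamp rather than the current time, the scenario stays deterministic.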

State persistence

State used in the computation is automatically and periodically persisted. This makes it cheaper to restart the computation: instead of recomputing all of the data to get the final state, you load the last savepoint and recompute from there.

By default the state is persisted in the same structure as is used in the scenario. However, if you wish you can implement get_state/1 and set_state/1 callbacks to transform the state before it is saved into a savepoint and after it is loaded from a savepoint.
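As an illustration of such a transformation, the sketch below keeps a MapSet internally and persists it as a sorted list. The return shapes follow the @callback specs of get_state/1 and set_state/1 in this module. The module name and the message body layout are assumptions, and the `use` lines are omitted so the sketch runs without runbox.

```elixir
defmodule Scenarios.SeenServers do
  # Internal state: a MapSet of server ids, convenient for the computation.
  def init(_config), do: {:ok, [], MapSet.new()}

  def handle_message(%{body: %{server: server}}, seen) do
    {:ok, [], MapSet.put(seen, server)}
  end

  # External state: a plain sorted list, written into the savepoint.
  def get_state(seen) do
    {:ok, seen |> MapSet.to_list() |> Enum.sort()}
  end

  # Rebuild the internal MapSet from the list stored in the savepoint.
  def set_state(servers) when is_list(servers) do
    {:ok, MapSet.new(servers)}
  end
end
```

Passing the result of get_state/1 to set_state/1 yields a state equal to the original one, which is the property a savepoint round trip relies on.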

Example

defmodule Scenarios.ServerTemperature do
  use Runbox.Scenario.Manifest
  use Runbox.Scenario.Simple

  alias Runbox.Scenario.Manifest
  alias Runbox.Scenario.OutputAction.Event
  alias Runbox.Scenario.OutputAction.UpsertAssetAttributes
  alias Runbox.Scenario.Type, as: ScenarioType
  alias Runbox.Message

  @impl Manifest
  def get_info do
    %Manifest{
      id: "server_temperature",
      name: "Server temperature",
      description: "Monitors server temperature and saves the data into Reality Network.",
      type: ScenarioType.simple()
    }
  end

  @impl Runbox.Scenario.Simple
  def init(_) do
    {:ok, [], %{last_temps: %{}}}
  end

  @impl Runbox.Scenario.Simple
  def input_topics do
    ["server_sensor"]
  end

  @impl Runbox.Scenario.Simple
  def handle_message(%Message{type: :temperature} = msg, state) do
    last_temp = Map.get(state.last_temps, msg.body.server, 0)

    if last_temp != msg.body.temperature do
      # temperature changed, update reality network
      event = %Event{
        type: "temp_change",
        template: "Temperature on ${actors.server} changed to ${params.temp}",
        actors: %{"server" => %{asset_type: "/assets/servers", asset_id: msg.body.server}},
        params: %{"temp" => msg.body.temperature},
        origin_messages: [msg.origin]
      }

      update_asset = %UpsertAssetAttributes{
        type: "/assets/servers",
        id: msg.body.server,
        attributes: %{"temp" => msg.body.temperature}
      }

      new_state = put_in(state.last_temps[msg.body.server], msg.body.temperature)
      {:ok, [event, update_asset], new_state}
    else
      # temperature didn't change, no need to do anything
      {:ok, [], state}
    end
  end

  # ignore other sensor data
  def handle_message(_, state) do
    {:ok, [], state}
  end
end

Summary

Types

external_state()

External state of the scenario used for persistence.

outputs()

Scenario outputs.

state()

Internal state of the scenario used for computation.

Callbacks

get_state/1

Convert internal state to external.

handle_message/2

Handle a message from input topics.

init/1

Initializes the scenario run.

input_topics/0

Returns a list of topics this scenario consumes.

set_state/1

Convert external state to internal and bootstrap the scenario run.

Types

@type external_state() :: any()

External state of the scenario used for persistence.

When a scenario needs to save its state into a savepoint, it converts the internal state into an external state using the get_state/1 callback. Similarly, when a savepoint is loaded, the external state inside it is converted by the set_state/1 callback into the internal state, which is given to the scenario so it can start processing messages.

Scenario outputs.

An output can be either an output action or a runtime instruction.

@type state() :: any()

Internal state of the scenario used for computation.

This is what the scenario uses for its computation. This state is bootstrapped in init/1 and is then used and modified in handle_message/2.

Callbacks

get_state(state)

@callback get_state(state()) :: {:ok, external_state()}

Convert internal state to external.

Called before the state is persisted. The callback should convert the internal state of the scenario to the external state, which is ready for persistence. If the scenario uses any other resources that are not directly stored in the internal state, their contents should be stored in the external state in this step.

This callback is optional, defaulting to Function.identity/1. Therefore, you should implement this callback only when the external state differs from the internal.

handle_message(msg, state)

@callback handle_message(msg :: Runbox.Message.t(), state()) :: {:ok, outputs(), state()}

Handle a message from input topics.

For each message in the input topics this callback is called. This is where most of the scenario business logic lives.

The computation shouldn't perform any direct side effects. Instead it should rely on output actions and runtime instructions that are returned from the callback. Output actions are performed at the time of the currently handled message.

Since a scenario is a stateful computation, you can use the internal state while processing a message. You get the previous state as an argument and return the new state.

This callback is mandatory; without it the scenario has no meaning.

@callback init(Runbox.Scenario.Simple.Config.t()) :: {:ok, outputs(), state()}

Initializes the scenario run.

When a run is first started, this callback is called to initialize the state. You can also generate output actions or runtime instructions here. Output actions are produced at the time given by the start_from parameter (see Runbox.Scenario.Simple.Config).

The callback is optional; if it is not provided, the state is initialized to nil.
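A minimal sketch of an init/1 that prepares a non-nil state follows. It assumes the config exposes the start_from value mentioned above as a field of the same name. The module name and the state layout are hypothetical, and the `use` lines are omitted so the sketch runs without runbox.

```elixir
defmodule Scenarios.RunInfo do
  # Assumption: the config carries a start_from field
  # (see Runbox.Scenario.Simple.Config for the actual structure).
  def init(config) do
    # No outputs are produced, only the initial state is built.
    {:ok, [], %{started_at: config.start_from, messages: 0}}
  end

  def handle_message(_msg, state) do
    # The state built in init/1 is available for every message.
    {:ok, [], %{state | messages: state.messages + 1}}
  end
end
```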

@callback input_topics() :: [
  input_topic :: String.t() | {topic :: String.t(), type :: :input | :load}
]

Returns a list of topics this scenario consumes.

Each scenario consumes one or more topics specified in this callback.

Topic names in this callback are specified as logical topics, that is, names without Altworx-specific prefixes, e.g. DATA_1. A logical topic represents the data without any tie to its physical representation. When a run is started, a physical topic, i.e. the name of an actual topic available on the Altworx instance, is assigned to each logical topic. By default the Altworx-specific prefix is added to the logical name to form the physical topic name, e.g. N6_rt_DATA_1, but the user can override this and choose any other physical topic name.

Consumed topics can be of two different types:

  • input topic - the default type for regular input; data is read from the point where the run was started
  • load topic - data is always read from the start of the topic, irrespective of where the run is started from. This is useful for sparse configuration topics, or generally topics from which you always need the complete information before starting to process other data.

A single scenario can subscribe to multiple topics of various types.
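Based on the callback spec above, a declaration mixing both topic types might look like the sketch below. The module and topic names are hypothetical, and the `use` lines are omitted so the sketch runs without runbox.

```elixir
defmodule Scenarios.TopicDeclaration do
  def input_topics do
    [
      # A plain string is an input topic (the default type).
      "server_sensor",
      # The same type written explicitly as a tuple.
      {"server_events", :input},
      # A load topic is always read from its start.
      {"server_config", :load}
    ]
  end
end
```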

This callback is mandatory.

set_state(external_state)

@callback set_state(external_state()) :: {:ok, state()}

Convert external state to internal and bootstrap the scenario run.

Called when loading a savepoint to start a run.

This callback converts the external persisted state into the internal state used for computation. The callback can also be used to start any other resources the scenario uses and bootstrap them with information from the external state.

This callback is optional, defaulting to Function.identity/1. Therefore, you should implement this callback only when the external state differs from the internal or when you need to bootstrap some other resources.
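To illustrate bootstrapping another resource, the sketch below keeps its data in an ETS table, which cannot be persisted as is. get_state/1 dumps the table into a list and set_state/1 recreates the table from that list. ETS is only one possible kind of resource. The module name, table name and message body layout are assumptions, and the `use` lines are omitted so the sketch runs without runbox.

```elixir
defmodule Scenarios.EtsBackedState do
  # Internal state: a reference to an ETS table holding {server, temp} rows.
  def init(_config), do: {:ok, [], :ets.new(:last_temps, [:set, :public])}

  def handle_message(%{body: %{server: server, temperature: temp}}, table) do
    :ets.insert(table, {server, temp})
    {:ok, [], table}
  end

  # The table reference is meaningless after a restart, so persist its rows.
  def get_state(table) do
    {:ok, table |> :ets.tab2list() |> Enum.sort()}
  end

  # Start a fresh table and fill it with the rows from the savepoint.
  def set_state(rows) when is_list(rows) do
    table = :ets.new(:last_temps, [:set, :public])
    :ets.insert(table, rows)
    {:ok, table}
  end
end
```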