Toolbox.Normalizer.LoadTopic.Pipeline (toolbox v1.1.0)
Normalizer pipeline helper for generic load topic.
This pipeline uses Toolbox.Normalizer.LoadTopic.Parser
as a raw message parser.
Example pipeline definition in config.ini
:
[pipelines.reality_network_updates_1]
input = reality_network_updates
big_delay_policy = adjust
definition = Toolbox.Normalizer.LoadTopic.Pipeline.parse_msg
Pipeline expects raw messages to be encoded JSON objects. Structure of JSON object should be as follow.
Every object has to have these 3 common attributes + few other type specific attributes.
- load topic message identification + version via
load_topic_message
attribute with string version (currently only"1"
and"2"
are supported) - timestamp via
timestamp
attribute - action type via
type
attribute
Type create_asset
attributes in version "1"
of load message:
asset_id
attributes
Type upsert_asset
attributes in version "1"
of load message:
asset_id
attributes
Type update_asset
attributes in version "1"
of load message:
asset_id
attributes
Type update_asset
attributes in version "2"
of load message:
asset_id
update_attributes
delete_attributes
Type delete_asset
attributes in version "1"
of load message:
asset_id
Type create_edge
attributes in version "1"
of load message:
from_asset_id
to_asset_id
edge_type
Type upsert_edge
attributes in version "1"
of load message:
from_asset_id
to_asset_id
edge_type
Type delete_edge
attributes in version "1"
of load message:
from_asset_id
to_asset_id
edge_type
These types of messages are transformed into corresponding Altworx message Toolbox.Normalizer.LoadTopic.Pipeline.load_topic_message/0
.
For example raw message
{
"load_topic_message": "1",
"type": "create_asset",
"timestamp": 12345,
"asset_id": "/asset/foo/bar",
"attributes": {
"attr1": "attr1_val"
}
}
would produce this Altworx message
%Toolbox.Message{
type: :create_asset,
timestamp: 12345,
body: %{
load_topic_message: "1",
asset_id: "/asset/foo/bar",
attributes: %{"attr1" => "attr1_val"}
}
}
Messages produced by this pipeline are compatible with Scenario.LoadScenario
scenario.
Link to this section Summary
Link to this section Types
load_topic_message()
@type load_topic_message() :: %Toolbox.Message{ body: %{ load_topic_message: String.t(), asset_id: String.t(), attributes: map() }, from: term(), origin: term(), timestamp: term(), type: :create_asset | :upsert_asset | :update_asset } | %Toolbox.Message{ body: %{load_topic_message: String.t(), asset_id: String.t()}, from: term(), origin: term(), timestamp: term(), type: :delete_asset } | %Toolbox.Message{ body: %{ load_topic_message: String.t(), from_asset_id: String.t(), to_asset_id: String.t(), edge_type: String.t() }, from: term(), origin: term(), timestamp: term(), type: :create_edge | :upsert_edge | :delete_edge }
Link to this section Functions
parse_msg(toolbox_msg)
@spec parse_msg(Toolbox.Message.t()) :: load_topic_message() | {:error, term()}