Toolbox.Normalizer.LoadTopic.Pipeline (toolbox v1.1.0)

Normalizer pipeline helper for generic load topic.

This pipeline uses Toolbox.Normalizer.LoadTopic.Parser as a raw message parser.

Example pipeline definition in config.ini:

[pipelines.reality_network_updates_1]
input = reality_network_updates
big_delay_policy = adjust
definition = Toolbox.Normalizer.LoadTopic.Pipeline.parse_msg

Pipeline expects raw messages to be encoded JSON objects. Structure of JSON object should be as follow.

Every object has to have these 3 common attributes + few other type specific attributes.

  • load topic message identification + version via load_topic_message attribute with string version (currently only "1" and "2" are supported)
  • timestamp via timestamp attribute
  • action type via type attribute

Type create_asset attributes in version "1" of load message:

  • asset_id
  • attributes

Type upsert_asset attributes in version "1" of load message:

  • asset_id
  • attributes

Type update_asset attributes in version "1" of load message:

  • asset_id
  • attributes

Type update_asset attributes in version "2" of load message:

  • asset_id
  • update_attributes
  • delete_attributes

Type delete_asset attributes in version "1" of load message:

  • asset_id

Type create_edge attributes in version "1" of load message:

  • from_asset_id
  • to_asset_id
  • edge_type

Type upsert_edge attributes in version "1" of load message:

  • from_asset_id
  • to_asset_id
  • edge_type

Type delete_edge attributes in version "1" of load message:

  • from_asset_id
  • to_asset_id
  • edge_type

These types of messages are transformed into corresponding Altworx message Toolbox.Normalizer.LoadTopic.Pipeline.load_topic_message/0. For example raw message

{
    "load_topic_message": "1",
    "type": "create_asset",
    "timestamp": 12345,
    "asset_id": "/asset/foo/bar",
    "attributes": {
        "attr1": "attr1_val"
    }
}

would produce this Altworx message

%Toolbox.Message{
    type: :create_asset,
    timestamp: 12345,
    body: %{
        load_topic_message: "1",
        asset_id: "/asset/foo/bar",
        attributes: %{"attr1" => "attr1_val"}
    }
}

Messages produced by this pipeline are compatible with Scenario.LoadScenario scenario.

Link to this section Summary

Link to this section Types

Link to this type

load_topic_message()

@type load_topic_message() ::
  %Toolbox.Message{
    body: %{
      load_topic_message: String.t(),
      asset_id: String.t(),
      attributes: map()
    },
    from: term(),
    origin: term(),
    timestamp: term(),
    type: :create_asset | :upsert_asset | :update_asset
  }
  | %Toolbox.Message{
      body: %{load_topic_message: String.t(), asset_id: String.t()},
      from: term(),
      origin: term(),
      timestamp: term(),
      type: :delete_asset
    }
  | %Toolbox.Message{
      body: %{
        load_topic_message: String.t(),
        from_asset_id: String.t(),
        to_asset_id: String.t(),
        edge_type: String.t()
      },
      from: term(),
      origin: term(),
      timestamp: term(),
      type: :create_edge | :upsert_edge | :delete_edge
    }

Link to this section Functions

Link to this function

parse_msg(toolbox_msg)

@spec parse_msg(Toolbox.Message.t()) :: load_topic_message() | {:error, term()}