Normalizer devel

Mix.install(
  [
    {:altworx_book, "~> 0.2.0"},
    {:jason, "~> 1.4"}
  ],
  config: [runbox: [scenario_config_dir: nil], tzdata: [autoupdate: :disabled]]
)


Transformation function

First define a module with transformation functions. A transformation function receives a single argument, a %Toolbox.Message{} struct, and should return one of:

  • %Toolbox.Message{} - an updated message struct.

  • :ignored - the message is silently ignored.

  • {:error, reason} - an error occurred during message processing.

defmodule Normalizer.Basic do
  alias Toolbox.Message

  @doc """
  Attempts to decode the message from JSON.
  """
  def decode(%Message{body: body} = msg) do
    case Jason.decode(body) do
      {:ok, body} -> %{msg | body: body}
      {:error, _} = error -> error
    end
  end

  @doc """
  Sets the message type from the message body.

  Falls back to a default value.
  """
  def set_type(%Message{body: body} = msg) do
    type = body["type"] || "default_type"
    %{msg | type: type}
  end

  @doc """
  Sets the message timestamp from the message body.

  As a fallback, keeps the timestamp intact.
  """
  def set_timestamp(%Message{body: body} = msg) do
    timestamp = body["timestamp"]

    if is_nil(timestamp) do
      msg
    else
      %{msg | timestamp: timestamp}
    end
  end

  @doc """
  Handle the messages according to their type.
  """
  def classify(%Message{type: type} = msg) do
    case type do
      # ignore all the boring messages
      "yawn" -> :ignored
      # error out on bad messages
      "bad" -> {:error, "bad message"}
      # keep all other messages
      _ -> msg
    end
  end

  @doc """
  Set a random message type (for demonstration purposes).
  """
  def set_random_type(%Message{} = msg) do
    types = [:type1, :type2, :type3, :type4]
    %{msg | type: Enum.random(types)}
  end

  @doc """
  Set a random pipeline output (for demonstration purposes).
  """
  def set_random_output(%Message{} = msg) do
    case Enum.random([:ok, :error, :raise, :ignored]) do
      :ok -> msg
      :error -> {:error, "some error"}
      :raise -> raise "booom"
      :ignored -> :ignored
    end
  end
end


Pipeline definition

The normalizer pipeline consists of one or more transformation functions, each referenced as &Module.function/1.

The first function receives a %Toolbox.Message{} struct with :timestamp set to the time the message arrived in the system and :body set to the unprocessed string.

Each subsequent function in the pipeline receives the %Toolbox.Message{} struct returned by the previous function.

If the last function in the pipeline returns a %Toolbox.Message{} struct, it must have :type set to an atom or a string and :timestamp set to an integer.
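The final-message constraints above can be sketched as a simple guard. This check is purely illustrative; the actual validation is performed inside Altworx:

```elixir
# Illustrative only: the real validation happens inside Altworx.
valid? = fn
  %{type: type, timestamp: ts}
  when (is_binary(type) or (is_atom(type) and not is_nil(type))) and is_integer(ts) ->
    true

  _ ->
    false
end

valid?.(%{type: "my_type", timestamp: 1_690_197_171_000})
#=> true

valid?.(%{type: "my_type", timestamp: "not an integer"})
#=> false
```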

pipeline = [
  &Normalizer.Basic.decode/1,
  &Normalizer.Basic.set_type/1,
  &Normalizer.Basic.set_timestamp/1,
  &Normalizer.Basic.classify/1
]
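A pipeline run can be thought of as folding the message through the functions, halting as soon as a step returns :ignored or {:error, reason}. The sketch below is a hypothetical illustration of that behavior over plain maps standing in for %Toolbox.Message{}; the real runner is provided by AltworxBook.run_pipeline/2 and may differ:

```elixir
# Fold the message through the steps, short-circuiting on :ignored or errors.
run = fn msg, pipeline ->
  Enum.reduce_while(pipeline, msg, fn step, acc ->
    case step.(acc) do
      :ignored -> {:halt, :ignored}
      {:error, _} = error -> {:halt, error}
      next -> {:cont, next}
    end
  end)
end

# A toy pipeline over plain maps (stand-ins for %Toolbox.Message{}):
pipeline = [
  fn msg -> %{msg | type: msg.body["type"] || "default_type"} end,
  fn msg -> if msg.type == "yawn", do: :ignored, else: msg end
]

run.(%{body: %{"type" => "yawn"}, type: nil}, pipeline)
#=> :ignored
```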


Run the pipeline on custom messages

Define a few custom messages and see how the normalizer pipeline handles them. Each message must be a map with :timestamp and :value keys.

Process the messages through the pipeline with the helper AltworxBook.run_pipeline/2. The result is a list of {category, msg, original} tuples, where category is one of :new, :error, or :ignored, msg is the normalized message, and original is the original message.

messages = [
  %{
    timestamp: 1_690_197_171_000,
    value: """
    {
      "timestamp": 12345,
      "varA": "a"
    }
    """
  },
  %{
    timestamp: 1_690_197_172_000,
    value: """
    {
      "type": "my_type",
      "varB": "b"
    }
    """
  },
  %{
    timestamp: 1_690_197_173_000,
    value: """
    {
      "type": "yawn"
    }
    """
  },
  %{
    timestamp: 1_690_197_174_000,
    value: """
    {
      "type": "bad"
    }
    """
  },
  %{
    timestamp: 1_690_197_174_000,
    value: """
    not a JSON
    """
  }
]

AltworxBook.run_pipeline(messages, pipeline)
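The result tuples can be summarized with plain Enum functions, for example by counting messages per category. The sample data below is made up for illustration; real results come from AltworxBook.run_pipeline/2:

```elixir
# Hypothetical sample results in the {category, msg, original} format:
results = [
  {:new, %{type: "my_type"}, %{timestamp: 1, value: "..."}},
  {:ignored, nil, %{timestamp: 2, value: "..."}},
  {:error, "bad message", %{timestamp: 3, value: "..."}}
]

# Count how many messages fell into each category:
Enum.frequencies_by(results, fn {category, _msg, _original} -> category end)
#=> %{new: 1, ignored: 1, error: 1}
```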


Connect to an Altworx node

Run Altworx in developer mode via make in the Altworx repository. This livebook must run on the same network; if started from Docker, run the container with the --net host parameter.
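For example, a Livebook container on the host network might be started like this (a hypothetical invocation; the image name and any extra options depend on your setup):

```shell
# --net host puts the container on the host network so it can reach
# the Altworx node; image name is illustrative.
docker run --rm --net host livebook/livebook
```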


List topics

List all topics available on the connected Altworx node.

AltworxBook.connect_to_altworx()
AltworxBook.list_raw_topics()


Run pipeline on an existing topic

Now run the pipeline on messages from an existing topic. Depending on the number of source messages in the topic, this may return a long list of results.

pipeline = [
  &Normalizer.Basic.decode/1,
  &Normalizer.Basic.set_random_type/1,
  &Normalizer.Basic.set_timestamp/1,
  &Normalizer.Basic.set_random_output/1
]

topic_name = "reality_network_updates"
since = 0
message_count = 100
msgs = AltworxBook.list_raw_messages(topic_name, since, message_count)
results = AltworxBook.run_pipeline(msgs, pipeline)

Let's put the resulting messages into tables and graphs.

AltworxBook.visualize_pipeline_results(results)


Pipeline debug

You can debug each pipeline step one by one and inspect its output via IO.inspect/2 or Livebook's dbg.

topic_name = "reality_network_updates"
message_offset = 0

{:ok, msg} = AltworxBook.get_raw_message(topic_name, message_offset)

msg
|> AltworxBook.run_pipeline_step(&Normalizer.Basic.decode/1)
|> AltworxBook.run_pipeline_step(&Normalizer.Basic.set_random_type/1)
|> AltworxBook.run_pipeline_step(&Normalizer.Basic.set_timestamp/1)
|> AltworxBook.run_pipeline_step(&Normalizer.Basic.set_random_output/1)
|> AltworxBook.validate_pipeline_step_output_message()
|> dbg()