Normalizer debug

Mix.install(
  [
    {:altworx_book, "~> 0.2.0"},
    {:jason, "~> 1.4"}
  ],
  config: [runbox: [scenario_config_dir: nil], tzdata: [autoupdate: :disabled]]
)

AltworxBook.connect_to_altworx()

altworx-setup

Altworx setup

Run altworx in developer mode via make in the Altworx repository. This livebook should be in the same network. That is, if run from docker, it should be started with the parameter -net host.

pipeline-debug

Pipeline debug

First we need pipeline module which defines transformation function/s.

defmodule Normalizer.Basic do
  alias Toolbox.Message

  @doc """
  Attempts to decode the message from JSON.
  """
  def decode(%Message{body: body} = msg) do
    case Jason.decode(body) do
      {:ok, body} -> %{msg | body: body}
      {:error, _} = error -> error
    end
  end

  @doc """
  Sets the message type from the message body.

  Falls back to a default value.
  """
  def set_type(%Message{body: body} = msg) do
    type = body["type"] || "default_type"
    %{msg | type: type}
  end

  @doc """
  Sets the message timestamp from the message body.

  As a fallback, keeps the timestamp intact.
  """
  def set_timestamp(%Message{body: body} = msg) do
    timestamp = body["timestamp"]

    if is_nil(timestamp) do
      msg
    else
      %{msg | timestamp: timestamp}
    end
  end
end

Then we need pipeline definition.

pipeline_simple = [
  &Normalizer.Basic.decode/1,
  &Normalizer.Basic.set_type/1,
  &Normalizer.Basic.set_timestamp/1
]

With this configuration we can start debuging our pipeline functions. We can evaluate whole pipeline.

topic_name = "reality_network_updates"
since = 0
message_count = 1

msgs = AltworxBook.list_raw_messages(topic_name, since, message_count)
AltworxBook.run_pipeline(msgs, pipeline_simple)

Or evaluate each pipeline step one by one and print out its outputs (via IO.inspect/1 or livebook's dbg/0).

topic_name = "reality_network_updates"
message_offset = 0

{:ok, msg} = AltworxBook.get_raw_message(topic_name, message_offset)

msg
|> AltworxBook.run_pipeline_step(&Normalizer.Basic.decode/1)
|> AltworxBook.run_pipeline_step(&Normalizer.Basic.set_type/1)
|> AltworxBook.run_pipeline_step(&Normalizer.Basic.set_timestamp/1)
|> AltworxBook.validate_pipeline_step_output_message()
|> dbg()