Normalizer debug
# Notebook setup: install the dependencies used by the cells below.
# AltworxBook provides the debugging helpers called throughout this notebook;
# Jason is used by Normalizer.Basic.decode/1 to parse message bodies.
Mix.install(
[
{:altworx_book, "~> 0.2.0"},
{:jason, "~> 1.4"}
],
# Application configuration passed to the installed dependencies.
# NOTE(review): presumably `scenario_config_dir: nil` keeps Runbox from loading
# scenario configs and `autoupdate: :disabled` stops tzdata from fetching
# timezone updates — confirm against the Runbox / tzdata docs.
config: [runbox: [scenario_config_dir: nil], tzdata: [autoupdate: :disabled]]
)
# NOTE(review): presumably attaches this notebook to the running Altworx
# instance, so Altworx must be reachable from the notebook — confirm.
AltworxBook.connect_to_altworx()
altworx-setup
Altworx setup
Run Altworx in developer mode via `make` in the Altworx repository.
This livebook must be on the same network as Altworx. That is, if Livebook is run from Docker, the container should be started with the parameter `--net host`.
pipeline-debug
Pipeline debug
First we need a pipeline module that defines the transformation function(s).
defmodule Normalizer.Basic do
  @moduledoc """
  Basic normalizer pipeline steps operating on `Toolbox.Message` structs.
  """

  alias Toolbox.Message

  @doc """
  Parses the message body as JSON.

  On success the message is returned with its body replaced by the decoded
  term. On failure the `{:error, reason}` tuple produced by `Jason.decode/1`
  is returned as-is.
  """
  def decode(%Message{body: raw} = message) do
    # A non-matching result (the error tuple) falls through `with` unchanged.
    with {:ok, decoded} <- Jason.decode(raw) do
      %{message | body: decoded}
    end
  end

  @doc """
  Copies the `"type"` entry of the body into the message type.

  Uses `"default_type"` when the entry is missing or falsy.
  """
  def set_type(%Message{body: payload} = message) do
    %{message | type: payload["type"] || "default_type"}
  end

  @doc """
  Copies the `"timestamp"` entry of the body into the message timestamp.

  The message is returned untouched when the body carries no timestamp.
  """
  def set_timestamp(%Message{body: payload} = message) do
    case payload["timestamp"] do
      nil -> message
      timestamp -> %{message | timestamp: timestamp}
    end
  end
end
Then we need a pipeline definition.
# Pipeline definition: a list of captured Normalizer.Basic step functions.
# decode/1 comes first because set_type/1 and set_timestamp/1 read string keys
# ("type", "timestamp") from the body that decode/1 produces.
# NOTE(review): presumably the steps are applied in list order — confirm.
pipeline_simple = [
&Normalizer.Basic.decode/1,
&Normalizer.Basic.set_type/1,
&Normalizer.Basic.set_timestamp/1
]
With this configuration we can start debugging our pipeline functions. We can evaluate the whole pipeline.
# Fetch raw messages from a topic and run them through the whole pipeline.
topic_name = "reality_network_updates"
# NOTE(review): presumably the position (offset or timestamp) to start reading
# from — confirm against AltworxBook.list_raw_messages/3.
since = 0
# NOTE(review): presumably the number of messages to fetch — confirm.
message_count = 1
msgs = AltworxBook.list_raw_messages(topic_name, since, message_count)
# The result of this call is the cell's output.
AltworxBook.run_pipeline(msgs, pipeline_simple)
Or evaluate each pipeline step one by one and print its output (via `IO.inspect/1` or Livebook's `dbg/1`).
# Fetch a single raw message and push it through the pipeline one step at a time.
topic_name = "reality_network_updates"
# NOTE(review): presumably the offset of the message within the topic — confirm
# against AltworxBook.get_raw_message/2.
message_offset = 0
# Assertive match: the cell fails here if the message cannot be fetched.
{:ok, msg} = AltworxBook.get_raw_message(topic_name, message_offset)
msg
|> AltworxBook.run_pipeline_step(&Normalizer.Basic.decode/1)
|> AltworxBook.run_pipeline_step(&Normalizer.Basic.set_type/1)
|> AltworxBook.run_pipeline_step(&Normalizer.Basic.set_timestamp/1)
|> AltworxBook.validate_pipeline_step_output_message()
# dbg/1 at the end of a pipeline prints the value after each step.
|> dbg()