Normalizer devel
Mix.install(
  [
    {:altworx_book, "~> 0.2.0"},
    {:jason, "~> 1.4"}
  ],
  config: [runbox: [scenario_config_dir: nil], tzdata: [autoupdate: :disabled]]
)
Transformation function
First, define a module with transformation functions. A transformation function receives a single parameter, %Toolbox.Message{}. The function should return one of:

%Toolbox.Message{} - an updated message struct.
:ignored - the message is silently ignored.
{:error, reason} - an error occurred during message processing.
defmodule Normalizer.Basic do
  alias Toolbox.Message

  @doc """
  Attempts to decode the message from JSON.
  """
  def decode(%Message{body: body} = msg) do
    case Jason.decode(body) do
      {:ok, body} -> %{msg | body: body}
      {:error, _} = error -> error
    end
  end

  @doc """
  Sets the message type from the message body.
  Falls back to a default value.
  """
  def set_type(%Message{body: body} = msg) do
    type = body["type"] || "default_type"
    %{msg | type: type}
  end

  @doc """
  Sets the message timestamp from the message body.
  As a fallback, keeps the timestamp intact.
  """
  def set_timestamp(%Message{body: body} = msg) do
    timestamp = body["timestamp"]

    if is_nil(timestamp) do
      msg
    else
      %{msg | timestamp: timestamp}
    end
  end

  @doc """
  Handles the messages according to their type.
  """
  def classify(%Message{type: type} = msg) do
    case type do
      # ignore all the boring messages
      "yawn" -> :ignored
      # error out on bad messages
      "bad" -> {:error, "bad message"}
      # keep all other messages
      _ -> msg
    end
  end

  @doc """
  Sets a random message type (for demonstration purposes).
  """
  def set_random_type(%Message{} = msg) do
    types = [:type1, :type2, :type3, :type4]
    %{msg | type: Enum.random(types)}
  end

  @doc """
  Sets a random pipeline output (for demonstration purposes).
  """
  def set_random_output(%Message{} = msg) do
    case Enum.random([:ok, :error, :raise, :ignored]) do
      :ok -> msg
      :error -> {:error, "some error"}
      :raise -> raise "booom"
      :ignored -> :ignored
    end
  end
end
Pipeline definition
A normalizer pipeline consists of one or more transformation functions, referenced as &Module.function/1.

The first function receives a %Toolbox.Message{} struct with :timestamp set to the time when the message arrived into the system and :body set to an unprocessed string. Each subsequent function in the pipeline receives the %Toolbox.Message{} struct returned from the previous function, and so on.

If the last function in the pipeline returns a %Toolbox.Message{} struct, it must have :type set to an atom or a string value and :timestamp set to an integer value.
pipeline = [
  &Normalizer.Basic.decode/1,
  &Normalizer.Basic.set_type/1,
  &Normalizer.Basic.set_timestamp/1,
  &Normalizer.Basic.classify/1
]
Run the pipeline on custom messages
Define a few custom messages and see how the normalizer pipeline handles them. Each message has to be a map with :timestamp and :value keys.

Process the messages through the pipeline with the helper AltworxBook.run_pipeline/2. The result is a list of tuples, each in the format {category, msg, original}, where category is one of :new, :error, or :ignored, msg is the normalized message, and original is the original message.
messages = [
  %{
    timestamp: 1_690_197_171_000,
    value: """
    {
      "timestamp": 12345,
      "varA": "a"
    }
    """
  },
  %{
    timestamp: 1_690_197_172_000,
    value: """
    {
      "type": "my_type",
      "varB": "b"
    }
    """
  },
  %{
    timestamp: 1_690_197_173_000,
    value: """
    {
      "type": "yawn"
    }
    """
  },
  %{
    timestamp: 1_690_197_174_000,
    value: """
    {
      "type": "bad"
    }
    """
  },
  %{
    timestamp: 1_690_197_174_000,
    value: """
    not a JSON
    """
  }
]
AltworxBook.run_pipeline(messages, pipeline)
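Given the {category, msg, original} tuple shape described above, results can be grouped by category with Enum.group_by/2. A small sketch on hand-written sample tuples (the maps and strings here are illustrative, not real Altworx output):

```elixir
# Illustrative sample in the {category, msg, original} shape described above.
sample_results = [
  {:new, %{type: "my_type"}, "raw JSON 1"},
  {:ignored, nil, "raw JSON 2"},
  {:error, "bad message", "raw JSON 3"}
]

# Group the tuples by their category for a quick overview.
by_category = Enum.group_by(sample_results, fn {category, _msg, _original} -> category end)

by_category |> Map.keys() |> Enum.sort()
# => [:error, :ignored, :new]
```

The same grouping can be applied to the real output of AltworxBook.run_pipeline/2 to count or inspect messages per category.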
Connect to an Altworx node
Run Altworx in developer mode via make in the Altworx repository.

This livebook should be on the same network. That is, if run from Docker, it should be started with the parameter --net host.
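For example, with the official Livebook Docker image (the image name is illustrative; adjust to your setup), host networking can be enabled like this:

```shell
# Illustrative only: run Livebook on the host network so it can reach the
# Altworx node started via make on the same machine.
docker run --net host livebook/livebook
```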
List topics
List all topics available on the connected Altworx node.
AltworxBook.connect_to_altworx()
AltworxBook.list_raw_topics()
Run pipeline on an existing topic
Now run the pipeline on messages from an existing topic. This may return a long list of terms (depending on the number of source messages in the topic).
pipeline = [
  &Normalizer.Basic.decode/1,
  &Normalizer.Basic.set_random_type/1,
  &Normalizer.Basic.set_timestamp/1,
  &Normalizer.Basic.set_random_output/1
]
topic_name = "reality_network_updates"
since = 0
message_count = 100
msgs = AltworxBook.list_raw_messages(topic_name, since, message_count)
results = AltworxBook.run_pipeline(msgs, pipeline)
Let's put the resulting messages into tables and graphs.
AltworxBook.visualize_pipeline_results(results)
Pipeline debug
You can debug each pipeline step one by one and inspect its output via IO.inspect/2 or the dbg/2 macro.
topic_name = "reality_network_updates"
message_offset = 0
{:ok, msg} = AltworxBook.get_raw_message(topic_name, message_offset)
msg
|> AltworxBook.run_pipeline_step(&Normalizer.Basic.decode/1)
|> AltworxBook.run_pipeline_step(&Normalizer.Basic.set_random_type/1)
|> AltworxBook.run_pipeline_step(&Normalizer.Basic.set_timestamp/1)
|> AltworxBook.run_pipeline_step(&Normalizer.Basic.set_random_output/1)
|> AltworxBook.validate_pipeline_step_output_message()
|> dbg()