AltworxBook (altworx_book v0.2.0)
AltworxBook provides functions for interacting with Altworx from Livebook notebooks.
There is support for:
- Normalizer pipeline development and debugging
Basic usage
Currently, the notebook must run on the same machine as Altworx. Connect to a running Altworx instance with AltworxBook.connect_to_altworx/0. The library uses the ALTWORX_BOOK_ENV environment variable to infer the Altworx node name; the default value is prod. If Livebook is started in a local dev environment with the host Docker network, ALTWORX_BOOK_ENV should be set to dev.
Once the connection is made, you can call other general-purpose functions, such as listing all topics via AltworxBook.list_topics/0 or only raw topics via AltworxBook.list_raw_topics/0.
Raw messages can be accessed by calling AltworxBook.list_raw_messages/4. A specific message can be retrieved with AltworxBook.get_raw_message/2.
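A minimal sketch of a first session in a notebook cell, using only the functions named above. The topic name is borrowed from the examples further down this page and is an assumption about your instance; the since value of 0 and the count of 10 are purely illustrative.

```elixir
# Assumes AltworxBook is installed in the notebook and an Altworx node
# is running on the same machine.
topic = "reality_network_updates" # assumed topic name, taken from the examples on this page

case AltworxBook.connect_to_altworx() do
  :ok ->
    # Show which raw topics the connected node exposes.
    IO.inspect(AltworxBook.list_raw_topics(), label: "raw topics")

    # Fetch up to 10 raw messages starting from timestamp 0,
    # keeping only the values (no metadata envelope).
    topic
    |> AltworxBook.list_raw_messages(0, 10, [:just_value])
    |> IO.inspect(label: "values")

  {:error, reason} ->
    IO.inspect(reason, label: "connection failed")
end
```

Matching on both return shapes of connect_to_altworx/0 keeps a failed connection visible in the cell output instead of surfacing later as an error from the listing calls.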
Normalizer pipeline development and debugging
A whole pipeline can be executed in a notebook by calling AltworxBook.run_pipeline/2 on a list of raw messages. Messages can be defined manually or retrieved from an Altworx instance. The pipeline definition is similar to the configuration in config.ini. The only difference is that normalization functions are passed as function references (e.g. Normalizer.Basic.decode -> &Normalizer.Basic.decode/1).
Normalization results can be displayed as VegaLite graphs in a Kino layout by calling AltworxBook.visualize_pipeline_results/1.
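The flow described above can be sketched as follows, with messages retrieved from an Altworx instance rather than defined by hand. The topic name and the two normalization functions are taken from the examples on this page. It is an assumption that the maps returned by list_raw_messages/4 can be passed to run_pipeline/2 unchanged.

```elixir
# Assumes a connection was already made with AltworxBook.connect_to_altworx/0.
# Retrieve up to 5 raw messages starting from timestamp 0.
messages = AltworxBook.list_raw_messages("reality_network_updates", 0, 5)

# Pipeline steps are passed as function references, in execution order.
pipeline = [
  &Normalizer.Basic.decode/1,
  &Normalizer.Basic.set_random_type/1
]

# Run the whole pipeline and render the results as a Kino presentation.
messages
|> AltworxBook.run_pipeline(pipeline)
|> AltworxBook.visualize_pipeline_results()
```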
You can also debug pipelines function by function via AltworxBook.run_pipeline_step/2, which executes a single normalization function. This is useful in combination with printing intermediate results using IO.inspect/2 or dbg/0.
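The step-by-step approach can be sketched as below. It relies on run_pipeline_step/2 accepting the output of a previous step as its input, as its spec indicates, and reuses the normalization functions from the examples on this page. Whether those functions are available depends on your Altworx installation.

```elixir
# A manually defined raw message; the ~s sigil avoids escaping the inner quotes.
message = %{timestamp: 1_694_680_405_146, value: ~s({"timestamp": 12345})}

message
|> AltworxBook.run_pipeline_step(&Normalizer.Basic.decode/1)
# IO.inspect/2 returns its argument, so it can sit between steps.
|> IO.inspect(label: "after decode")
|> AltworxBook.run_pipeline_step(&Normalizer.Basic.set_random_type/1)
|> IO.inspect(label: "after set_random_type")
# Check the final message, as described under
# validate_pipeline_step_output_message/1.
|> AltworxBook.validate_pipeline_step_output_message()
```

Labelling each IO.inspect/2 call makes it easy to tell in the cell output which step produced an {:error, reason} or :ignored result.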
Summary
Functions
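- connect_to_altworx(): Connect to a running Altworx node.
- get_raw_message(topic_name, offset): Return a message from a raw topic by its offset.
- list_raw_messages(topic_name, since, count, options \\ []): Return a list of messages from a Kafka raw topic.
- list_raw_topics(): List all raw topics Altworx has access to.
- list_topics(): List all topics Altworx has access to.
- run_pipeline(messages, steps): Execute a pipeline definition on a list of messages.
- run_pipeline_step(message, step_fn): Evaluate a pipeline step on a given message.
- validate_pipeline_step_output_message(message): Validate a pipeline output message produced by evaluating pipeline steps one by one.
- visualize_pipeline_results(results): Utility function to display the results of AltworxBook.run_pipeline/2.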
Functions
connect_to_altworx()
@spec connect_to_altworx() :: :ok | {:error, term()}
Connect to a running Altworx node.
The Altworx node is expected to be running on the same machine as the calling process.
get_raw_message(topic_name, offset)
@spec get_raw_message(topic_name :: String.t(), offset :: integer()) :: {:ok, %{timestamp: AltworxBook.Altworx.timestamp(), value: String.t()}} | {:error, term()}
Return a message from a raw topic by its offset.
Examples
iex> AltworxBook.get_raw_message("reality_network_updates", 0)
{:ok, %{timestamp: 1638430188202, value: "{\"timestamp\": 1638430188202, \"type\": \"upsert_asset\"}"}}
list_raw_messages(topic_name, since, count, options \\ [])
@spec list_raw_messages(
        topic_name :: String.t(),
        since :: AltworxBook.Altworx.timestamp(),
        count :: integer(),
        options :: [:json | :just_value]
      ) ::
        [
          %{
            key: String.t(),
            offset: integer(),
            timestamp: AltworxBook.Altworx.timestamp(),
            value: binary() | any()
          }
        ]
        | [binary() | any()]
Return a list of messages from a Kafka raw topic.
Args
- topic_name - name of the Kafka topic
- since - timestamp in ms from which messages will be returned
- count - number of messages to return
- options (optional):
  - :json - transform the encoded JSON value to an Elixir term
  - :just_value - return only the value, without the metadata envelope
Examples
iex> AltworxBook.list_raw_messages("reality_network_updates", 0, 1)
[%{key: "", offset: 0, timestamp: 1638430188202, value: "{\"timestamp\": 1638430188202, \"type\": \"upsert_asset\"}"}]
iex> AltworxBook.list_raw_messages("reality_network_updates", 0, 1, [:just_value])
["{\"timestamp\": 1638430188202, \"type\": \"upsert_asset\"}"]
iex> AltworxBook.list_raw_messages("reality_network_updates", 0, 1, [:json])
[%{key: "", offset: 0, timestamp: 1638430188202, value: %{timestamp: 1638430188202, type: "upsert_asset"}}]
list_raw_topics()
@spec list_raw_topics() :: [String.t()]
List all raw topics Altworx has access to.
Raw topics contain raw messages received by Altworx Acceptors, which the Normalizer pipeline processes and transforms into Runtime messages.
Examples
iex> AltworxBook.list_raw_topics()
["reality_network_updates"]
list_topics()
@spec list_topics() :: [String.t()]
List all topics Altworx has access to.
Examples
iex> AltworxBook.list_topics()
["N6_rt_reality_network_updates", "reality_network_updates"]
run_pipeline(messages, steps)
@spec run_pipeline([AltworxBook.Kafka.RawTopic.message()], [ AltworxBook.Normalizer.Pipeline.pipeline_step() ]) :: [AltworxBook.Normalizer.Pipeline.pipeline_step_result()]
Execute a pipeline definition on a list of messages.
Args
- messages - list of raw topic messages
- steps - list of pipeline steps passed as function references
Examples
iex> messages = [%{timestamp: 1694680405146, value: "{\"timestamp\": 12345}"}]
...> pipeline = [&Normalizer.Basic.decode/1, &Normalizer.Basic.set_random_type/1]
...> AltworxBook.run_pipeline(messages, pipeline)
[{:new, %Toolbox.Message{type: "a", timestamp: 1694680405146, body: %{"timestamp" => 12345}}}]
run_pipeline_step(message, step_fn)
@spec run_pipeline_step( AltworxBook.Kafka.RawTopic.message() | {:error, term()} | :ignored | Toolbox.Message.t(), AltworxBook.Normalizer.Pipeline.pipeline_step() ) :: {:error, term()} | :ignored | Toolbox.Message.t()
Evaluate a pipeline step on a given message.
Args
- message - raw topic message, or the result of a previous pipeline step
- step_fn - pipeline step passed as a function reference
Examples
iex> message = %{timestamp: 1694680405146, value: "{\"timestamp\": 12345}"}
...> AltworxBook.run_pipeline_step(message, &Normalizer.Basic.decode/1)
%Toolbox.Message{type: nil, timestamp: 1694680405146, body: %{"timestamp" => 12345}}
validate_pipeline_step_output_message(message)
@spec validate_pipeline_step_output_message(Toolbox.Message.t() | term()) :: :ok | {:error, term()}
Validate a pipeline output message produced by evaluating pipeline steps one by one.
Validation is built into running the whole pipeline via AltworxBook.run_pipeline/2. When steps are evaluated one by one, call this function to check that all required fields have valid values.
Args
- message - pipeline output message
Examples
iex> message = %{timestamp: 1694680405146, value: "{\"timestamp\": 12345}"}
...> output_message = AltworxBook.run_pipeline_step(message, &Normalizer.Basic.decode/1)
...> AltworxBook.validate_pipeline_step_output_message(output_message)
:ok
visualize_pipeline_results(results)
Utility function to display the results of AltworxBook.run_pipeline/2.
Call this function from your notebook on the output of AltworxBook.run_pipeline/2 and a Kino presentation will be displayed.
Examples
iex> messages = [%{timestamp: 1694680405146, value: "{\"timestamp\": 12345}"}]
...> pipeline = [&Normalizer.Basic.decode/1, &Normalizer.Basic.set_random_type/1]
...> results = AltworxBook.run_pipeline(messages, pipeline)
...> AltworxBook.visualize_pipeline_results(results)