AltworxBook (altworx_book v0.2.0)

AltworxBook provides functions for interacting with Altworx from LiveBook notebooks.

There is support for:

  • Normalizer pipeline development and debugging

Basic usage

Currently, the notebook must run on the same machine as Altworx. To connect to a running Altworx instance, call AltworxBook.connect_to_altworx/0. The library infers the Altworx node name from the environment variable ALTWORX_BOOK_ENV, whose default value is prod. If LiveBook is started in a local development environment using the host Docker network, set ALTWORX_BOOK_ENV to dev.
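For the dev case, a minimal notebook cell might look like the sketch below. It only uses System.put_env/2 and System.get_env/2 from the Elixir standard library. Setting the variable before calling AltworxBook.connect_to_altworx/0 is an assumption about when the library reads it, and the connect call is left commented out because it needs a running Altworx node.

```elixir
# Select the dev environment before connecting.
# Assumption: AltworxBook reads ALTWORX_BOOK_ENV when connect_to_altworx/0 runs.
System.put_env("ALTWORX_BOOK_ENV", "dev")

# Mirror the documented default: when the variable is unset, prod is used.
env = System.get_env("ALTWORX_BOOK_ENV", "prod")
IO.puts("AltworxBook environment: #{env}")

# With a running Altworx node on this machine:
# :ok = AltworxBook.connect_to_altworx()
```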

Once connected, you can call other general-purpose functions, such as listing all topics with AltworxBook.list_topics/0 or only raw topics with AltworxBook.list_raw_topics/0. Raw messages can be read with AltworxBook.list_raw_messages/4, and a specific message can be fetched with AltworxBook.get_raw_message/2.

Normalizer pipeline development and debugging

The whole pipeline can be executed in a notebook by calling AltworxBook.run_pipeline/2 on a list of raw messages. Messages can be defined manually or retrieved from an Altworx instance. The pipeline definition is similar to the configuration in config.ini; the only difference is that normalization functions are passed as function references, e.g. Normalizer.Basic.decode becomes &Normalizer.Basic.decode/1.
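To illustrate the mapping from a config.ini entry to a function reference, the sketch below uses a hypothetical module, MyNormalizer, that is not part of Altworx. The only point it makes is that a pipeline step is a one-argument function captured with the &Module.function/1 syntax.

```elixir
# Hypothetical normalization module, used only to illustrate the
# Module.function -> &Module.function/1 mapping; it is not part of Altworx.
defmodule MyNormalizer do
  # A step takes one message and returns a transformed message.
  def set_type(message), do: Map.put(message, :type, "example")
end

# In config.ini the step would be written as MyNormalizer.set_type;
# in a notebook it becomes a function capture of arity 1.
pipeline = [&MyNormalizer.set_type/1]

step = hd(pipeline)
IO.inspect(step.(%{timestamp: 1694680405146, body: %{}}))
```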

Normalization results can be displayed as VegaLite graphs in a Kino layout by calling AltworxBook.visualize_pipeline_results/1.

You can also debug a pipeline one function at a time with AltworxBook.run_pipeline_step/2, which executes a single normalization function. This is useful in combination with printing intermediate results using IO.inspect/2 or dbg/0.
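The following sketch shows the IO.inspect/2 pattern on two hypothetical steps written as anonymous functions, so it runs without Altworx. In a notebook you would apply real steps instead, for example with AltworxBook.run_pipeline_step(message, &Normalizer.Basic.decode/1).

```elixir
# Two hypothetical steps standing in for real normalization functions.
trim_value = fn message -> Map.update!(message, :value, &String.trim/1) end
set_type = fn message -> Map.put(message, :type, "example") end

message = %{timestamp: 1694680405146, value: "  payload  "}

result =
  message
  |> trim_value.()
  # IO.inspect/2 prints the intermediate value with a label and returns it unchanged.
  |> IO.inspect(label: "after trim_value")
  |> set_type.()
  |> IO.inspect(label: "after set_type")
```

Because IO.inspect/2 returns its argument, the probes can be added or removed without changing the result of the pipe.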

Summary

Functions

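connect_to_altworx()
Connect to a running Altworx node.

get_raw_message(topic_name, offset)
Return a message from a raw topic by its offset.

list_raw_messages(topic_name, since, count, options \\ [])
Return a list of messages from a Kafka raw topic.

list_raw_topics()
List all raw topics Altworx has access to.

list_topics()
List all topics Altworx has access to.

run_pipeline(messages, steps)
Execute a pipeline definition on a list of messages.

run_pipeline_step(message, step_fn)
Evaluate a pipeline step on a given message.

validate_pipeline_step_output_message(message)
Validate a pipeline output message produced by evaluating pipeline steps one by one.

visualize_pipeline_results(results)
Display the results of AltworxBook.run_pipeline/2.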

Functions

connect_to_altworx()

@spec connect_to_altworx() :: :ok | {:error, term()}

Connect to a running Altworx node.

The Altworx node is expected to run on the same machine as the caller process.

get_raw_message(topic_name, offset)

@spec get_raw_message(topic_name :: String.t(), offset :: integer()) ::
  {:ok, %{timestamp: AltworxBook.Altworx.timestamp(), value: String.t()}}
  | {:error, term()}

Return a message from a raw topic by its offset.

Examples

iex> AltworxBook.get_raw_message("reality_network_updates", 0)
{:ok, %{timestamp: 1638430188202, value: "{\"timestamp\": 1638430188202, \"type\": \"upsert_asset\"}"}}

list_raw_messages(topic_name, since, count, options \\ [])

@spec list_raw_messages(
  topic_name :: String.t(),
  since :: AltworxBook.Altworx.timestamp(),
  count :: integer(),
  options :: [:json | :just_value]
) ::
  [
    %{
      key: String.t(),
      offset: integer(),
      timestamp: AltworxBook.Altworx.timestamp(),
      value: binary() | any()
    }
  ]
  | [binary() | any()]

Return a list of messages from a Kafka raw topic.

Args

  • topic_name - name of the Kafka topic
  • since - timestamp in milliseconds from which messages are returned
  • count - number of messages to return
  • options (optional)
    • :json - decode the JSON-encoded value into an Elixir term
    • :just_value - return only the value, without the metadata envelope
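Since since is a Unix timestamp in milliseconds, it can be computed with the standard DateTime and System modules. The sketch below converts a fixed ISO 8601 instant (the one matching the example timestamp 1638430188202), computes a relative value one hour in the past, and converts a message timestamp back for display. Only standard-library calls are used; no AltworxBook function is assumed.

```elixir
# Fixed point in time: convert an ISO 8601 string to Unix milliseconds.
{:ok, datetime, 0} = DateTime.from_iso8601("2021-12-02T07:29:48.202Z")
since = DateTime.to_unix(datetime, :millisecond)
# since is 1638430188202

# Relative point in time: one hour before now, in milliseconds.
one_hour_ago = System.os_time(:millisecond) - 60 * 60 * 1000

# Convert a message timestamp back to a DateTime for display.
IO.inspect(DateTime.from_unix!(1638430188202, :millisecond))
```

Either value can then be passed as the since argument, e.g. AltworxBook.list_raw_messages("reality_network_updates", one_hour_ago, 10).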

Examples

iex> AltworxBook.list_raw_messages("reality_network_updates", 0, 1)
[%{key: "", offset: 0, timestamp: 1638430188202, value: "{\"timestamp\": 1638430188202, \"type\": \"upsert_asset\"}"}]

iex> AltworxBook.list_raw_messages("reality_network_updates", 0, 1, [:just_value])
["{\"timestamp\": 1638430188202, \"type\": \"upsert_asset\"}"]

iex> AltworxBook.list_raw_messages("reality_network_updates", 0, 1, [:json])
[%{key: "", offset: 0, timestamp: 1638430188202, value: %{timestamp: 1638430188202, type: "upsert_asset"}}]

list_raw_topics()

@spec list_raw_topics() :: [String.t()]

List all raw topics Altworx has access to.

Raw topics contain raw messages received by Altworx Acceptors, which the Normalizer pipeline processes and transforms into Runtime messages.

Examples

iex> AltworxBook.list_raw_topics()
["reality_network_updates"]

list_topics()

@spec list_topics() :: [String.t()]

List all topics Altworx has access to.

Examples

iex> AltworxBook.list_topics()
["N6_rt_reality_network_updates", "reality_network_updates"]
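Combining this function with AltworxBook.list_raw_topics/0 gives the topics that are not raw topics. The sketch below uses the two lists from the examples as literals so it runs without Altworx; treating the remaining topics as Runtime topics is an assumption based on the naming in these examples only.

```elixir
# Topic lists copied from the examples; in a notebook they would come
# from AltworxBook.list_topics/0 and AltworxBook.list_raw_topics/0.
all_topics = ["N6_rt_reality_network_updates", "reality_network_updates"]
raw_topics = ["reality_network_updates"]

# List subtraction keeps the topics that are not raw topics.
non_raw_topics = all_topics -- raw_topics
IO.inspect(non_raw_topics)
```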

run_pipeline(messages, steps)

Execute a pipeline definition on a list of messages.

Args

  • messages - list of raw topic messages
  • steps - list of pipeline steps passed as function references

Examples

iex> messages = [%{timestamp: 1694680405146, value: "{\"timestamp\": 12345}"}]
...> pipeline = [&Normalizer.Basic.decode/1, &Normalizer.Basic.set_random_type/1]
...> AltworxBook.run_pipeline(messages, pipeline)
[{:new, %Toolbox.Message{type: "a", timestamp: 1694680405146, body: %{"timestamp" => 12345}}}]
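The result is a list of tagged tuples. To pull out the messages tagged :new, a comprehension with a pattern works well. In the sketch below, plain maps stand in for %Toolbox.Message{} so it runs without Altworx; :new is the only tag taken from the example, and any other tags are simply skipped by the pattern.

```elixir
# Stand-in for the output of AltworxBook.run_pipeline/2; plain maps replace
# %Toolbox.Message{} so the snippet runs without Altworx.
results = [
  {:new, %{type: "a", timestamp: 1694680405146, body: %{"timestamp" => 12345}}}
]

# Generators with a pattern silently skip elements that do not match,
# so only results tagged :new are kept.
new_messages = for {:new, message} <- results, do: message

IO.inspect(Enum.map(new_messages, & &1.type))
```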

run_pipeline_step(message, step_fn)

@spec run_pipeline_step(
  AltworxBook.Kafka.RawTopic.message()
  | {:error, term()}
  | :ignored
  | Toolbox.Message.t(),
  AltworxBook.Normalizer.Pipeline.pipeline_step()
) :: {:error, term()} | :ignored | Toolbox.Message.t()

Evaluate a pipeline step on a given message.

Args

  • message - raw topic message, or the output of a previous pipeline step
  • step_fn - pipeline step passed as a function reference
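The spec accepts {:error, term()} and :ignored as input, which makes it possible to feed the output of one step into the next. The sketch below models that chaining with a hypothetical helper, StepSketch, that is not AltworxBook's implementation; the assumption that errors and :ignored pass through unchanged is ours, based only on the input and return types in the spec.

```elixir
# Hypothetical helper modelled on the documented spec of
# AltworxBook.run_pipeline_step/2; not its actual implementation.
defmodule StepSketch do
  # Assumption: once a step fails or ignores a message, later steps are skipped.
  def run_step({:error, _reason} = error, _step_fn), do: error
  def run_step(:ignored, _step_fn), do: :ignored
  def run_step(message, step_fn), do: step_fn.(message)

  # Thread one message through a list of steps, one step at a time.
  def run_steps(message, steps), do: Enum.reduce(steps, message, &run_step(&2, &1))
end

# Hypothetical steps: drop messages with an empty value, then set a type.
ignore_empty = fn
  %{value: ""} -> :ignored
  message -> message
end

set_type = fn message -> Map.put(message, :type, "example") end

IO.inspect(StepSketch.run_steps(%{timestamp: 1, value: "x"}, [ignore_empty, set_type]))
IO.inspect(StepSketch.run_steps(%{timestamp: 2, value: ""}, [ignore_empty, set_type]))
```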

Examples

iex> message = %{timestamp: 1694680405146, value: "{\"timestamp\": 12345}"}
...> AltworxBook.run_pipeline_step(message, &Normalizer.Basic.decode/1)
%Toolbox.Message{type: nil, timestamp: 1694680405146, body: %{"timestamp" => 12345}}

validate_pipeline_step_output_message(message)

@spec validate_pipeline_step_output_message(Toolbox.Message.t() | term()) ::
  :ok | {:error, term()}

Validate a pipeline output message produced by evaluating pipeline steps one by one.

Validation is built into running the whole pipeline via AltworxBook.run_pipeline/2. When evaluating steps one by one with AltworxBook.run_pipeline_step/2, call this function to check that all required fields have valid values.

Args

  • message - pipeline output message

Examples

iex> message = %{timestamp: 1694680405146, value: "{\"timestamp\": 12345}"}
...> output_message = AltworxBook.run_pipeline_step(message, &Normalizer.Basic.decode/1)
...> AltworxBook.validate_pipeline_step_output_message(output_message)
:ok

visualize_pipeline_results(results)

Utility function to display results of AltworxBook.run_pipeline/2.

Call this function in your notebook on the output of AltworxBook.run_pipeline/2 to display a Kino presentation of the results.

Examples

iex> messages = [%{timestamp: 1694680405146, value: "{\"timestamp\": 12345}"}]
...> pipeline = [&Normalizer.Basic.decode/1, &Normalizer.Basic.set_random_type/1]
...> results = AltworxBook.run_pipeline(messages, pipeline)
...> AltworxBook.visualize_pipeline_results(results)