The kflow application

Kflow is a dataflow DSL similar to Kafka Streams. It was designed with Kafka in mind, but in theory it can handle other sources of data as well. Kflow implements stream processing workflows based on an abstraction called pipe.

Pipe is a list of transforms that will be applied to the input messages. Kflow pipes can be seen as a direct equivalent of UNIX pipes: data is passed between nodes of a pipe in FIFO manner. The difference is that the data is structured as Erlang terms. Data flows strictly in one direction, and Kflow messages are supposed to have an increasing integer offset. Offsets are assigned to the messages by their source (which can be Kafka, named process and what not). Kflow guarantees that the message reaches the node iff it was successfully processed by its immediate upstream neighbor at least once.

Typically offset of the message that was successfully processed by the very last kfnode is considered fully processed or safe. User-defined callbacks must be idempotent, because kflow is based on assumption that restarting the pipe at any time is safe. User-defined code must crash on error as soon as possible, so kflow can restart the pipe from the last fully processed offset and retry processing of the failing data. Although user may opt to change this behavior, in the rest of the document we disregard this possibility for simplicity.

Main benefit of this approach is ease of recovery: as long as user-defined code fails soundly instead of silently corrupting data, it's always safe to take time to fix the bug in the code and just restart the pipes. They will consume messages since the last fully processed offset and overwrite partially processed or corrupt data with correct one.

Combination of a pipe and a message source is called workflow. Kflow application launches and supervises workflows in its own supervision tree. User declares a list of workflows in an Erlang function named kflow_config:pipes/0. It should return a list with elements of type kflow:workflow/0.

Stream processing behaviors

Each node of a pipe is implemented using one of stream processing behaviors. Kflow ships with a few standard behaviors:

It is possible to implement custom stream processing nodes; see documentation for kflow_gen behavior.

NOTE: init callback of any behavior, where resource allocation takes place, is called in the context of a long-living process. So calling start_link-like functions from init callback should behave normally. Message processing callbacks, on the other hand, are called from a short-living worker process, so one should avoid linking to any processes in these callbacks.

Pipe specifications

Pipe is defined as a list of elements called nodes. Full form of node definition is a tuple that looks like this (meaning of each field will be described below):

{Behavior :: module(), NodeConfig :: kflow:node_config(), CbModule :: module(), CbConfig :: term()}

NodeConfig field is optional. If the defaults are ok, node definition can be shortened like this:

{Behavior :: module(), CbModule :: module(), CbConfig :: term()}

Moreover, some stateless stream behaviors may take an anonymous function as an argument. In this case node specification can be shortened even further:

{Behavior :: module(), fun()}

Please refer to the documentation of each behavior to see if it supports this form.

Finally, nodes that join sub-streams via kflow_join behavior can be specified just like this: join.

Behavior

Behavior field must refer one of Stream processing behaviors. It can be a full module name, or a shortcut. The following shortcuts are currently supported:

Node Config

NodeConfig field allows to tune timeouts and backpressure for the kflow nodes. The following parameters are supported:

max_queue_len allows to tune backpressure. It specifies maximal length of queue of the messages scheduled for sending to downstream, when this node stops processing messages from the upstream.

hard_timeout specifies deadline for the downstream node message processing. TODO: Currently not implemented.

CbModule

CbModule should point at an application-level callback module implementing one of the stream processing behaviors. See Application for more information about such modules.

CbConfig

CbConfig is just an arbitrary term that will be passed to CbModule callbacks.

Examples

[{filter, my_filter_module, Settings},
 {map, fun(_Offset, Msg) -> Msg + 1 end},
 {map, fun(_Offset, Msg) -> erlang:display(Msg), Msg end}
]

Workflow specifications

Unlike the rest of the configuration that is stored in the application environment, kflow obtains workflow specifications in the runtime, by calling a special function called kflow_config:pipes/0. (This is done to enable usage of full Erlang language in the workflow configuration.)

kflow_config module can be either baked into the release, or loaded dynamically in the runtime. By default the latter behavior is used: kflow attempts to find "kflow_config.erl" file in "/etc/kflow.d/" directory. Location of the workflow configuration file is controlled by config_module_dir environment variable. It can be set to atom undefined if dynamic config loading feature is not used.

kflow_config:pipes() should return a list of terms of type kflow:workflow(). This is a map with two mandatory keys: start, that specifies source of the data, and args that is a map of arguments that will be passed to the start function. There is one mandatory key in the args map: id. It should be a unique atom identifying the workflow.

Example:

-module(kflow_config).

-export([pipes/0]).

pipes() ->
  [foo()].

foo() ->
  PipeSpec =
    [{map, fun(Offset, Msg) ->
               logger:notice("Received a message: ~p, ~p", [Offset, Msg]),
               Msg
           end}
    ],
  #{ start => {kflow_kafka_consumer, start_link}
   , args  => #{ group_id  => <<"foo_group">>
               , topics    => [<<"foobar">>]
               , id        => ?FUNCTION_NAME
               , pipe_spec => PipeSpec
               }
   }.

NOTE: MFA of the workflow configuration function can be customized by setting pipes environment variable. As it should be clear from the above chapter, its default value is {kflow_config, pipes, []}.

Kafka Configuration

Kflow can start multiple brod clients; one client for each key of kafka_clients application environment variable. Configuration for each client is merged from global and client-specific settings. Consider an example sys.config:

{kflow,
 [ {kafka_clients, #{ %% This client relies on the global settings:
                      kflow_default_client => #{}
                      %% This client uses different credentials:
                    , client2 => #{kafka_sasl_file => "/etc/kflow.d/kafka2.sasl"}
                    }
   %% Global settings:
 , {kafka_sasl_file, "/etc/kflow.d/kafka1.sasl"}
 , {kafka_endpoints, [{"localhost", 9094}]}
 ]}

If kflow is not intended to be used with Kafka, kafka_clients environment variable can be set to #{}, then no brod clients will be started.

kafka_endpoints contains addresses and ports of Kafka bootstrap endpoints.

kafka_ssl and kafka_sasl sys.config flags should be pretty self-explanatory; they should be set according to the Kafka broker settings. Both flags are true by default.

kafka_sasl_file controls location of the file containing SASL credentials. Its default value is "/etc/kflow.d/kafka.sasl".

Navigating the code

Code of kflow is organized in 3 sub-directories:

  1. src/framework
  2. src/application
  3. src/workflows
  4. src/testbed

Framework

Kflow framework is a collection of modules used to implement inner workings of kflow pipes, and also intermediate behaviors.

Short guide to the framework modules (excluding behavior modules that were described above):

Application

This sub-directory contains reusable user-level behaviors and integrations towards common services (such as databases):

Workflows

This sub-directory contains "turnkey" ready-to-use workflows built from the components stored in application:

Testbed

This sub-directory contains modules and headers that aid testing of workflows and pipes.

Some remarks about performance

Intended usecase for Kflow is being a stateless middleman service collecting data from one network service, and putting it to another network service. Therefore this library tries to be as asynchronous as possible in order to minimize impact of the network latency on the throughput. Being optimized for the network comes at a cost, though. Most of the interaction within kflow happens via message passing, and therefore it requires a lot of memory copying. In order to minimize this overhead, it is not advised to use kflow_gen_map for composition of pure functions. Better fuse them in one callback invocation.

Logging

Each workflow has a dedicated log file. By default these logs are created under "/var/log/kflow/". Standard OTP logger app is used.

Log levels

Log verbosity level of pipes is configured using pipe_log_level application environment. Possible values include:

   emergency | alert | critical | error | warning | notice | info | debug

Default global value info. It can be also configured per workflow, using pipe_log_levels environment variable. It can be set to a map where keys are workflow ids and values are verbosity levels:

{kflow,
 [ {pipe_log_level, info}
 , {pipe_log_levels, #{ particularly_noisy_one => error
                      , suspicious_one => debug
                      }}
 ]}

pipe_log_formatter

This parameter should be a tuple of type {module(), logger:formatter_config()}.

The module most likely should be set to logger_formatter. Refer to http://erlang.org/doc/man/logger_formatter.html for the details.

Generated by EDoc