Kflow is a dataflow DSL similar to Kafka Streams. It was designed with Kafka in mind, but in theory it can handle other sources of data as well. Kflow implements stream processing workflows based on an abstraction called pipe.
A pipe is a list of transforms that will be applied to the input messages. Kflow pipes can be seen as a direct equivalent of UNIX pipes: data is passed between the nodes of a pipe in a FIFO manner. The difference is that the data is structured as Erlang terms. Data flows strictly in one direction, and Kflow messages are supposed to have an increasing integer offset. Offsets are assigned to the messages by their source (which can be Kafka, a named process, and so on). Kflow guarantees that a message reaches a node if, and only if, it was successfully processed by that node's immediate upstream neighbor at least once.
Typically, the offset of a message that was successfully processed by the very last node of the pipe is considered fully processed, or safe. User-defined callbacks must be idempotent, because kflow is based on the assumption that restarting the pipe at any time is safe. User-defined code must crash on error as soon as possible, so that kflow can restart the pipe from the last fully processed offset and retry processing of the failing data. Although the user may opt to change this behavior, the rest of this document disregards that possibility for simplicity.
The main benefit of this approach is ease of recovery: as long as user-defined code fails soundly instead of silently corrupting data, it is always safe to take the time to fix the bug in the code and simply restart the pipes. They will consume messages starting from the last fully processed offset and overwrite partially processed or corrupt data with correct data.
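As an illustration, a user callback that writes every message to an external store can be made idempotent by keying the write on the message offset, so that reprocessing after a restart overwrites the previous (possibly partial) result rather than duplicating it. The sketch below uses the anonymous-function node form described later in this document; my_store and its upsert/2 function are purely illustrative and not part of kflow:

{map, fun(Offset, Msg) ->
        %% Idempotent: the offset is the key, so a retry of the same
        %% message overwrites the earlier write (my_store is hypothetical)
        ok = my_store:upsert(Offset, Msg),
        Msg
      end}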
The combination of a pipe and a message source is called a workflow.
The Kflow application launches and supervises workflows in its own supervision tree. The user declares a list of workflows in an Erlang function named kflow_config:pipes/0. It should return a list with elements of type kflow:workflow/0.
Each node of a pipe is implemented using one of the stream processing behaviors. Kflow ships with a few standard behaviors:

kflow_gen_map applies a function to every message: "one in, one out"
kflow_gen_filter runs a predicate for each message, potentially removing messages from the stream: "one in, one or zero out"
kflow_gen_aggregate is used to accumulate messages: "multiple in, one out"
kflow_gen_assemble_chunks is a special case of kflow_gen_aggregate that is used to assemble data transferred in chunks: "multiple in, one out"
kflow_gen_unfold is the opposite of aggregate: "one in, multiple out"
kflow_gen_demux splits messages into multiple substreams, which are then processed independently
kflow_gen_mfd is a combination of map, filter and demux
kflow_join joins substreams
kflow_route_dependent allows running different logic in different substreams

It is possible to implement custom stream processing nodes; see the documentation for the kflow_gen behavior.
NOTE: the init callback of any behavior, where resource allocation takes place, is called in the context of a long-living process. So calling start_link-like functions from the init callback should behave normally. Message processing callbacks, on the other hand, are called from a short-living worker process, so one should avoid linking to any processes in these callbacks.
A pipe is defined as a list of elements called nodes. The full form of a node definition is a tuple that looks like this (the meaning of each field is described below):
{Behavior :: module(), NodeConfig :: kflow:node_config(), CbModule :: module(), CbConfig :: term()}
The NodeConfig field is optional. If the defaults are acceptable, the node definition can be shortened like this:
{Behavior :: module(), CbModule :: module(), CbConfig :: term()}
Moreover, some stateless stream behaviors may take an anonymous function as an argument. In this case the node specification can be shortened even further:
{Behavior :: module(), fun()}
Please refer to the documentation of each behavior to see if it supports this form.
Finally, nodes that join substreams via the kflow_join behavior can be specified simply as the atom join.
The Behavior field must refer to one of the stream processing behaviors. It can be a full module name, or a shortcut. The following shortcuts are currently supported:

map: kflow_gen_map
filter: kflow_gen_filter
aggregate: kflow_gen_aggregate
unfold: kflow_gen_unfold
demux: kflow_gen_demux
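For example, assuming a hypothetical callback module my_filter_module, the following two node definitions are equivalent:

{filter, my_filter_module, Settings}
{kflow_gen_filter, my_filter_module, Settings}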
The NodeConfig field allows tuning timeouts and backpressure for the kflow nodes. The following parameters are supported:

max_queue_len tunes backpressure. It specifies the maximum length of the queue of messages scheduled for sending downstream; when this limit is reached, the node stops processing messages from the upstream.
hard_timeout specifies a deadline for message processing in the downstream node. TODO: currently not implemented.
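For example, assuming the node configuration is given as a map of these parameters, a map node that limits its queue of pending downstream messages to 100 could be written using the full four-element form (my_map_module and the limit value are illustrative):

{map, #{max_queue_len => 100}, my_map_module, CbConfig}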
The CbModule field should point at an application-level callback module implementing one of the stream processing behaviors. See Application for more information about such modules.
The CbConfig field is an arbitrary term that will be passed to the CbModule callbacks.
Example of a pipe specification:

[ {filter, my_filter_module, Settings}
, {map, fun(_Offset, Msg) -> Msg + 1 end}
, {map, fun(_Offset, Msg) -> erlang:display(Msg), Msg end}
]
Unlike the rest of the configuration, which is stored in the application environment, kflow obtains workflow specifications at runtime, by calling a special function named kflow_config:pipes/0. (This is done to enable usage of the full Erlang language in the workflow configuration.)
The kflow_config module can either be baked into the release or loaded dynamically at runtime. By default the latter behavior is used: kflow attempts to find the "kflow_config.erl" file in the "/etc/kflow.d/" directory. The location of the workflow configuration file is controlled by the config_module_dir environment variable. It can be set to the atom undefined if the dynamic config loading feature is not used.
kflow_config:pipes() should return a list of terms of type kflow:workflow(). This is a map with two mandatory keys: start, which specifies the source of the data, and args, which is a map of arguments that will be passed to the start function. There is one mandatory key in the args map: id. It should be a unique atom identifying the workflow.
Example:
-module(kflow_config).

-export([pipes/0]).

pipes() ->
  [foo()].

foo() ->
  PipeSpec = [ {map, fun(Offset, Msg) ->
                       logger:notice("Received a message: ~p, ~p", [Offset, Msg]),
                       Msg
                     end}
             ],
  #{ start => {kflow_kafka_consumer, start_link}
   , args  => #{ group_id  => <<"foo_group">>
               , topics    => [<<"foobar">>]
               , id        => ?FUNCTION_NAME
               , pipe_spec => PipeSpec
               }
   }.
NOTE: the MFA of the workflow configuration function can be customized by setting the pipes environment variable. As should be clear from the chapter above, its default value is {kflow_config, pipes, []}.
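For example, to read workflow definitions from a hypothetical module my_pipes instead of kflow_config, one could set:

{kflow, [ {pipes, {my_pipes, workflows, []}}
        ]}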
Kflow can start multiple brod clients; one client for each key of the kafka_clients application environment variable. The configuration for each client is merged from global and client-specific settings. Consider an example sys.config:
{kflow, [ {kafka_clients, #{ %% This client relies on the global settings:
                             kflow_default_client => #{}
                             %% This client uses different credentials:
                           , client2 => #{kafka_sasl_file => "/etc/kflow.d/kafka2.sasl"}
                           }}
          %% Global settings:
        , {kafka_sasl_file, "/etc/kflow.d/kafka1.sasl"}
        , {kafka_endpoints, [{"localhost", 9094}]}
        ]}
If kflow is not intended to be used with Kafka, the kafka_clients environment variable can be set to #{}; in that case no brod clients will be started.
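That is, a deployment that does not use Kafka at all can simply set:

{kflow, [ {kafka_clients, #{}}
        ]}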
kafka_endpoints contains the addresses and ports of the Kafka bootstrap endpoints.
The kafka_ssl and kafka_sasl sys.config flags should be fairly self-explanatory; they should be set according to the Kafka broker settings. Both flags are true by default.
kafka_sasl_file controls the location of the file containing SASL credentials. Its default value is "/etc/kflow.d/kafka.sasl".
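Putting these settings together, a plaintext development setup without SSL or SASL might be configured as follows (the endpoint is illustrative):

{kflow, [ {kafka_endpoints, [{"localhost", 9092}]}
        , {kafka_ssl, false}
        , {kafka_sasl, false}
        ]}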
The code of kflow is organized into several sub-directories.
The Kflow framework is a collection of modules used to implement the inner workings of kflow pipes, as well as the intermediate behaviors.
A short guide to the framework modules (excluding the behavior modules described above):
kflow_gen implements the pipe node state machine
kflow_pipe implements the process that ties multiple stream processing nodes together
kflow_lib contains various utility functions
kflow_multistate is a helper module used to implement substreams
kflow_sup is the root supervisor of the kflow application
kflow_pipe_sup is the supervisor for the user-defined workflows
kflow_kafka_consumer is a source type that reads messages from Apache Kafka; partitions are assigned via the consumer group protocol
kflow_kafka_commit commits fully processed offsets to Kafka

The application sub-directory contains reusable user-level behaviors and integrations towards common services (such as databases):
kflow_produce_to_kafka: produce message sets to Kafka
kflow_group_kafka_messages: helps with buffering Kafka messages
kflow_chunks_to_file: assemble objects transferred in chunks and store their contents in files
kflow_chunks_to_s3: assemble objects transferred in chunks and store their contents in S3
kflow_postgres: store messages in a Postgres table

Another sub-directory contains "turnkey" ready-to-use workflows built from the components stored in application:
kflow_kafka_retransmit: consume messages from one topic and retransmit them to another, potentially scattering the messages over multiple partitions
kflow_wf_chunks_to_local_file: store the contents of Kafka messages in files
kflow_wf_chunks_to_s3: store the contents of Kafka messages as S3 objects

A separate sub-directory contains modules and headers that aid testing of workflows and pipes.
The intended use case for Kflow is to be a stateless middleman service that collects data from one network service and puts it into another network service. Therefore, this library tries to be as asynchronous as possible in order to minimize the impact of network latency on the throughput. Being optimized for the network comes at a cost, though: most of the interaction within kflow happens via message passing, and therefore it requires a lot of memory copying. In order to minimize this overhead, it is not advisable to use kflow_gen_map for the composition of pure functions; it is better to fuse them into one callback invocation.
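For example, instead of chaining two map nodes that each apply a pure function (which forces an extra message-passing hop between node processes), the functions can be composed in a single node; decode/1 and enrich/1 below are illustrative:

%% Avoid: two nodes, two callback invocations per message
[ {map, fun(_Offset, Msg) -> decode(Msg) end}
, {map, fun(_Offset, Msg) -> enrich(Msg) end}
]

%% Prefer: pure functions fused into one node
[ {map, fun(_Offset, Msg) -> enrich(decode(Msg)) end}
]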
Each workflow has a dedicated log file. By default these logs are created under "/var/log/kflow/". The standard OTP logger application is used.
The log verbosity level of pipes is configured using the pipe_log_level application environment variable. Possible values are:

emergency | alert | critical | error | warning | notice | info | debug

The default global value is info. Verbosity can also be configured per workflow, using the pipe_log_levels environment variable. It can be set to a map where keys are workflow ids and values are verbosity levels:
{kflow, [ {pipe_log_level, info}
        , {pipe_log_levels, #{ particularly_noisy_one => error
                             , suspicious_one => debug
                             }}
        ]}
The log formatter parameter should be a tuple of type {module(), logger:formatter_config()}, for example using logger_formatter. Refer to http://erlang.org/doc/man/logger_formatter.html for the details.