Module kflow_gen

kflow_gen is a meta-behavior that all parts of the pipe (called kfnodes from this point on), such as aggregators and demultiplexers, are built upon.

This module defines the kflow_gen behaviour.
Required callback functions: init/2, handle_message/3, handle_flush/2, terminate/2.


kflow_gen is a meta-behavior that all parts of the pipe (called kfnodes from this point on), such as aggregators and demultiplexers, are built upon. This module implements the following common scenarios:

Note: this behavior is considered internal, and normally it shouldn't be used directly

Some notes on terminology: modules that implement kflow_gen behavior from now on will be referred as "intermediate callback modules" or "intermediate CBMs". They typically define a behavior on their own; modules that implement such behavior will be called "user callback modules".

State transition diagram

Message sequence diagrams

Handle message from the upstream

Normal workflow as seen from the perspective of a kfnode:

  1. Processing of ?feed() messages is postponed until the state machine is in ready state. As soon as it enters this state, it consumes one ?feed() message from the queue. Business logic processing happens asynchronously: a temporary worker process is spawned, it executes the behavior callback handle_message/3 and the FSM enters working state in the meanwhile
  2. As soon as the callback returns a success, the worker process replies to the parent process with ?done({ok, Messages, NewCbState}) message and terminates, where Messages is a (possibly empty) list of messages that should be sent downstream and NewCbState is the updated behavior state
  3. All messages returned by the callback should be sent downstream. The FSM may enter either ready or blocked state while the downstream is processing the messages, depending on the number of messages and backpressure settings
  4. As long as the number of messages queued up to the downstream is less than max_queue_len the kfnode can process messages from the upstream in parallel.

Callback module failure

Handle upstream failure

Handle downstream failure

When the downstream dies, the kfnode receives ?downstream_failure message. From this point it can't do all that much, so it just kills its worker process (if present) and terminates.

Intermediate CBM design guidelines

Intermediate CBMs must follow certain rules.

First of all, intermediate CBMs are fully responsible for offset tracking. They should not advance offset of output messages too far to avoid losing data when the pipe restarts.

hidden flag of #kflow_msg record indicates that the message should never be passed to the user CBM: from user perspective such messages simply don't exist. However, the intermediate CBM must ensure that offsets of hidden messages are properly accounted for and committed. For example, when a pipe restarts, fully processed hidden messages should not be replayed. Intermediate CBMs can set this flag to true.

route field of #kflow_msg should not be changed, unless intermediate CBM implements some kind of pipe splitting or joining operation. In which case only the head of Route list may be changed (added or removed), the tail must be preserved. Same goes for exposing route to the user CBM: probably it's a good idea to expose only the head of the route in order to improve composability.


One of design choices behind kflow was making sure that data flows strictly in one direction: from upstream to downstream. This makes reasoning about kflow pipes easier and eliminates many types of concurrency bugs. However it also makes stream splitting and joining somewhat tricky to implement. The biggest problem is offset tracking: kfnode must guarantee that it won't advance offset of messages that it sends downstream beyond safe value. And the upstream may buffer up some messages for unknown period of time.

By default kflow framework solves this problem using the following trick. Each kfnode contains multiple states of user CBM, one state per route of upstream message. It's up to intermediate CBM to keep track of per-route states and multiplex messages between them.

Remember that route field of #kflow_msg{} is a list. Stream splitting is done simply by adding a new element to the route. Conversely, stream joining is done by removing a head of the list. Apart from that, route field has no meaning. It is only used to look up user CBM state.

Benefits of this solution:

  1. Data flows in one direction
  2. Messages are always processed in the same order, so restarting the pipe is more likely to produce the same result; good for idempotency
  3. Easier to debug. No intermediate pipes are spawned, and not much message passing goes on in general.

Downsides of this solution:

  1. All routes of the pipe are bound to the same topology. Solution: filter data that should be processed differently to separate Kafka topics and consume it from there.
  2. All routes of the pipe are processed sequentially. Solution: spread data across more Kafka partitions.

Data Types


callback_return(State) = {ret_type(), [kflow:message()], State}


ret_type() = ok | exit

Function Details


feed(Pid::pid(), MRef::reference(), Msg::kflow:message(), Timeout::timeout()) -> ok | {error, term()}

Send a message to the kfnode and block the caller until the message is processed by this node (but the subsequent processing is done asynchronously)

Note: second argument of the function is a monitor reference of the corresponding kflow_pipe process, rather than kflow_gen process! This is done to let internal fault handling logic run before crashing the caller.


flush(Server::pid()) -> ok

Command the node and its downstream to immediately flush all the buffered data (async call)


get_status(Pid::pid()) -> term()

Get various debug information about the node


notify_upstream_failure(Pid::pid() | undefined) -> ok

Nicely ask node to stop


post_init(Pid::pid(), Neighbors::{_Upstream::pid(), _Downstream::pid()}) -> ok

Tell the node pids of its neighbors


start_link(Init_data::#init_data{}) -> {ok, pid()}

Start a kfnode process

