kflow_gen is a meta-behavior that all parts of the pipe (called kfnodes from this point on), such as aggregators and demultiplexers, are built upon.
Copyright © 2019 Klarna Bank AB (publ)
This module defines the kflow_gen behaviour.
Required callback functions: init/2, handle_message/3, handle_flush/2, terminate/2.
This module implements the following common scenarios:
initial state)

Note: this behavior is considered internal, and normally it shouldn't be used directly.
Some notes on terminology: from now on, modules that implement the kflow_gen behavior will be referred to as "intermediate callback modules" or "intermediate CBMs". They typically define a behavior of their own; modules that implement such a behavior will be called "user callback modules".
Normal workflow as seen from the perspective of a kfnode:

1. Processing of ?feed() messages is postponed until the state machine is in the ready state. As soon as it enters this state, it consumes one ?feed() message from the queue.
2. Business logic processing happens asynchronously: a temporary worker process is spawned and executes the behavior callback handle_message/3. Meanwhile, the FSM enters the working state.
3. The worker sends a ?done({ok, Messages, NewCbState}) message and terminates, where Messages is a (possibly empty) list of messages that should be sent downstream and NewCbState is the updated behavior state.
4. The FSM enters the ready or blocked state while the downstream is processing the messages, depending on the number of messages and the backpressure settings (max_queue_len). This way, the kfnode can process messages from the upstream in parallel.

When the downstream dies, the kfnode receives a ?downstream_failure message. At this point it can't do much, so it simply kills its worker process (if present) and terminates.
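The worker-process step of the workflow can be illustrated with a small standalone module. This is a hypothetical sketch, not kflow_gen's code: the module name worker_sketch, the {done, Ref, Result} message shape (standing in for kflow's ?done() macro) and the HandleFun argument are assumptions. It spawns a monitored temporary worker, runs a handle_message-like fun in it, and waits ("working" state) for either the result or the worker's crash.

```erlang
-module(worker_sketch).
-export([process/3]).

%% Hypothetical sketch: run HandleFun(Msg, CbState) in a temporary worker.
%% HandleFun is assumed to return {ok, Messages, NewCbState}.
process(HandleFun, Msg, CbState) ->
    Parent = self(),
    Ref = make_ref(),
    {Pid, MRef} =
        spawn_monitor(fun() ->
                              %% Worker: execute business logic, report, exit
                              Result = HandleFun(Msg, CbState),
                              Parent ! {done, Ref, Result}
                      end),
    %% "Working" state: wait for the result or for the worker to crash
    receive
        {done, Ref, Result} ->
            %% Drop the monitor and any 'DOWN' already queued for it
            erlang:demonitor(MRef, [flush]),
            Result;
        {'DOWN', MRef, process, Pid, Reason} ->
            {error, Reason}
    end.
```

Because the worker is monitored rather than linked, a crash in the business logic surfaces as an {error, Reason} value instead of taking the caller down.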
Intermediate CBMs must follow certain rules.
First of all, intermediate CBMs are fully responsible for offset tracking. They should not advance the offset of output messages too far; otherwise data may be lost when the pipe restarts.
The hidden flag of the #kflow_msg record indicates that the message should never be passed to the user CBM: from the user's perspective, such messages simply don't exist. However, the intermediate CBM must ensure that offsets of hidden messages are properly accounted for and committed. For example, when a pipe restarts, fully processed hidden messages should not be replayed. Intermediate CBMs can set this flag to true.
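The rule for hidden messages can be sketched as follows. This is illustrative only: the #msg{} record is a hypothetical stand-in for #kflow_msg{} (its fields are assumptions), and UserFun is a hypothetical user callback. Hidden messages bypass the user callback, but their offsets are still returned so the intermediate CBM can commit them.

```erlang
-module(hidden_sketch).
-export([make/3, deliver/3]).

%% Hypothetical stand-in for #kflow_msg{}; the real record may differ.
-record(msg, {offset, hidden = false, payload}).

%% Helper used only to build example messages.
make(Offset, Hidden, Payload) ->
    #msg{offset = Offset, hidden = Hidden, payload = Payload}.

%% UserFun (hypothetical): fun(Payload, UserState) -> NewUserState.
%% The offset is returned in both cases so it can be committed.
deliver(#msg{hidden = true, offset = Offset}, _UserFun, UserState) ->
    %% Invisible to the user CBM, but still accounted for
    {Offset, UserState};
deliver(#msg{hidden = false, offset = Offset, payload = Payload}, UserFun, UserState) ->
    {Offset, UserFun(Payload, UserState)}.
```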
The route field of #kflow_msg should not be changed unless the intermediate CBM implements some kind of pipe splitting or joining operation. In that case, only the head of the Route list may be changed (added or removed); the tail must be preserved. The same goes for exposing the route to the user CBM: it is probably a good idea to expose only the head of the route in order to improve composability.
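The head-only rule for route changes amounts to a few list operations. A minimal sketch, assuming routes are plain Erlang lists as described here; the module and function names are hypothetical:

```erlang
-module(route_sketch).
-export([split/2, join/1, user_view/1]).

%% Splitting: push a new element; the old route becomes the tail, untouched.
split(Key, Route) when is_list(Route) ->
    [Key | Route].

%% Joining: remove the head; the tail is preserved as-is.
join([_Head | Tail]) ->
    Tail.

%% What a user CBM might be shown: only the head of the route.
user_view([]) -> undefined;
user_view([Head | _]) -> Head.
```

Since join/1 only ever strips what a matching split/2 added, nested split/join stages compose without touching each other's route elements.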
One of the design choices behind kflow was making sure that data flows strictly in one direction: from upstream to downstream. This makes reasoning about kflow pipes easier and eliminates many types of concurrency bugs. However, it also makes stream splitting and joining somewhat tricky to implement. The biggest problem is offset tracking: a kfnode must guarantee that it won't advance the offset of messages it sends downstream beyond a safe value, while the upstream may buffer messages for an unknown period of time.
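One way to picture the "safe value" is shown below. This is a hypothetical sketch, not kflow's algorithm: it assumes offsets are increasing integers, that Buffered maps each route to the offsets still held back in that route's buffer, and that LastSeen is the highest offset consumed from the upstream. The offset reported downstream must stay below the oldest message that is still buffered anywhere.

```erlang
-module(offset_sketch).
-export([safe_offset/2]).

%% Buffered (hypothetical): #{Route => [Offset]} of messages not yet emitted.
%% LastSeen: highest offset consumed from the upstream.
safe_offset(Buffered, LastSeen) ->
    case lists:append(maps:values(Buffered)) of
        [] ->
            %% Nothing is held back: everything seen is fully processed
            LastSeen;
        Pending ->
            %% Stay strictly below the oldest still-buffered message
            lists:min(Pending) - 1
    end.
```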
By default, the kflow framework solves this problem with the following trick: each kfnode keeps multiple states of the user CBM, one state per route of upstream messages. It is up to the intermediate CBM to keep track of the per-route states and to multiplex messages between them.
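The per-route multiplexing can be sketched with a map keyed by route. This is an illustrative assumption about the data layout, not kflow_gen's internal representation; InitFun and HandleFun are hypothetical stand-ins for the user CBM's initialization and message callbacks.

```erlang
-module(per_route_sketch).
-export([handle/4]).

%% States (hypothetical layout): #{Route => UserState}.
%% InitFun: fun(Route) -> UserState, called the first time a route is seen.
%% HandleFun: fun(Payload, UserState) -> {Output, NewUserState}.
handle(Route, Payload, {InitFun, HandleFun}, States) ->
    UserState0 = case maps:find(Route, States) of
                     {ok, S} -> S;
                     error -> InitFun(Route)
                 end,
    {Out, UserState} = HandleFun(Payload, UserState0),
    %% Only the state belonging to this route is updated
    {Out, States#{Route => UserState}}.
```

For example, a per-route counter keeps independent counts for [a] and [b], because each route gets its own user state.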
Remember that the route field of #kflow_msg{} is a list. Stream splitting is done simply by adding a new element to the route. Conversely, stream joining is done by removing the head of the list. Apart from that, the route field has no meaning: it is only used to look up the user CBM state.
Benefits of this solution:
Downsides of this solution:
callback_return(State) = {ret_type(), [kflow:message()], State}
ret_type() = ok | exit
| Function | Description |
|---|---|
| feed/4 | Send a message to the kfnode and block the caller until the message is processed by this node (but the subsequent processing is done asynchronously). |
| flush/1 | Command the node and its downstream to immediately flush all the buffered data (async call). |
| get_status/1 | Get various debug information about the node. |
| notify_upstream_failure/1 | Nicely ask the node to stop. |
| post_init/2 | Tell the node the pids of its neighbors. |
| start_link/1 | Start a kfnode process. |
feed(Pid::pid(), MRef::reference(), Msg::kflow:message(), Timeout::timeout()) -> ok | {error, term()}
Send a message to the kfnode and block the caller until the message is processed by this node (but the subsequent processing is done asynchronously).
Note: the second argument of the function is a monitor reference of the corresponding kflow_pipe process, rather than of the kflow_gen process! This lets the internal fault handling logic run before the caller crashes.
flush(Server::pid()) -> ok
Command the node and its downstream to immediately flush all the buffered data (async call)
get_status(Pid::pid()) -> term()
Get various debug information about the node
notify_upstream_failure(Pid::pid() | undefined) -> ok
Nicely ask the node to stop
post_init(Pid::pid(), Neighbors::{_Upstream::pid(), _Downstream::pid()}) -> ok
Tell the node the pids of its neighbors
start_link(Init_data::#init_data{}) -> {ok, pid()}
Start a kfnode process