Copyright © 2019 Klarna Bank AB (publ)
This module defines the kflow_gen_aggregate behaviour.
Required callback functions: in/4, out/2.
Optional callback functions: init/1, terminate/2.
This module implements a stateful stream processing node for many-into-one transformations.
To use it, create a callback module that implements the
kflow_gen_aggregate behavior. The init and terminate callbacks are
similar to those in the kflow_gen_map and kflow_gen_filter behaviors.
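The in callback is invoked for each incoming message from
upstream. It takes 4 arguments:

1. Offset of the message
2. The message itself
3. State of the callback module. This state is created in the init
   callback and can be mutated in the callbacks.
4. Initial configuration

The return value should be a tuple {Flush, NextState}, where Flush is
one of the atoms keep, flush or reject.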
keep means that the aggregator should keep collecting upstream
messages without producing anything downstream.

flush means that the aggregator is ready to produce a message
downstream.

reject means that the last upstream message was incompatible with the
data that had been aggregated so far (e.g. the schema of the data was
different). In this case the previous state is flushed and the last
message is replayed from a blank state.
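
The reject case can be illustrated with a minimal sketch of a callback module that batches maps sharing the same set of keys. The module name schema_aggregate, the max_batch configuration key and the {Schema, Buffer} state layout are assumptions made for this example and are not part of kflow. The blank state returned by init and by out is the same value here, so the sketch does not depend on which of the two the replay starts from.

```erlang
-module(schema_aggregate).   %% hypothetical module name

-behavior(kflow_gen_aggregate).

-export([init/1, in/4, out/2]).

%% Blank state: no schema seen yet, nothing buffered.
init(_Config) ->
  {undefined, []}.

in(_Offset, Msg, {Schema0, Buf}, Config) when is_map(Msg) ->
  MsgSchema = schema(Msg),
  case Schema0 =:= undefined orelse Schema0 =:= MsgSchema of
    true ->
      Buf1 = [Msg | Buf],
      %% max_batch is a hypothetical configuration key.
      Max = maps:get(max_batch, Config, 100),
      Flush = case length(Buf1) >= Max of
                true  -> flush;
                false -> keep
              end,
      {Flush, {MsgSchema, Buf1}};
    false ->
      %% Incompatible message: leave the buffer untouched and ask for
      %% the old data to be flushed before this message is replayed.
      {reject, {Schema0, Buf}}
  end.

%% Emit the batch in arrival order and return to the blank state.
out({_Schema, Buf}, _Config) ->
  {ok, lists:reverse(Buf), {undefined, []}}.

%% The "schema" of a message is its sorted list of keys.
schema(Msg) ->
  lists:sort(maps:keys(Msg)).
```

Returning the unchanged state together with reject keeps the incompatible message out of the batch that is about to be flushed.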
The out callback is used to produce a message downstream. It is
invoked when the in callback returns flush or reject, or when a
flush is implicitly requested by low-level control logic. It takes
two arguments: the first is the current state of the callback module
and the second is the initial configuration.
It should return a tuple {ok | exit, DownstreamMessage, NextState}
or the atom keep.
Returning {ok, Msg, NextState} will result in sending Msg
downstream, and waiting for new messages with state NextState.
Returning {exit, Msg, NextState} will result in sending Msg
downstream, calling the terminate callback, if it is defined by the
user callback module (CBM), and then forgetting about the state of
the user CBM for the route. This is useful when the number of routes
is unlimited.
If the user CBM returns keep, then gen_aggregate will keep the state
and won't produce anything downstream. This is useful to avoid the
situation when a flush is requested by some external logic, but
the user CBM doesn't want to flush half-finished data.
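
The three possible results of out can be sketched in one small callback module. The module name session_aggregate, the eof end-of-stream marker, the min_batch configuration key and the {Status, Buffer} state layout are all assumptions made for this illustration, not kflow conventions.

```erlang
-module(session_aggregate).   %% hypothetical module name

-behavior(kflow_gen_aggregate).

-export([init/1, in/4, out/2, terminate/2]).

init(_Config) ->
  {open, []}.

%% `eof' is a hypothetical marker that ends the session for this route.
in(_Offset, eof, {_Status, Buf}, _Config) ->
  {flush, {closed, Buf}};
in(_Offset, Msg, {Status, Buf}, _Config) ->
  {keep, {Status, [Msg | Buf]}}.

%% Session is over: emit what is left and drop the state for the route.
out({closed, Buf}, _Config) ->
  {exit, lists:reverse(Buf), {closed, []}};
out({open, Buf}, Config) ->
  %% min_batch is a hypothetical configuration key.
  Min = maps:get(min_batch, Config, 1),
  case length(Buf) >= Min of
    true  -> {ok, lists:reverse(Buf), {open, []}};
    %% Flush was requested externally, but the batch is too small.
    false -> keep
  end.

terminate(_State, _Config) ->
  ok.
```

With this layout an externally requested flush on a short buffer is declined with keep, a sufficiently large buffer is emitted with ok, and the eof marker ends the route with exit.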
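-module(my_aggregate).

-behavior(kflow_gen_aggregate).

-export([init/1, in/4, out/2, terminate/2]).

%% Start with an empty buffer.
init(_Config) ->
  [].

%% Buffer the message; request a flush once N messages were already
%% buffered, so the flushed batch also contains the current message.
in(_Offset, Message, State, Config) ->
  N = maps:get(buffer_size, Config),
  Flush = if length(State) >= N ->
             flush;
            true ->
             keep
          end,
  {Flush, [Message|State]}.

%% Emit the buffered messages in arrival order and reset the buffer.
out(State, _Config) ->
  Output = lists:reverse(State),
  NewState = [],
  {ok, Output, NewState}.

terminate(_State, _Config) ->
  ok.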