Copyright © 2019 Klarna Bank AB (publ)
This module defines the kflow_gen_filter behaviour.
Required callback functions: filter/3.
Optional callback functions: init/1, terminate/1.
This module defines a stream processing node that can filter messages. It applies a pure predicate function to each incoming message.
This behavior can be used in two modes: full and simplified. In simplified mode, a stream processing node is defined as follows:
{filter, fun(Offset, Message) -> true | false end}
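As a minimal sketch of simplified mode, the module below builds a node that keeps only messages with a non-empty payload. It assumes, purely for illustration, that each message is a map with a binary under the value key; the real message shape depends on the upstream nodes. The module and function names are hypothetical.

```erlang
-module(nonempty_filter_example).
-export([node_spec/0]).

%% Returns a simplified-mode filter node.
%% Assumption: messages are maps with a binary under the `value' key.
node_spec() ->
    {filter, fun(_Offset, #{value := Value}) when is_binary(Value) ->
                     %% Pass the message downstream only if it has a payload.
                     byte_size(Value) > 0;
                (_Offset, _Other) ->
                     %% Anything that does not match the assumed shape is dropped.
                     false
             end}.
```

The predicate has no side effects and depends only on its arguments, which matches the requirement that the filter function be pure.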
In full mode, one has to create a callback module that implements the
kflow_gen_filter behavior.
The filter callback takes three arguments: the offset of the message,
the message itself, and the state of the callback module. This state
is created in the init callback and remains the same throughout the
lifetime of the pipe. The return value of the filter callback is a
boolean that determines whether downstream nodes see the message
(true) or ignore it (false).
The init and terminate callbacks can be used, for example, when a
resource must be acquired in order to process messages. Both callbacks
are optional; when init is omitted, the configuration is passed as is
to the filter callback.
-module(my_filter).
-behavior(kflow_gen_filter).

-export([init/1, filter/3, terminate/1]).

%% do_init/1 and do_cleanup/1 stand for user-defined helpers.
init(Config) ->
    State = do_init(Config),
    State.

filter(_Offset, _Message, _State) ->
    true.

terminate(State) ->
    do_cleanup(State).
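When init is omitted, the third argument of filter is the node configuration itself. The following sketch illustrates this under two assumptions that are not part of the original documentation: the configuration is a map with a hypothetical min_offset key, and offsets are integers that can be compared with >=.

```erlang
-module(min_offset_filter).
-behavior(kflow_gen_filter).

-export([filter/3]).

%% No init/1 is defined, so the third argument is the configuration as is.
%% Assumption: the configuration is a map with a `min_offset' key.
filter(Offset, _Message, #{min_offset := Min}) ->
    %% Keep only messages at or past the configured offset.
    Offset >= Min.
```

With the configuration #{min_offset => 5}, a message at offset 10 is passed downstream and a message at offset 1 is ignored.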
NOTE: Because the filter callback cannot modify the state, the same
state is shared between all routes.
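One way to work with read-only state is to do all expensive preparation in init and only read the result in filter. The sketch below compiles a regular expression once and reuses it for every message. The pattern configuration key, the module name, and the assumption that messages are binaries are all illustrative; returning ok from terminate is likewise an assumption, since nothing needs to be released here.

```erlang
-module(regex_filter).
-behavior(kflow_gen_filter).

-export([init/1, filter/3, terminate/1]).

%% Compile the pattern once; the compiled form becomes the state.
%% Assumption: the configuration is a map with a `pattern' key.
init(#{pattern := Pattern}) ->
    {ok, Compiled} = re:compile(Pattern),
    Compiled.

%% filter/3 returns only a boolean, so it has no way to replace the state.
%% Assumption: the message is a binary (or iodata) that re:run/3 accepts.
filter(_Offset, Message, Compiled) ->
    re:run(Message, Compiled, [{capture, none}]) =:= match.

%% The compiled pattern is plain data, so there is nothing to clean up.
terminate(_Compiled) ->
    ok.
```

Because the state never changes after init, it does not matter which route a message belongs to: every call to filter sees the same compiled pattern.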
callback_fun() = fun((kflow:offset(), _Message) -> boolean())