Module kflow_gen_mfd

This module defines a stateless stream processing node that combines kflow_gen_map, kflow_gen_filter and kflow_gen_demux.

Copyright © 2019 Klarna Bank AB (publ)

This module defines the kflow_gen_mfd behaviour.
Required callback functions: mfd/3.
Optional callback functions: init/1, terminate/1.

Description

This behavior can be used in two modes: full and simplified. In simplified mode, the stream processing node is defined as follows:

      {mfd, fun(Offset, Message) -> {true, Message} | {true, Route, Message} | false end}
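
As an illustration of the simplified form, the sketch below builds such a node specification around an anonymous fun and then calls the fun directly. The module name `mfd_simple_example`, the `even` route name and the integer message format are assumptions made for this example only; they are not part of kflow.

```erlang
-module(mfd_simple_example).
-export([node_spec/0, demo/0]).

%% Node specification in simplified mode: a 2-ary fun tagged with `mfd'.
%% Messages are assumed to be integers (hypothetical message format).
node_spec() ->
  {mfd, fun(_Offset, N) when is_integer(N), N < 0 ->
              %% Drop negative numbers:
              false;
           (_Offset, N) when is_integer(N), N rem 2 =:= 0 ->
              %% Send even numbers to the (hypothetical) `even' substream:
              {true, even, N * 10};
           (_Offset, N) when is_integer(N) ->
              %% Pass all other numbers downstream without a route:
              {true, N * 10}
        end}.

%% Call the fun by hand on a few {Offset, Message} pairs to show
%% the three possible return shapes.
demo() ->
  {mfd, Fun} = node_spec(),
  [Fun(Offset, Msg) || {Offset, Msg} <- [{1, -5}, {2, 4}, {3, 7}]].
```

Calling `mfd_simple_example:demo()` returns `[false, {true, even, 40}, {true, 70}]`. A message that is not an integer would crash this particular fun with a `function_clause` error, so a real node would need a catch-all clause that suits its data.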

In full mode, one has to create a callback module that implements the kflow_gen_mfd behavior.

The mfd callback takes three arguments: the offset of the message, the message itself, and the state of the callback module. The state is created by the init callback and remains unchanged for the lifetime of the pipe. The return value of the mfd callback must be of type return_type/0.

The init and terminate callbacks can be used, for example, when a resource must be acquired in order to process messages. Both callbacks are optional; when init is omitted, the configuration is passed as is to the mfd callback.
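
A minimal sketch of a callback module that omits both optional callbacks is shown here, so the third argument of mfd/3 is the node configuration itself. The module name `threshold_mfd`, the map-shaped configuration and its `threshold` key are assumptions made for illustration; the actual shape of the configuration depends on how the node is declared.

```erlang
-module(threshold_mfd).

-behavior(kflow_gen_mfd).

%% Only the required callback is exported; init/1 and terminate/1 are omitted.
-export([mfd/3]).

%% Without init/1 the third argument is the configuration, passed as is.
%% The `#{threshold := T}' shape is a hypothetical configuration format.
mfd(_Offset, Value, #{threshold := T}) when is_number(Value), Value >= T ->
  %% Keep values at or above the threshold:
  {true, Value};
mfd(_Offset, _Value, _Config) ->
  %% Drop everything else:
  false.
```

With a configuration of `#{threshold => 10}`, the message `15` is passed downstream and the message `3` is dropped. Compiling the module without kflow on the code path only produces a warning about the undefined behaviour.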

Example

   -module(my_mfd).
  
   -behavior(kflow_gen_mfd).
  
   -export([init/1, mfd/3, terminate/1]).
  
   init(Config) ->
     State = do_init(Config),
     State.
  
   %% The `{route, ...}' and `drop' message shapes below are illustrative only.
   mfd(_Offset, {route, Route, Message}, _State) ->
     %% Apply `transform' to the message and pass it to a substream `Route':
     {true, Route, transform(Message)};
   mfd(_Offset, drop, _State) ->
     %% Drop the message:
     false;
   mfd(_Offset, Message, _State) ->
     %% Apply `transform' to the message and pass it downstream:
     {true, transform(Message)}.
  
   terminate(State) ->
     do_cleanup(State).

NOTE: Since the state is immutable, it is effectively shared between the routes.

Data Types

callback_fun()

callback_fun() = fun((kflow:offset(), _Message) -> return_type())

return_type()

return_type() = {true, Ret::term()} | {true, Route::term(), Ret::term()} | false
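
To make the three shapes of return_type() concrete, the following sketch applies a callback_fun()-shaped fun to a list of messages and groups the results by route. It is a toy driver written for this illustration, not kflow's implementation; the module name `mfd_return_demo` and the `default` key used for unrouted messages are assumptions.

```erlang
-module(mfd_return_demo).
-export([run/2]).

%% Apply Fun to every {Offset, Message} pair and collect the returned
%% values per route, preserving message order within each route.
%% `default' is a made-up key for messages returned without a route.
run(Fun, Messages) ->
  lists:foldl(
    fun({Offset, Msg}, Acc) ->
        case Fun(Offset, Msg) of
          {true, Ret}        -> append(default, Ret, Acc);
          {true, Route, Ret} -> append(Route, Ret, Acc);
          false              -> Acc   % message is dropped
        end
    end,
    #{},
    Messages).

%% Append Ret to the list stored under Route, creating it if needed.
append(Route, Ret, Acc) ->
  maps:update_with(Route, fun(L) -> L ++ [Ret] end, [Ret], Acc).
```

For example, with a fun that drops negative numbers, routes even numbers to `even` and passes the rest through, `run(Fun, [{0, 1}, {1, 2}, {2, -1}, {3, 3}])` yields a map with `[1, 3]` under `default` and `[2]` under `even`.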

