Module kflow_gen_unfold

This behavior can be seen as the opposite of kflow_gen_aggregate; it implements a stream processing node that applies a pure function to each incoming message.

Copyright © 2019 Klarna Bank AB (publ)

This module defines the kflow_gen_unfold behaviour.
Required callback functions: unfold/3.
Optional callback functions: init/1, terminate/1.

Description

This behavior can be seen as the opposite of kflow_gen_aggregate; it implements a stream processing node that applies a pure function to each incoming message. This function produces a list of messages that should be sent downstream.

This behavior can be used in two modes: full and simplified. In simplified mode stream processing node is defined like following:

      {unfold, fun(Offset, Message) -> [Message, Message] end}

In full mode one has to create a callback module with kflow_gen_unfold behavior.

unfold callback takes 3 arguments: first is offset of a message, second is the message itself and the third one is state of the callback module. This state is created in init callback and remains the same through the lifetime of the pipe. Return value of unfold callback is a list of messages, each one is passed downstream.

init and terminate callbacks can be used e.g. when some resource should be obtained to process messages. Both callbacks are optional; configuration will be passed as is to map callback when init is omitted.

Example

   -module(extract_writes).
  
   -behavior(kflow_gen_unfold).
  
   -export([init/1, unfold/3, terminate/1]).
  
   init(Config) ->
     State = do_init(Config),
     State.
  
   unfold(Offset, #tx{writes = Writes}, State) ->
     Writes.
  
   terminate(State) ->
     do_cleanup(State).
NOTE: Since state is immutable, it's actually shared between the routes.

Data Types

callback_fun()

callback_fun() = fun((kflow:offset(), _InputMessage) -> [_OutputMessage])


Generated by EDoc