Module kflow_gen_assemble_chunks

This behavior helps you write code that assembles data transferred in chunks.

Copyright © 2020 Klarna Bank AB (publ)

This module defines the kflow_gen_assemble_chunks behaviour.
Required callback functions: in/6, out/2, chunk_num/1, chunk_count/1.
Optional callback functions: init/1, terminate/2.

Description

This behavior helps you write code that assembles data transferred in chunks. It is designed to handle common cases of upstream failure that result in retransmission of chunks.

It is assumed that each upstream message contains the chunk number and the total number of chunks in some form.

To use it, create a callback module that implements the kflow_gen_assemble_chunks behavior.
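The idea of "unique chunks" after a retransmission can be pictured with a toy model. The sketch below is not the behaviour's own algorithm, which this page does not show. It assumes a hypothetical message shape (a map with chunk_num, chunk_count and value keys) and a single transfer, and keeps only the first occurrence of each chunk number. The module name chunk_dedup_sketch and both exported functions are invented for illustration.

```erlang
-module(chunk_dedup_sketch).
-export([unique_chunks/1, is_complete/1]).

%% Hypothetical message shape: a map with `chunk_num' and `chunk_count' keys.
chunk_num(#{chunk_num := N}) -> N.
chunk_count(#{chunk_count := N}) -> N.

%% Keep the first occurrence of every chunk number, in arrival order.
%% Retransmitted chunks (numbers already seen) are dropped.
unique_chunks(Messages) ->
  {_Seen, Acc} =
    lists:foldl(
      fun(Msg, {Seen, Acc0}) ->
          N = chunk_num(Msg),
          case sets:is_element(N, Seen) of
            true  -> {Seen, Acc0};
            false -> {sets:add_element(N, Seen), [Msg | Acc0]}
          end
      end,
      {sets:new(), []},
      Messages),
  lists:reverse(Acc).

%% A transfer is complete once as many unique chunks have arrived as the
%% messages announce in their chunk count.
is_complete([]) -> false;
is_complete([First | _] = Unique) ->
  length(Unique) =:= chunk_count(First).
```

For example, if the upstream sends chunks 1 and 2, restarts, and then sends 1, 2 and 3, unique_chunks/1 keeps one copy of each chunk and is_complete/1 returns true for the result.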

The init and terminate callbacks are similar to those in the kflow_gen_map and kflow_gen_filter behaviors.

Consuming upstream messages

The in callback is invoked for each unique chunk of data. Its parameters are:

  1. Offset of a message
  2. Chunk number
  3. Total number of chunks
  4. Message itself
  5. State of the callback module. This state is created in the init callback and is updated through the values the callbacks return.
  6. Initial configuration (constant)

The return value is the next state.
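As an illustration of the argument order, here is a minimal sketch of an in callback that stores each payload in a map keyed by chunk number. The module name in_callback_sketch and the value key in the message are assumptions made for this example. The -behavior attribute is left out so that the module compiles without the library.

```erlang
-module(in_callback_sketch).
-export([init/1, in/6]).

%% Start with an empty map of ChunkNum => Payload.
init(_Config) ->
  #{}.

%% Arguments: offset, chunk number, total number of chunks, the message,
%% the current state and the constant configuration.
%% The message shape (a map with a `value' key) is hypothetical.
in(_Offset, ChunkNum, _ChunkCount, #{value := Val}, State, _Config) ->
  State#{ChunkNum => Val}.
```

Keying by chunk number means the state records which chunk each payload belongs to, at the cost of a sort when the final message is built.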

Producing messages downstream

The out callback is used to produce a message downstream. It is invoked after the last chunk has been processed. It takes two arguments: the first is the current state of the callback module and the second is the initial configuration.

It should return a tuple {ok | exit, DownstreamMessage, NextState}.
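A minimal sketch of an out callback follows. It assumes the state is a map of ChunkNum => binary payload, which is a choice made for this example rather than something the behaviour requires. It returns the ok form of the tuple with a fresh empty state; how the behaviour treats ok as opposed to exit is not described here, so the sketch makes no claim about it. The module name out_callback_sketch is hypothetical.

```erlang
-module(out_callback_sketch).
-export([out/2]).

%% Build the downstream message from a state of ChunkNum => Payload.
out(State, _Config) ->
  %% maps:to_list/1 guarantees no particular order, so sort by chunk number.
  Sorted = lists:keysort(1, maps:to_list(State)),
  Payload = iolist_to_binary([Val || {_ChunkNum, Val} <- Sorted]),
  %% Return the assembled message and an empty map as the next state.
  {ok, Payload, #{}}.
```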

Example

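   -module(my_aggregate).
  
   -behavior(kflow_gen_assemble_chunks).
  
   -export([init/1, chunk_num/1, chunk_count/1, in/6, out/2, terminate/2]).
  
   %% Extract the chunk number and the total chunk count from a message.
   chunk_num(#{chunk_num := N}) -> N.
  
   chunk_count(#{chunk_count := N}) -> N.
  
   %% The state is the list of values received so far, newest first.
   init(_Config) ->
     [].
  
   in(_Offset, _N, _Nmax, #{value := Val}, State, _Config) ->
     [Val|State].
  
   %% Restore arrival order and emit the assembled list downstream.
   out(State, _Config) ->
     Output = lists:reverse(State),
     {exit, Output, undefined}.
  
   %% terminate/2 matches the arity listed among the optional callbacks;
   %% the second argument is assumed to be the configuration.
   terminate(_State, _Config) ->
     ok.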
