Module kflow_pipe

This module implements interface towards pipe.

Copyright © (C) 2019, Klarna Bank AB

Description

This module implements interface towards pipe.

Note:It does not require starting kflow root supervisor, so it can be reused in other applications.

Data Types

entrypoint()

entrypoint() = fun((kflow:message() | flush) -> ok)

pipe_config()

pipe_config() = #{id := kflow:node_id(), definition := kflow:pipe(), feed_timeout => timeout(), shutdown_timeout => timeout(), flush_interval => non_neg_integer() | undefined}

Function Index

start_link/1Start an instance of a pipe.
stop/1Gracefully stops the pipe, waiting until all in-flight messages are fully processed and stream processing nodes are terminated.

Function Details

start_link/1

start_link(PipeConfig::pipe_config()) -> {ok, pid(), entrypoint()} | {error, empty_pipe}

Start an instance of a pipe. Specification of a pipe is a map with the following keys:

This function returns pid of the pipe and Feed closure that is used to feed messages into the pipe. Messages should be #kflow_msg records. Example:

  PipeSpec = [{filter, fun(_Offset, Msg) -> is_integer(Msg) end},
              {map, fun(_Offset, Msg) -> Msg + 1 end},
              {map, fun(_Offset, Msg) -> erlang:display(Msg), Msg end}
             ],
  {ok, Pid, Feed} = kflow_pipe:start(#{id := [test], definition := PipeSpec}),
  Feed(#kflow_msg{offset = 1, payload = foo}),
  Feed(#kflow_msg{offset = 2, payload = 1}),
  Feed(#kflow_msg{offset = 3, payload = 2}),
  kflow_pipe:stop(Pid).

stop/1

stop(Pid::pid()) -> ok

Gracefully stops the pipe, waiting until all in-flight messages are fully processed and stream processing nodes are terminated.


Generated by EDoc