Copyright © 2019, Klarna Bank AB
This module implements the interface to a pipe.
Note: it does not require starting the kflow root supervisor, so it can be reused in other applications.
entrypoint() = fun((kflow:message() | flush) -> ok)
pipe_config() = #{id := kflow:node_id(), definition := kflow:pipe(), feed_timeout => timeout(), shutdown_timeout => timeout(), flush_interval => non_neg_integer() | undefined}
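As a rough illustration of the pipe_config() type, a map of this shape might be built as follows. The id and the timeout values are hypothetical, chosen only for the example; flush_interval is left out because this page does not describe its unit or effect.

```erlang
%% Hypothetical configuration; none of these values are library defaults.
PipeConfig = #{id               => [my_app, example_pipe],
               definition       => [{map, fun(_Offset, Msg) -> Msg end}],
               feed_timeout     => 5000,    % milliseconds
               shutdown_timeout => 10000}.  % milliseconds
```

Note that `=>` is used when constructing the map; the `:=` in the type definition only marks id and definition as mandatory keys.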
start_link/1 | Start an instance of a pipe. |
stop/1 | Gracefully stops the pipe, waiting until all in-flight messages are fully processed and stream processing nodes are terminated. |
start_link(PipeConfig::pipe_config()) -> {ok, pid(), entrypoint()} | {error, empty_pipe}
Start an instance of a pipe. Specification of a pipe is a map with the following keys:
id
: globally unique identifier of the pipe; it should be a list of atoms. Mandatory.
definition
: list of stream processing nodes that make up the pipe. Mandatory.
feed_timeout
: crash if feeding a message into the pipe takes longer than this many milliseconds. Optional, infinity by default.
shutdown_timeout
: brutally kill the pipe if graceful shutdown takes longer than this many milliseconds. Optional, infinity by default.

This function returns the pid of the pipe and a Feed closure that is used to feed messages into the pipe. Messages should be #kflow_msg records. Example:
    %% Assumes the #kflow_msg record definition is in scope.
    PipeSpec = [{filter, fun(_Offset, Msg) -> is_integer(Msg) end},
                {map, fun(_Offset, Msg) -> Msg + 1 end},
                {map, fun(_Offset, Msg) -> erlang:display(Msg), Msg end}],
    %% Maps are constructed with `=>`; `:=` is only valid for updates and matches.
    {ok, Pid, Feed} = kflow_pipe:start_link(#{id => [test], definition => PipeSpec}),
    Feed(#kflow_msg{offset = 1, payload = foo}),
    Feed(#kflow_msg{offset = 2, payload = 1}),
    Feed(#kflow_msg{offset = 3, payload = 2}),
    kflow_pipe:stop(Pid).
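The spec of start_link/1 also lists an {error, empty_pipe} return, and the entrypoint() type accepts the atom flush in addition to messages. A minimal sketch of a caller that handles both might look like the following. The helper run_pipe/2 is hypothetical, the code assumes the kflow application is on the code path with the #kflow_msg record definition in scope, and the exact effect of flush is not described on this page.

```erlang
%% Hypothetical helper, not part of kflow_pipe.
%% Feeds every payload into a freshly started pipe, then stops the pipe.
run_pipe(PipeConfig, Payloads) ->
    case kflow_pipe:start_link(PipeConfig) of
        {ok, Pid, Feed} ->
            %% Feed each payload with an increasing offset, starting at 1.
            lists:foldl(
              fun(Payload, Offset) ->
                      ok = Feed(#kflow_msg{offset = Offset, payload = Payload}),
                      Offset + 1
              end, 1, Payloads),
            %% The entrypoint type also accepts `flush`.
            ok = Feed(flush),
            %% Waits until in-flight messages are fully processed.
            kflow_pipe:stop(Pid);
        {error, empty_pipe} = Error ->
            Error
    end.
```

Because start_link/1 links the pipe to the calling process, a caller that must survive a pipe crash would need to trap exits or start the pipe under its own supervisor.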
stop(Pid::pid()) -> ok
Gracefully stops the pipe, waiting until all in-flight messages are fully processed and stream processing nodes are terminated.