swirl_flow behaviour (swirl v0.2.8)

View Source

Summary

Types

flow/0

-type flow() ::
          #flow{id :: binary(),
                module :: module(),
                module_vsn :: undefined | module_vsn(),
                stream_filter :: undefined | string(),
                stream_names :: undefined | stream_names(),
                mapper_window :: undefined | pos_integer(),
                mapper_nodes :: undefined | [node()],
                mapper_opts :: mapper_opts(),
                reducer_window :: undefined | pos_integer(),
                reducer_node :: node(),
                reducer_opts :: reducer_opts(),
                reducer_skip :: undefined | boolean(),
                output_opts :: output_opts(),
                heartbeat :: undefined | pos_integer(),
                window_sync :: undefined | boolean(),
                started_at :: undefined | erlang:timestamp(),
                start_node :: node()}.

flow_opts/0

-type flow_opts() ::
          {heartbeat, pos_integer()} |
          {mapper_opts, mapper_opts()} |
          {mapper_window, pos_integer()} |
          {output_opts, output_opts()} |
          {reducer_opts, reducer_opts()} |
          {reducer_skip, boolean()} |
          {reducer_window, pos_integer()} |
          {stream_filter, string()} |
          {stream_names, stream_names()} |
          {window_sync, boolean()}.

mapper_opts/0

-type mapper_opts() :: term().

module_vsn/0

-type module_vsn() :: pos_integer().

output_opts/0

-type output_opts() :: term().

reducer_opts/0

-type reducer_opts() :: term().

stream_name/0

-type stream_name() :: atom().

stream_names/0

-type stream_names() :: [stream_name()].

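Callbacks

Note: the callback specs below reference event(), update(), period() and row(), which are not among the types listed in the Types section of this page.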

map/3

-callback map(stream_name(), event(), mapper_opts()) -> [update()] | update() | ignore.

output/4

-callback output(flow(), period(), [row()], output_opts()) -> ok.

reduce/3

-callback reduce(flow(), row(), reducer_opts()) -> update() | ignore.

Functions

lookup(FlowId)

-spec lookup(binary() | flow()) -> undefined | flow().

register(Flow)

-spec register(flow()) -> true.

start(FlowMod, FlowOpts, MapperNodes, ReducerNode)

-spec start(atom(), [flow_opts()], [node()], node()) ->
               {ok, flow()} | {error, flow_mod_undef | {bad_flow_opts, list()}}.

stop(Flow)

-spec stop(flow()) -> ok.

unregister(Flow)

-spec unregister(flow()) -> true.