Adap.Piper behaviour

Piper provides an implementation of Adap.Stream.Emitter in which the distributed processing of each element is defined as a succession of matching rules.

Each rule can use external data to process the element or emit new ones. When external data is needed, a process is spawned on the node containing that data; it receives the element and continues applying rules.

The principle is to make each element hop from node to node in order to be processed using the locally present data.

The element goes to the stream sink when no more rules match.
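The rule succession described above can be sketched on a single node with plain Elixir. This is only an illustration, not Adap's implementation: the `PiperSketch` module, the `{:match, elem}` / `:nomatch` convention and the in-memory `color_map` are assumptions made for the example, and each rule is assumed to be tried once, in order. Node hopping is left out entirely.

```elixir
defmodule PiperSketch do
  # Hypothetical helper: tries each rule once, in order.
  # A rule is a function returning {:match, new_elem} or :nomatch.
  def run(elem, rules) do
    Enum.reduce(rules, elem, fn rule, acc ->
      case rule.(acc) do
        {:match, new_acc} -> new_acc
        :nomatch -> acc
      end
    end)
  end
end

# Stand-in for the external data that would live on another node.
color_map = %{"carmine" => "red", "periwinkle" => "blue"}

rules = [
  # Rewrites the color using the mapping when the element has a color.
  fn
    %{color: color} = prod -> {:match, %{prod | color: color_map[color]}}
    _ -> :nomatch
  end,
  # Flags red products as deleted.
  fn
    %{color: "red"} = prod -> {:match, Map.put(prod, :deleted, true)}
    _ -> :nomatch
  end
]

IO.inspect(PiperSketch.run(%{color: "carmine", category: "shirt"}, rules))
```

An element that matches no rule comes out unchanged, which corresponds to going straight to the sink.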

Adap.Stream streams data in chunks, so that constructing the external state server can take as much time as necessary without congestion: no more than one chunk's worth of elements is ever queued.
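The bounded queuing described above can be pictured with the standard library alone. The sketch below is an analogy, not Adap.Stream's actual mechanism: it uses `Stream.chunk_every/2` on a lazy stream, and the names `chunk_size`, `first_chunk` and `pulled_third?` are made up for the example. It shows that consuming one chunk pulls only one chunk's worth of elements from the source.

```elixir
chunk_size = 2
parent = self()

first_chunk =
  1..5
  # Report every element actually pulled from the source.
  |> Stream.map(fn x ->
    send(parent, {:pulled, x})
    x
  end)
  |> Stream.chunk_every(chunk_size)
  # Consume a single chunk, as a slow downstream stage would.
  |> Enum.take(1)

# Check whether the source was asked for a third element.
pulled_third? =
  receive do
    {:pulled, 3} -> true
  after
    0 -> false
  end

IO.inspect({first_chunk, pulled_third?})
```

Because the stream is lazy, the remaining elements stay in the source until the consumer asks for the next chunk.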

The following example pipe maps product colors and sizes through external JSON mappings, then flags red products as deleted. It can be implemented using:

iex> Adap.Piper.defpipe ColorPipe, [{ColorPipe.Rules,[]}]
iex> defmodule JSONMap do
...>   use Adap.StateServer, ttl: 1_000
...>   def init(mapping) do
...>     {:ok,File.read!("/#{mapping}.json") |> JSON.decode!}
...>   end
...>   def node("color") do :"user@jsonserver1" end
...>   def node("size") do :"user@jsonserver2" end
...> end
iex> defmodule ColorPipe.Rules do
...>   use Adap.Piper, for: :product
...>   defrule map_color(%{color: color}=prod,_) do
...>     {JSONMap,"color"},color_map->
...>       %{prod| color: color_map[color]}
...>   end
...>   defrule map_size(%{size: size}=prod,_) do
...>     {JSONMap,"size"},size_map->
...>       %{prod| size: size_map[size]}
...>   end
...>   defrule red_is_deleted(%{color: "red"}=prod,_) do
...>     Map.put(prod,:deleted,true)
...>   end
...> end
iex> [
...>   {:product,%{gender: "male", category: "ipad"}},
...>   {:product,%{color: "carmine", category: "shirt"}},
...>   {:product,%{color: "periwinkle", size: "xxl"}}
...> ] |> Adap.Stream.new(ColorPipe) |> Enum.to_list
[{:product,%{gender: "male", category: "ipad"}},
 {:product,%{color: "red", category: "shirt", deleted: true}},
 {:product,%{color: "blue", size: "large"}}]

Summary

defpipe(alias, pipers)
defrule(sig, blocks)

Macros

defpipe(alias, pipers)
defrule(sig, blocks)

Callbacks

init/2

Specs:

  • init(elem :: term, args :: term) :: {elem :: term, pipe_state :: term}
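To make the shape of the init/2 spec concrete, here is a minimal standalone sketch. It is an illustration under assumptions, not documented behaviour: `InitSketch` is a hypothetical module that does not `use Adap.Piper`, `args` is assumed to be the option list paired with the rules module in defpipe, and the `:default_color` option is invented for the example.

```elixir
defmodule InitSketch do
  # Hypothetical callback module with the same shape as the spec:
  # init(elem, args) :: {elem, pipe_state}
  def init(elem, args) do
    # Assumption: the args seed a state that accompanies the element.
    pipe_state = %{default_color: Keyword.get(args, :default_color, "unknown")}
    # Give the element a color only if it does not already have one.
    {Map.put_new(elem, :color, pipe_state.default_color), pipe_state}
  end
end

{elem, pipe_state} = InitSketch.init(%{category: "ipad"}, default_color: "grey")
IO.inspect({elem, pipe_state})
```

The callback returns both the (possibly adjusted) element and a state term, which is all the spec itself guarantees.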