Adap.Stream
Adap.Stream.new/3 create a stream, it takes a source enumerable, an emitter module and a chunk size.
- each element from the source is emitted and processed accross processes/nodes by
emitter.do_emit/2 - these element processing (which may take place on any node) can
- append elements to the source using :
Adap.Stream.emit/2, which will be emitted in turn - send processed element to the stream output using
Adap.Stream.done/2
- append elements to the source using :
- the streamed elements are pulled, emitted and received by
chunk_sizein order to avoid message congestion if element processing is too slow.
Let's see an example:
defmodule MyEmitter do
use Adap.Stream.Emitter
# if augment_from_local_data(elem) returns a modified elem according to local data
# and new_from_local_data(elem) returns new elements taken from local data and elem
def do_emit(sink,elem) do
Node.spawn(n1,fn->
elem = augment_from_local_data(elem)
emit(new_from_local_data(elem))
Node.spawn(n2,fn->
elem = append_local_data(elem)
done(sink,elem)
end)
end)
end
end
Adap.Stream.new(initial_elems,MyEmitter,200)
Summary↑
| done(sink, elem) |
| emit(sink, elems) |
| new(stream, emit_mod, chunk_size \\ 200) |