Adap.Stream

Adap.Stream.new/3 creates a stream. It takes a source enumerable, an emitter module, and a chunk size (200 by default).

Let's see an example:

defmodule MyEmitter do
  use Adap.Stream.Emitter
  # Assume augment_from_local_data(elem) returns elem modified according to local data,
  # new_from_local_data(elem) returns new elements built from local data and elem,
  # and n1 and n2 are the nodes that hold that local data.
  def do_emit(sink,elem) do
    Node.spawn(n1,fn->
      elem = augment_from_local_data(elem)
      emit(sink,new_from_local_data(elem))
      Node.spawn(n2,fn->
        elem = append_local_data(elem)
        done(sink,elem)
      end)
    end)
  end
end
Adap.Stream.new(initial_elems,MyEmitter,200)

Summary

done(sink, elem)
emit(sink, elems)
new(stream, emit_mod, chunk_size \\ 200)

Functions

done(sink, elem)
emit(sink, elems)
new(stream, emit_mod, chunk_size \\ 200)