datastream/stream

Single-stream middle and composition combinators over Stream(a).

All combinators are lazy and order-preserving (unless documented otherwise). Building a pipeline with these combinators triggers no user callbacks; callbacks fire only when a terminal in fold or sink pulls the result.

Early-exit combinators (take, take_while) stop pulling upstream the moment their result is determined, which is what makes them safe on infinite sources.

Every combinator here forwards the upstream’s close callback so resource-backed streams (source.resource) are released on every termination path. Combinators that observe a Done from the upstream do NOT call close again — the source closed itself when it returned Done. Combinators that hold multiple upstreams open at once close them in the order specified in the spec (flat_map: inner before outer; zip: right before left).

Values

pub fn append(
  first: datastream.Stream(a),
  second: datastream.Stream(a),
) -> datastream.Stream(a)

Yield all of first, then all of second. If first is infinite, second is never opened.

Honours the spec close ordering: second is opened only after first returns Done (and so has already closed itself); on early exit before first finishes, second is never opened.

pub fn chunks_of(
  over stream: datastream.Stream(a),
  into size: Int,
) -> datastream.Stream(chunk.Chunk(a))

Group adjacent elements into fixed-size chunks.

size < 1 is normalised to 1. The trailing chunk may be smaller than size when the source length is not divisible.

pub fn concat(
  streams: List(datastream.Stream(a)),
) -> datastream.Stream(a)

Walk a List(Stream(a)) in list order, yielding every element of every stream.

Each inner stream is closed (by reaching its own Done) before the next one is opened. On early exit, the active stream is closed.

pub fn dedupe_adjacent(
  stream: datastream.Stream(a),
) -> datastream.Stream(a)

Collapse runs of ==-equal adjacent values to a single occurrence.

pub fn drop(
  from stream: datastream.Stream(a),
  up_to n: Int,
) -> datastream.Stream(a)

Discard the first n elements; n <= 0 is the identity.

pub fn drop_while(
  in stream: datastream.Stream(a),
  satisfying predicate: fn(a) -> Bool,
) -> datastream.Stream(a)

Discard the longest prefix where predicate holds, then yield the rest.

pub fn filter(
  over stream: datastream.Stream(a),
  keeping predicate: fn(a) -> Bool,
) -> datastream.Stream(a)

Keep only the elements for which predicate returns True.

Relative order of the survivors is preserved.

pub fn filter_map(
  over stream: datastream.Stream(a),
  with f: fn(a) -> option.Option(b),
) -> datastream.Stream(b)

Apply f to each element and keep only the Some(x) results.

pub fn flat_map(
  over stream: datastream.Stream(a),
  with f: fn(a) -> datastream.Stream(b),
) -> datastream.Stream(b)

Apply f to each element and concatenate the inner streams it produces.

Pulls one inner stream at a time; the next inner stream is not constructed until the previous one is exhausted. On early exit before the outer is Done, the active inner is closed first, then the outer.

pub fn flatten(
  streams: datastream.Stream(datastream.Stream(a)),
) -> datastream.Stream(a)

Flatten a stream of streams into a single stream.

Equivalent to flat_map(s, fn(x) { x }); the close ordering is the same.

pub fn group_adjacent(
  over stream: datastream.Stream(a),
  by key: fn(a) -> k,
) -> datastream.Stream(#(k, chunk.Chunk(a)))

Group consecutive elements that share key(element).

pub fn intersperse(
  over stream: datastream.Stream(a),
  with separator: a,
) -> datastream.Stream(a)

Insert separator between adjacent elements.

Empty and single-element streams are unchanged.

pub fn map(
  over stream: datastream.Stream(a),
  with f: fn(a) -> b,
) -> datastream.Stream(b)

Apply f to every element. Cardinality and order are preserved.

pub fn map_accum(
  over stream: datastream.Stream(a),
  from initial: state,
  with step: fn(state, a) -> #(state, b),
) -> datastream.Stream(b)

Thread a state through the stream while emitting elements of a possibly different type.

pub fn scan(
  over stream: datastream.Stream(a),
  from initial: b,
  with step: fn(b, a) -> b,
) -> datastream.Stream(b)

Left-fold the stream, emitting one output per input.

The seed itself is NOT emitted, so output cardinality equals input cardinality. On empty input the output is empty.

pub fn take(
  from stream: datastream.Stream(a),
  up_to n: Int,
) -> datastream.Stream(a)

Yield at most the first n elements; n <= 0 yields the empty stream.

Stops pulling upstream the moment the nth element has been emitted, and closes the upstream on that early exit. This is what makes take safe on infinite resource-backed sources.

pub fn take_while(
  in stream: datastream.Stream(a),
  satisfying predicate: fn(a) -> Bool,
) -> datastream.Stream(a)

Yield the longest prefix where predicate holds, then stop.

Stops pulling upstream and closes it as soon as predicate returns False, so take_while terminates on infinite resource-backed sources whose prefix eventually fails the predicate.

pub fn tap(
  over stream: datastream.Stream(a),
  with effect: fn(a) -> Nil,
) -> datastream.Stream(a)

Call effect once per element and re-emit the element unchanged.

pub fn zip(
  left: datastream.Stream(a),
  right: datastream.Stream(b),
) -> datastream.Stream(#(a, b))

Pair-wise zip two streams; halts the moment either source halts.

Both upstreams may be open at once. On halt, closes right first, then left, matching the spec.

pub fn zip_with(
  left: datastream.Stream(a),
  right: datastream.Stream(b),
  with combiner: fn(a, b) -> c,
) -> datastream.Stream(c)

Combine two streams element-wise with combiner; halts the moment either source halts.

Same close ordering as zip: right then left.

Search Document