datastream/erlang/time

BEAM-only time-based combinators.

Every combinator here observes real elapsed time: bursts are debounced, throttled, sampled, or windowed against BEAM monotonic time, never against the wall clock, which can jump backwards.

Implementation strategy: each combinator spawns one worker process that pumps the upstream into a shared subject. The main process drives the consuming Stream and uses process.receive(within: ms) to wait simultaneously for the next element and for a timer deadline.
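
The consuming side of that strategy can be sketched as a single receive with a timeout. This is a minimal illustration, not the module's actual code: it assumes the gleam_erlang package, and the names Step, Element, Deadline and next_or_deadline are hypothetical.

```gleam
import gleam/erlang/process

// Hypothetical result type: either the worker delivered an element,
// or the timer deadline passed first.
pub type Step(a) {
  Element(a)
  Deadline
}

// Wait on the shared subject for at most `wait_ms` milliseconds.
// A message means the worker pumped an upstream element; a timeout
// means the deadline fired with nothing new to report.
pub fn next_or_deadline(inbox: process.Subject(a), wait_ms: Int) -> Step(a) {
  case process.receive(inbox, within: wait_ms) {
    Ok(value) -> Element(value)
    Error(Nil) -> Deadline
  }
}
```

The subject has to be created by the process that calls next_or_deadline, since only the owning process may receive from it.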

Every public function in this module is gated on @target(erlang). On the JavaScript target the module still compiles, so import datastream/erlang/time itself does not fail, but calling any function fails at the call site. The beam_only_marker constant exists solely to keep the module non-empty on the JavaScript target.

Limitation: incompatible with from_subject

Because every combinator pulls inside a spawned worker process, streams built from datastream/erlang/source.from_subject cannot be passed through this layer: messages sent to an Erlang Subject can only be received by the process that owns it, so the worker would panic on the first pull. The same constraint applies to datastream/erlang/par.

Values

pub fn debounce(
  over stream: datastream.Stream(a),
  quiet_for ms: Int,
) -> datastream.Stream(a)

Emit only after the upstream has been silent for at least quiet_for ms following the last seen element. The element emitted is the LAST element seen before the silence.

Useful for collapsing UI events (typing, resize) into a single “settled” notification.
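
To make the rule concrete, here is a pure model of it over timestamped arrivals. It is a sketch for reasoning about the semantics, not the library's implementation. debounce_model is a hypothetical name, timestamps are milliseconds in ascending order, and the model assumes the upstream stays open for at least quiet_for ms after the final arrival, so the final element is emitted as well.

```gleam
// Pure model of debounce: an element is emitted only when the gap to
// the next arrival is at least `quiet_for` ms.
// Assumption: silence always follows the final arrival, so it is kept.
pub fn debounce_model(arrivals: List(#(Int, a)), quiet_for: Int) -> List(a) {
  case arrivals {
    [] -> []
    [#(_, value)] -> [value]
    [#(at, value), #(next_at, next_value), ..rest] -> {
      let tail = debounce_model([#(next_at, next_value), ..rest], quiet_for)
      case next_at - at >= quiet_for {
        // Long enough silence after this element: it settles.
        True -> [value, ..tail]
        // Superseded by a newer element before the silence elapsed.
        False -> tail
      }
    }
  }
}
```

For arrivals at 0, 50, 100 and 600 ms with quiet_for set to 300, the model keeps the elements from 100 ms and 600 ms. With the real combinator the equivalent call would look like debounce(over: events, quiet_for: 300).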

pub fn rate_limit(
  over stream: datastream.Stream(a),
  max_per_window count: Int,
  window_ms ms: Int,
) -> datastream.Stream(a)

Emit elements no faster than count per ms-wide window. Excess elements are delayed (sleeping the calling pull), NOT dropped. Loss-free.
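
The delay-instead-of-drop behaviour can be modelled by computing when each element would be released. The text above does not say whether windows are fixed or sliding, so this sketch assumes a sliding window: an element may go out once the emission count places before it is at least window_ms old. rate_limit_model is a hypothetical name, arrival times are milliseconds in ascending order, and count is assumed to be at least 1.

```gleam
import gleam/int
import gleam/list

// Pure model of rate_limit: maps arrival times to emission times.
// Assumption: sliding window, not necessarily what the library does.
pub fn rate_limit_model(
  arrivals: List(Int),
  count: Int,
  window_ms: Int,
) -> List(Int) {
  let emitted_newest_first =
    list.fold(arrivals, [], fn(emitted, arrival) {
      // The emission `count` places back decides how long to wait.
      let at = case list.drop(emitted, count - 1) {
        [blocker, ..] -> int.max(arrival, blocker + window_ms)
        [] -> arrival
      }
      [at, ..emitted]
    })
  list.reverse(emitted_newest_first)
}
```

With count set to 2 and window_ms set to 1000, arrivals at 0, 10, 20, 30 and 2500 ms are released at 0, 10, 1000, 1010 and 2500 ms. Every input produces an output, which is the loss-free property.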

pub fn sample(
  over stream: datastream.Stream(a),
  every ms: Int,
) -> datastream.Stream(a)

At each ms-wide window boundary, emit the most recent element seen during the window. If no element arrived during the window, the boundary emits nothing and the next window begins.
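
A pure model of this rule is sketched below; it is not the library's implementation. sample_model is a hypothetical name. It assumes windows are aligned to time 0, timestamps are non-negative milliseconds in ascending order, and the boundary of the last occupied window is eventually reached.

```gleam
// Pure model of sample: keep the last element of every non-empty
// window; empty windows contribute nothing.
// Assumption: window k covers [k * every, (k + 1) * every).
pub fn sample_model(arrivals: List(#(Int, a)), every: Int) -> List(a) {
  case arrivals {
    [] -> []
    [#(_, value)] -> [value]
    [#(at, value), #(next_at, next_value), ..rest] -> {
      let tail = sample_model([#(next_at, next_value), ..rest], every)
      case at / every == next_at / every {
        // A newer element exists in the same window: skip this one.
        True -> tail
        // This was the most recent element when its window closed.
        False -> [value, ..tail]
      }
    }
  }
}
```

For arrivals at 10, 90, 120 and 350 ms with every set to 100, the model keeps the elements from 90, 120 and 350 ms; the window from 200 to 300 ms is empty and emits nothing.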

pub fn throttle(
  over stream: datastream.Stream(a),
  every ms: Int,
) -> datastream.Stream(a)

Emit the FIRST element of each ms-wide window; drop the rest.

Windows start at the moment the first element is emitted.
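
The following pure model illustrates the rule; it is a sketch, not the library's implementation. throttle_model is a hypothetical name, timestamps are milliseconds in ascending order, and the model assumes that every emitted element opens a fresh window rather than windows sitting on a fixed grid.

```gleam
import gleam/list

// Pure model of throttle: the first arrival opens a window and is
// emitted; later arrivals inside that window are dropped.
// Assumption: each emitted element starts a new window.
pub fn throttle_model(arrivals: List(#(Int, a)), every: Int) -> List(a) {
  case arrivals {
    [] -> []
    [#(start, value), ..rest] -> {
      // Everything arriving before `start + every` is in the open window.
      let later =
        list.drop_while(rest, fn(arrival) { arrival.0 < start + every })
      [value, ..throttle_model(later, every)]
    }
  }
}
```

For arrivals at 0, 200, 999, 1000, 1500 and 2100 ms with every set to 1000, the model keeps the elements from 0, 1000 and 2100 ms.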

pub fn window_time(
  over stream: datastream.Stream(a),
  span ms: Int,
) -> datastream.Stream(chunk.Chunk(a))

Divide time into ms-wide windows. At each boundary, emit a Chunk(a) containing every element that arrived during that window. Empty windows emit empty chunks.
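
The windowing rule, including the empty-window case, can be modelled with plain lists. This is a sketch and not the library's implementation: window_time_model and collect are hypothetical names, a List stands in for Chunk(a), windows are assumed to be aligned to time 0, timestamps are non-negative milliseconds in ascending order, and span must be greater than 0.

```gleam
import gleam/list

// Pure model of window_time: one list per window, empty ones included,
// up to the window that holds the final arrival.
pub fn window_time_model(
  arrivals: List(#(Int, a)),
  span: Int,
) -> List(List(a)) {
  collect(arrivals, span, span)
}

// `boundary` is the exclusive end of the window being filled.
fn collect(
  arrivals: List(#(Int, a)),
  span: Int,
  boundary: Int,
) -> List(List(a)) {
  case arrivals {
    [] -> []
    _ -> {
      let #(inside, later) =
        list.split_while(arrivals, fn(arrival) { arrival.0 < boundary })
      let values = list.map(inside, fn(arrival) { arrival.1 })
      [values, ..collect(later, span, boundary + span)]
    }
  }
}
```

For arrivals at 10, 20 and 250 ms with span set to 100, the model yields three windows: the first holds two elements, the second is empty, and the third holds one.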
