datastream/erlang/time
BEAM-only time-based combinators.
Every combinator here works in real elapsed time: bursts get debounced / throttled / sampled / windowed against BEAM monotonic time, never against the wall clock, which can jump backwards.
Implementation strategy: each combinator spawns one worker
process that pumps the upstream into a shared subject. The main
process drives the consuming Stream and uses
process.receive(within: ms) to wait simultaneously for the next
element and for a timer deadline.
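The waiting half of that strategy can be sketched as follows. This is a minimal illustration, not the module's actual code: `Wait`, `Element`, `DeadlineReached`, and `wait_for` are hypothetical names, and the sketch assumes the `gleam_erlang` package's `process.receive`.

```gleam
import gleam/erlang/process.{type Subject}

/// Hypothetical result type: either the worker delivered an element,
/// or the timer deadline passed first.
pub type Wait(a) {
  Element(a)
  DeadlineReached
}

/// Block the calling process until the worker sends an element on
/// `subject`, or until `ms` milliseconds have elapsed.
pub fn wait_for(subject: Subject(a), ms: Int) -> Wait(a) {
  case process.receive(from: subject, within: ms) {
    Ok(element) -> Element(element)
    Error(Nil) -> DeadlineReached
  }
}
```

A single `receive` with a timeout covers both cases, so the consuming process needs no separate timer process.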
Every public function in this module is gated on
@target(erlang). On the JavaScript target the module still
compiles, so import datastream/erlang/time itself does not fail,
but calling any function fails at the call site. The
beam_only_marker constant exists solely to keep the module
non-empty on the JavaScript target.
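For code that must build on both targets, one option is to gate the caller the same way. The sketch below assumes the `Stream` type is exported from a top-level `datastream` module; `at_most_one_per_second` is a hypothetical wrapper, not part of the library.

```gleam
import datastream
import datastream/erlang/time

// Gated like the module's own functions, so a JavaScript build never
// contains a reference to `time.throttle`.
@target(erlang)
pub fn at_most_one_per_second(
  events: datastream.Stream(a),
) -> datastream.Stream(a) {
  time.throttle(over: events, every: 1000)
}
```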
Limitation: incompatible with from_subject
Because every combinator pulls inside a spawned worker process,
streams built from datastream/erlang/source.from_subject cannot
be passed through this layer: an Erlang Subject can only be
received from by its owning process, so the worker would panic on
the first pull. The same constraint applies to
datastream/erlang/par.
Values
pub fn debounce(
over stream: datastream.Stream(a),
quiet_for ms: Int,
) -> datastream.Stream(a)
Emit only after the upstream has been silent for at least
quiet_for ms following the last seen element. The element
emitted is the LAST element seen before the silence.
Useful for collapsing UI events (typing, resize) into a single “settled” notification.
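A minimal usage sketch, assuming the Erlang target and that `Stream` is imported from a top-level `datastream` module. `settled_input` and the 300 ms quiet period are illustrative choices, not part of the library.

```gleam
import datastream
import datastream/erlang/time

/// Hypothetical helper: collapse a burst of keystrokes into the last
/// value seen before 300 ms of silence.
pub fn settled_input(
  keystrokes: datastream.Stream(String),
) -> datastream.Stream(String) {
  time.debounce(over: keystrokes, quiet_for: 300)
}
```

Reading the description above literally, if "a", "b", and "c" arrive at 0 ms, 100 ms, and 150 ms and nothing follows, only "c" is emitted, no earlier than 450 ms.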
pub fn rate_limit(
over stream: datastream.Stream(a),
max_per_window count: Int,
window_ms ms: Int,
) -> datastream.Stream(a)
Emit at most count elements per ms-wide window.
Excess elements are delayed by sleeping inside the pull that
requests them; they are NOT dropped, so the combinator is loss-free.
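A minimal usage sketch under the same assumptions (Erlang target, `Stream` exported from `datastream`). `polite_requests` and the limits chosen are hypothetical.

```gleam
import datastream
import datastream/erlang/time

/// Hypothetical helper: let through at most 5 elements per 1000 ms
/// window. Elements over the limit wait; none are discarded.
pub fn polite_requests(
  requests: datastream.Stream(String),
) -> datastream.Stream(String) {
  time.rate_limit(over: requests, max_per_window: 5, window_ms: 1000)
}
```

Because the delay happens inside the pull, back-pressure falls on the consumer: a slow window simply makes the next pull take longer.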
pub fn sample(
over stream: datastream.Stream(a),
every ms: Int,
) -> datastream.Stream(a)
At each ms-wide window boundary, emit the most recent element
seen during the window. If no element arrived during the window,
the boundary emits nothing and the next window begins.
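A minimal usage sketch, again assuming the Erlang target and a top-level `datastream` module. `latest_reading_each_second` is a hypothetical name.

```gleam
import datastream
import datastream/erlang/time

/// Hypothetical helper: keep only the most recent reading at each
/// 1000 ms boundary.
pub fn latest_reading_each_second(
  readings: datastream.Stream(Float),
) -> datastream.Stream(Float) {
  time.sample(over: readings, every: 1000)
}
```

Following the description above, with readings at 100 ms and 400 ms into the first window and one at 1700 ms, the first boundary emits the 400 ms reading, the second emits the 1700 ms reading, and a third window with no arrivals emits nothing.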
pub fn throttle(
over stream: datastream.Stream(a),
every ms: Int,
) -> datastream.Stream(a)
Emit the FIRST element of each ms-wide window; drop the rest.
Windows start at the moment the first element is emitted.
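A minimal usage sketch under the same assumptions as the other examples. `first_click_per_second` is a hypothetical name.

```gleam
import datastream
import datastream/erlang/time

/// Hypothetical helper: pass the first click of each 1000 ms window
/// and drop any further clicks inside that window.
pub fn first_click_per_second(
  clicks: datastream.Stream(String),
) -> datastream.Stream(String) {
  time.throttle(over: clicks, every: 1000)
}
```

Unlike `sample`, which keeps the latest element of a window, `throttle` keeps the earliest. With clicks at 0 ms, 200 ms, and 900 ms, only the click at 0 ms is emitted.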
pub fn window_time(
over stream: datastream.Stream(a),
span ms: Int,
) -> datastream.Stream(chunk.Chunk(a))
Divide time into ms-wide windows. At each boundary, emit a
Chunk(a) containing every element that arrived during that
window. Empty windows emit empty chunks.
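A minimal usage sketch, assuming the Erlang target and a top-level `datastream` module. `per_second_batches` is a hypothetical name, and the return annotation is left to inference so the sketch does not have to guess the import path of the `chunk` module.

```gleam
import datastream
import datastream/erlang/time

/// Hypothetical helper: group readings into one chunk per 1000 ms window.
pub fn per_second_batches(readings: datastream.Stream(Float)) {
  // Inferred return type: datastream.Stream(chunk.Chunk(Float)).
  time.window_time(over: readings, span: 1000)
}
```

Per the description above, readings at 100 ms and 300 ms, none during the second window, and one at 2500 ms would yield three chunks: one with two elements, one empty, and one with a single element. Consumers that only care about non-empty windows need to filter the empty chunks themselves.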