datastream/erlang/source
BEAM-only source constructors.
These bridge between Erlang’s process / timer primitives and the
cross-target Stream(a) value. The cross-target core has no
business knowing about Subject, monotonic time, or BEAM timers,
so they live here in datastream/erlang/source.
Every public function in this module is gated on
@target(erlang). On the JavaScript target the module still
compiles, so import datastream/erlang/source itself does not
fail, but calling any function fails at the call site. The
beam_only_marker constant exists solely to keep the module
non-empty on the JavaScript target.
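As a minimal sketch of what this gating means for callers, a wrapper around this module can carry the same @target(erlang) attribute so that it is left out of JavaScript builds. The function name heartbeat is hypothetical and only illustrates the pattern.

```gleam
import datastream
import datastream/erlang/source

// Hypothetical wrapper. It is only compiled on the Erlang target,
// matching the gating of the function it calls.
@target(erlang)
pub fn heartbeat() -> datastream.Stream(Nil) {
  // Emits Nil once per second.
  source.interval(every: 1000)
}
```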
Values
pub fn bridge_subject_stream(
over stream: datastream.Stream(a),
) -> datastream.Stream(a)
Materialise a from_subject-backed stream into a process-safe
Stream(a) so the result can be composed with par.* /
time.* / source.timeout.
The pull happens in the calling process — the same process that
owns the Subject — so the owner-only receive constraint is
respected. Once the upstream returns Done, the materialised
List(a) is reopened as a list-backed stream, which is pure and
can safely be pulled from any worker process.
Caveat: materialises the entire upstream into memory before
emitting any element. Only use this on bounded subjects (a
keep_running callback that eventually returns False, or a
finite source). For an unbounded subject, route the parallel
stage outside the subject pipeline instead — there is no
process-safe way to lazily forward an Erlang Subject across a
process boundary.
This is the official workaround for the
from_subject × par.* / time.* incompatibility documented
on from_subject, par, and source.timeout.
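The workaround can be sketched as follows, using only the signatures listed on this page. The function name bridged_with_deadline and the 500 ms deadline are hypothetical; the sketch assumes it is called from the process that owns inbox and that keep_running eventually returns False.

```gleam
import datastream
import datastream/erlang/source
import gleam/erlang/process

// Hypothetical helper: must be called from the process that owns `inbox`.
@target(erlang)
pub fn bridged_with_deadline(
  inbox: process.Subject(Int),
  keep_running: fn() -> Bool,
) -> datastream.Stream(Result(Int, Nil)) {
  // Read from the subject in the owning process.
  source.from_subject(from: inbox, while: keep_running)
  // Materialise into a list-backed stream that any process can pull.
  |> source.bridge_subject_stream
  // The worker-based deadline is applied to the bridged stream only.
  |> source.timeout(within: 500)
}
```

Because the whole upstream is held in memory before the first element is emitted, this shape only suits subjects that are known to be bounded.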
pub fn from_subject(
from subject: process.Subject(a),
while keep_running: fn() -> Bool,
) -> datastream.Stream(a)
Bridge a process Subject(a) into a Stream(a).
Each pull blocks on process.receive until a message arrives,
which is emitted as the next element, or until keep_running()
returns False between receives, which ends the stream. The
receive uses a short internal poll interval so keep_running
stays responsive.
The Subject’s lifecycle is owned by the caller: this stream
only reads from the subject and never closes it. Once the
terminal completes, the stream’s hold on the subject is released;
no further cleanup is needed.
Limitation: the resulting stream cannot be passed directly
to any combinator in datastream/erlang/par or
datastream/erlang/time, nor to
datastream/erlang/source.timeout. Those run the pull in a
worker process, and the Subject’s owner-only receive constraint
causes a panic in the worker. Use bridge_subject_stream
to materialise the subject side in the owning process
before handing it off.
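A minimal sketch of building a subject-backed stream in the owning process follows. The name first_two is hypothetical, and the datastream/stream module path and the positional take(stream, count) call shape are assumptions: this page mentions stream.take only by name. process.new_subject and process.send come from gleam_erlang.

```gleam
import datastream
import datastream/erlang/source
import datastream/stream
import gleam/erlang/process

// Hypothetical example: the returned stream must be pulled by the same
// process that runs this function, because that process owns the subject.
@target(erlang)
pub fn first_two() -> datastream.Stream(Int) {
  let inbox = process.new_subject()

  // Queue two messages before anything pulls from the stream.
  process.send(inbox, 1)
  process.send(inbox, 2)

  // keep_running never stops the stream here; the (assumed) take call
  // is what bounds it.
  source.from_subject(from: inbox, while: fn() { True })
  |> stream.take(2)
}
```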
pub fn interval(every period_ms: Int) -> datastream.Stream(Nil)
Emit Nil every period_ms ms. Same first-emission timing as
ticks; differs only in element type.
pub fn ticks(every period_ms: Int) -> datastream.Stream(Int)
Emit a monotonic millisecond timestamp every period_ms ms.
The first element is emitted period_ms after the first pull.
Pair with stream.take (or any early-exit terminal) to bound
the run.
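For illustration, a bounded tick stream might look like the sketch below. The name five_ticks is hypothetical, and the datastream/stream module path and the positional take(stream, count) call shape are assumptions, since this page refers to stream.take only by name.

```gleam
import datastream
import datastream/erlang/source
import datastream/stream

// Hypothetical example: one monotonic timestamp per second, limited to
// five elements. The first element is emitted 1000 ms after the first pull.
@target(erlang)
pub fn five_ticks() -> datastream.Stream(Int) {
  source.ticks(every: 1000)
  |> stream.take(5)
}
```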
pub fn timeout(
over stream: datastream.Stream(a),
within ms: Int,
) -> datastream.Stream(Result(a, Nil))
Wrap an upstream and add a per-element deadline.
Each element of stream that arrives within ms milliseconds
surfaces as Ok(element). If the upstream produces nothing for
longer than ms milliseconds, exactly one Error(Nil) element is
emitted and the stream halts.
Implementation: each pull spawns an unlinked worker process that
performs the datastream.pull and forwards the step on a
dedicated subject; the parent waits with process.receive(_, within: ms). On a deadline trip the abandoned worker may still
complete the pull afterwards; its result is silently discarded
and any resource it had partially acquired is left to the BEAM
runtime to reclaim.
On downstream early exit, the upstream that the next pull would
have read from is closed; an abandoned worker is not waited on.
Limitation: because the pull happens in a different process
than the one that built the upstream, this combinator MUST NOT
be composed directly with from_subject: a Subject can only be
received by its owning process, so the worker would panic. For a
bounded subject, materialise the stream with
bridge_subject_stream first; otherwise, give subject streams
their own application-level timeout instead.
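A minimal sketch of the deadline wrapper over a process-safe upstream follows. The names guarded_ticks and describe and the chosen periods are hypothetical; only ticks and timeout are taken from this page, and int.to_string comes from gleam_stdlib.

```gleam
import datastream
import datastream/erlang/source
import gleam/int

// Hypothetical example: ticks arrive every 100 ms, well inside the
// 250 ms per-element deadline.
@target(erlang)
pub fn guarded_ticks() -> datastream.Stream(Result(Int, Nil)) {
  source.ticks(every: 100)
  |> source.timeout(within: 250)
}

// Hypothetical helper showing how a consumer might interpret each element.
pub fn describe(step: Result(Int, Nil)) -> String {
  case step {
    Ok(timestamp) -> "tick at " <> int.to_string(timestamp)
    Error(Nil) -> "deadline exceeded; no further elements follow"
  }
}
```

The upstream here is ticks rather than a subject-backed stream, so the worker process that performs each pull never has to receive on a Subject it does not own.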