datastream
datastream is a pull-based stream library for Gleam.
A Stream(a) is a pipeline definition, not a materialized collection.
Each terminal operation runs the source again from the beginning, so the
library fits work that should stay lazy, repeatable, and explicit about
effects.
Install
gleam add datastream
API reference: https://hexdocs.pm/datastream
When to use it
Use datastream when you need one or more of these:
- the input is large or unbounded and should not be loaded all at once
- the pipeline owns a real resource such as a file handle, socket, or cursor
- the work is naturally chunked text or bytes
- the Erlang target needs bounded parallel work or time-based stream operators
For a hands-on tour, jump to the runnable Log-ingest example or the NDJSON example. The full catalog of compile-checked end-to-end pipelines lives under Example pipelines.
Stay with gleam/list when the whole input is already in memory and you
do not need lazy pulls, replayable pipelines, or deterministic cleanup.
Quick start
import datastream/sink
import datastream/source
import datastream/stream
import gleam/io
pub fn main() {
let result =
source.iterate(from: 1, with: fn(x) { x + 1 })
|> stream.map(with: fn(x) { x * 2 })
|> stream.take(up_to: 5)
|> sink.to_list
io.debug(result)
// [2, 4, 6, 8, 10]
}
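Because a Stream(a) is only a definition, the same value can be handed to more than one terminal, and each run replays the source from the start. A minimal sketch of that replay behaviour (sink.count is one of the pure reductions listed in the module guide below):

import datastream/sink
import datastream/source
import datastream/stream
import gleam/io

pub fn main() {
  let doubled =
    source.from_list([1, 2, 3])
    |> stream.map(with: fn(x) { x * 2 })

  // One definition, two independent runs: each terminal pulls the
  // source again from the beginning, so both see the full input.
  io.debug(sink.to_list(doubled))
  // [2, 4, 6]
  io.debug(sink.count(doubled))
  // 3
}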
Log-ingest example: bytes -> lines -> per-level counts
A common shape: bytes arrive in arbitrary chunks (file, socket), the
pipeline reassembles them into lines, drops malformed rows, and produces
per-level counts in a single pass. text.lines owns the line buffering,
so the chunk boundaries shown below never split a record across two
emitted strings.
import datastream/fold
import datastream/source
import datastream/stream
import datastream/text
import gleam/dict
import gleam/io
import gleam/option.{type Option, None, Some}
import gleam/string
pub type Level {
Info
Warn
LogError
}
pub fn main() {
// Real-world input arrives in arbitrary chunks. Note how `WARN` and
// `INFO` records straddle chunk boundaries — `text.lines` reassembles.
let chunks = [
"INFO user_id=42 logged in\nWARN ", "user_id=42 retry\nERROR ",
"user_id=99 timeout\nbogus line with no level\nINFO ",
"user_id=42 ok\n",
]
source.from_list(chunks)
|> text.lines
|> stream.filter_map(with: parse_line)
|> fold.fold(from: dict.new(), with: bump)
|> io.debug
// dict.from_list([#(Info, 2), #(Warn, 1), #(LogError, 1)])
}
fn parse_line(line: String) -> Option(Level) {
case string.split_once(line, on: " ") {
Ok(#("INFO", _)) -> Some(Info)
Ok(#("WARN", _)) -> Some(Warn)
Ok(#("ERROR", _)) -> Some(LogError)
_ -> None
}
}
fn bump(acc: dict.Dict(Level, Int), level: Level) -> dict.Dict(Level, Int) {
let current = case dict.get(acc, level) {
Ok(n) -> n
Error(Nil) -> 0
}
dict.insert(acc, level, current + 1)
}
Production callers swap source.from_list(chunks) for a
source.resource over a real file handle or socket — the rest of the
pipeline is unchanged. Full source:
test/examples/log_pipeline_example.gleam.
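The shape of that swap, as a sketch: file_open, file_read, and file_close below are hypothetical stand-ins for your own file or socket bindings (they are not part of datastream), and here they just replay canned chunks.

import datastream.{type Stream, Done, Next}
import datastream/source

type Handle {
  Handle(remaining: List(String))
}

// Hypothetical stand-ins for a real I/O layer; replace with your own
// file or socket bindings.
fn file_open(_path: String) -> Handle {
  Handle(remaining: ["INFO user_id=42 logged in\nWARN ", "user_id=42 retry\n"])
}

fn file_read(handle: Handle) -> Result(#(String, Handle), Nil) {
  case handle.remaining {
    [] -> Error(Nil)
    [chunk, ..rest] -> Ok(#(chunk, Handle(remaining: rest)))
  }
}

fn file_close(_handle: Handle) -> Nil {
  Nil
}

// Drop-in replacement for source.from_list(chunks) in main above; the
// stream opens lazily on the first pull and closes when it is done.
pub fn log_chunks(path: String) -> Stream(String) {
  source.resource(
    open: fn() { file_open(path) },
    next: fn(handle) {
      case file_read(handle) {
        Ok(#(chunk, rest)) -> Next(element: chunk, state: rest)
        Error(Nil) -> Done
      }
    },
    close: file_close,
  )
}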
NDJSON example: chunked bytes -> typed records
Newline-delimited JSON (one record per line) over a chunked byte source. The same lazy pass does UTF-8 decode, line framing, and per-record parsing without holding the whole payload in memory.
import datastream/fold
import datastream/source
import datastream/stream
import datastream/text
import gleam/int
import gleam/io
import gleam/string
pub type Record {
Record(id: Int, body: String)
}
pub type ParseError {
EmptyLine
MissingId(line: String)
BadId(raw: String)
}
pub fn main() {
// The first chunk ends mid-line; the second completes that record.
let chunks =
source.from_list([
<<"1 first record\n2 second">>,
<<" record\n3 third record\n">>,
])
chunks
|> text.utf8_decode_lossy
|> text.lines
|> stream.map(with: parse_record)
|> fold.collect_result
|> io.debug
// Ok([Record(1, "first record"), Record(2, "second record"), ...])
}
fn parse_record(line: String) -> Result(Record, ParseError) {
case line {
"" -> Error(EmptyLine)
_ ->
case string.split_once(line, on: " ") {
Error(_) -> Error(MissingId(line: line))
Ok(#(raw_id, body)) ->
case int.parse(raw_id) {
Ok(id) -> Ok(Record(id: id, body: body))
Error(_) -> Error(BadId(raw: raw_id))
}
}
}
}
fold.collect_result short-circuits at the first parse failure; swap in
fold.partition_result to surface every successful record alongside
every error. Drop in your JSON decoder of choice at parse_record.
Full source:
test/examples/ndjson_pipeline_example.gleam.
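Inside main above, that swap only touches the terminal. A sketch (the exact shape of the value fold.partition_result returns is in the API reference, so it is just debug-printed here):

  chunks
  |> text.utf8_decode_lossy
  |> text.lines
  |> stream.map(with: parse_record)
  |> fold.partition_result
  |> io.debug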
Resource-backed streams
source.resource opens lazily on the first pull, yields values one by
one, and closes exactly once on normal completion and on the early-stop
paths the library controls.
import datastream.{Done, Next}
import datastream/fold
import datastream/source
import datastream/stream
import gleam/io
pub fn main() {
let numbers =
source.resource(
open: fn() { 1 },
next: fn(state) {
case state <= 3 {
True -> Next(element: state, state: state + 1)
False -> Done
}
},
close: fn(_state) { Nil },
)
numbers
|> stream.take(up_to: 2)
|> fold.to_list
|> io.debug
// [1, 2]
}
Use source.try_resource when opening or reading can fail and you want
the failure on the typed path as Result.
JavaScript async I/O
The core Stream(a) stays synchronous on both targets. That is a
deliberate design choice: each pull either returns the next element
immediately or reports Done.
Because of that, the official JavaScript boundary is:
- async input stays in host JavaScript until it has been reduced to a bounded value or batch, then enters datastream through source.once, source.from_list, source.from_bit_array, or a resource constructor
- synchronous Stream(a) pipelines leave datastream through datastream/javascript/async.to_async_iterable
This means datastream does not pretend that a host-side
AsyncIterable is the same thing as a pure Stream(a). The adapter is
honest about where await lives.
Gleam:
import datastream/javascript/async as js_async
import datastream/source
import datastream/stream
import gleam/string
pub fn lines_for_host() -> js_async.AsyncIterable(String) {
source.from_list(["a", "b", "c"])
|> stream.map(with: string.uppercase)
|> js_async.to_async_iterable
}
Host JavaScript:
import { lines_for_host } from "./build/dev/javascript/app/app.mjs";
for await (const line of lines_for_host()) {
console.log(line);
if (line === "B") break;
}
Breaking out of the for await loop closes the underlying stream once,
so resource-backed pipelines still release handles promptly.
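The entry direction is symmetric. A sketch, assuming host JavaScript has already awaited its async input down to a bounded list of byte chunks before calling into Gleam:

import datastream/sink
import datastream/source
import datastream/text

// The host awaits the bytes, then hands a bounded batch to this
// function; from here on, the pipeline is the ordinary synchronous core.
pub fn lines_from_batch(chunks: List(BitArray)) -> List(String) {
  source.from_list(chunks)
  |> text.utf8_decode_lossy
  |> text.lines
  |> sink.to_list
}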
Example pipelines
Compile-checked examples live under test/examples/ and run in CI. The
log-ingest and NDJSON entries are inlined above; the rest are linked
straight to source.
| Example | Shape |
|---|---|
| log_pipeline_example.gleam | bytes -> text.lines -> validation -> per-level counts |
| ndjson_pipeline_example.gleam | bytes -> UTF-8 decode -> lines -> per-record parse |
| length_prefixed_pipeline_example.gleam | chunked bytes -> binary.length_prefixed_with -> collection |
| parallel_pipeline_example.gleam | BEAM-only bounded parallel map |
| dataprep_pipeline_example.gleam | per-row validation with accumulated errors |
Module guide
- datastream: defines Stream(a) and Step(a, state)
- datastream/source: constructors for list-backed, generated, and resource-backed streams
- datastream/stream: lazy combinators such as map, filter, flat_map, zip, take, and chunks_of
- datastream/sink: every terminal, covering pure reductions (to_list, count, fold, first, find, collect_result, …) and side-effecting consumers (each, try_each, println)
- datastream/fold: legacy alias for the pure-reduction subset of sink; re-exports the same functions for backward compatibility, but new code should reach for datastream/sink instead
- datastream/chunk: opaque finite batches
- datastream/text: chunk-aware UTF-8 decode and line splitting
- datastream/binary: byte and framing helpers
- datastream/erlang/source: BEAM-only subject, timer, and timeout helpers
- datastream/erlang/sink: BEAM-only subject sink
- datastream/erlang/par: BEAM-only bounded parallel combinators and race
- datastream/erlang/time: BEAM-only time-based combinators
- datastream/javascript/async: JavaScript-only async iterable adapter for leaving the synchronous core
Target support
- Erlang target: the full package
- JavaScript target: the cross-target core and datastream/javascript/async; the datastream/erlang/* modules are BEAM-only
Checked constructors
Some constructors reject invalid numeric arguments at construction time.
Use the panicking variants when the value is a trusted constant in your
own code. Use the matching *_checked variant when the value comes from
CLI flags, config files, request parameters, or any other dynamic input.
The main checked families are:
- stream.take_checked, stream.drop_checked
- stream.buffer_checked, stream.chunks_of_checked
- binary.length_prefixed_checked, binary.length_prefixed_with_checked, binary.fixed_size_checked
- datastream/erlang/par.*_checked
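A sketch of the split, assuming stream.take_checked keeps stream.take's up_to: label and returns a Result (check the API reference for the exact error type):

import datastream/sink
import datastream/source
import datastream/stream
import gleam/result

// `n` arrives from dynamic input (a CLI flag, say), so the checked
// variant keeps an invalid count on the typed path instead of panicking.
pub fn first_n(n: Int) {
  source.iterate(from: 1, with: fn(x) { x + 1 })
  |> stream.take_checked(up_to: n)
  |> result.map(sink.to_list)
}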
Backpressure
Two combinators interact with backpressure between a producer and one or more consumers:
- stream.buffer(stream, prefetch:) prefetches up to prefetch elements ahead of the consumer, so a latency-bound upstream (HTTP body bytes, slow disk reads) does not serialise the consumer's per-element work. The buffer is bounded by prefetch; a slow consumer cannot blow it past that capacity.
- stream.broadcast(stream, n) fans one source out to n independent consumers, each pulling at its own pace. Per-consumer queues are unbounded: if one consumer pulls aggressively while another pauses, the slow consumer's queue grows by the per-consumer pull distance. The worst-case memory footprint is O(max_pull_distance × n). For cardinality-unbounded sources (source.iterate, source.repeat) this is a silent memory hazard.
- stream.broadcast_bounded(stream, n, max_queue:) is the bounded variant: any consumer queue exceeding max_queue triggers a structured panic on the next upstream pull. Use it in production fan-outs (HTTP multicast, websocket pub/sub, Kafka producer tees, …) where a stalled slow consumer must surface as a crash instead of an OOM.
If you only have one consumer and just want to overlap producer work
with consumer work, reach for buffer. If you genuinely need fan-out
to multiple consumers and your source is bounded, broadcast is fine.
If the source is unbounded or production-critical, default to
broadcast_bounded.
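A sketch of the single-consumer case with buffer:

import datastream/sink
import datastream/source
import datastream/stream

// Stand-in for per-element work that would otherwise run in lockstep
// with upstream latency.
fn slow_work(x: Int) -> Int {
  x * x
}

pub fn main() {
  // Up to 8 elements are pulled ahead of the consumer; the buffer
  // never grows past that bound.
  source.iterate(from: 1, with: fn(x) { x + 1 })
  |> stream.buffer(prefetch: 8)
  |> stream.map(with: slow_work)
  |> stream.take(up_to: 100)
  |> sink.to_list
}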
Web framework compatibility
datastream depends on gleam_erlang >= 1.3.0 and
gleam_stdlib >= 0.44.0. Older releases of some web packages pin
gleam_erlang < 1.0.0, which conflicts with datastream.
Use these versions or newer when combining datastream with a web stack:
| Package | Minimum compatible version |
|---|---|
| wisp | >= 2.0.0 |
| mist | >= 6.0.0 |
| gleam_httpc | >= 5.0.0 |