# datastream


datastream is a pull-based stream library for Gleam.

It is meant for pipelines that should stay lazy, repeatable, and explicit about effects. A `Stream(a)` is a pipeline definition, not a materialized collection, so each terminal operation runs the source again.
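A minimal sketch of that re-run behaviour, using only the `source` and `fold` modules shown in the examples below:

```gleam
import datastream/fold
import datastream/source
import gleam/io

pub fn main() {
  let numbers = source.from_list([1, 2, 3])
  // The stream is a definition: each terminal operation below
  // pulls from the source again, starting from the beginning.
  io.debug(fold.to_list(numbers))
  io.debug(fold.to_list(numbers))
  // [1, 2, 3] is printed twice.
}
```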

## Install

```sh
gleam add datastream
```

API reference: https://hexdocs.pm/datastream

## Target support

## Use cases

### When to use

Reach for datastream when:

- the input is unbounded or too large to hold in memory, so elements must be pulled lazily
- the same pipeline definition needs to run more than once (each terminal operation re-runs the source)
- elements come from a resource such as a file, socket, or cursor that must be released on every termination path

Stick with `gleam/list` when the input already fits in memory and you don't need lazy pulls, repeatable runs, or resource cleanup. `List` is simpler and faster for the small-finite case.
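For a sense of the trade-off, here is a small-finite doubling pipeline written with plain `gleam/list`. It builds the whole list eagerly, which is exactly what you want when everything fits in memory:

```gleam
import gleam/io
import gleam/list

pub fn main() {
  // Eager: the list exists up front and each step materializes a new list.
  [1, 2, 3, 4, 5]
  |> list.map(fn(x) { x * 2 })
  |> io.debug
  // [2, 4, 6, 8, 10]
}
```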

## Examples

Each example below is a complete `src/app.gleam` you can paste in after `gleam new app && gleam add datastream`, then run with `gleam run`.

### Basic pipeline

```gleam
import datastream/fold
import datastream/source
import datastream/stream
import gleam/io

pub fn main() {
  let result =
    source.iterate(from: 1, with: fn(x) { x + 1 })
    |> stream.map(with: fn(x) { x * 2 })
    |> stream.take(up_to: 5)
    |> fold.to_list
  io.debug(result)
  // [2, 4, 6, 8, 10]
}
```

### Line-oriented text

```gleam
import datastream/fold
import datastream/source
import datastream/text
import gleam/io

pub fn main() {
  let lines =
    source.from_list(["hel", "lo\nwor", "ld\n"])
    |> text.lines
    |> fold.to_list
  io.debug(lines)
  // ["hello", "world"]
}
```

### Binary framing

```gleam
import datastream/binary
import datastream/fold
import datastream/source
import gleam/io

pub fn main() {
  let frames =
    source.from_list([<<2, 65>>, <<66, 1, 67>>])
    |> binary.length_prefixed(prefix_size: 1)
    |> fold.to_list
  io.debug(frames)
  // [<<65, 66>>, <<67>>]
}
```

### Result-shaped streams

```gleam
import datastream/fold
import datastream/source
import gleam/io

pub fn main() {
  let result =
    source.from_list([Ok(1), Ok(2), Error("bad input")])
    |> fold.collect_result
  io.debug(result)
  // Error("bad input") : Result(List(Int), String)
}
```

### Use with dataprep

datastream does not depend on dataprep. Add it with `gleam add dataprep`, then combine `fold.fold` with a small applicative `combine` step to accumulate every per-element error in a single pass. This is the right tool when reporting all failures matters more than stopping at the first one.

```gleam
import dataprep/non_empty_list
import dataprep/validated.{type Validated, Invalid, Valid}
import datastream/fold
import datastream/source
import gleam/io

pub fn main() {
  source.from_list([
    Valid(1),
    Invalid(non_empty_list.single("row 2 bad")),
    Valid(3),
    Invalid(non_empty_list.single("row 4 bad")),
  ])
  |> fold.fold(from: Valid([]), with: combine)
  |> io.debug
  // Invalid(NonEmptyList("row 2 bad", ["row 4 bad"]))
}

fn combine(
  acc: Validated(List(Int), String),
  next: Validated(Int, String),
) -> Validated(List(Int), String) {
  case acc, next {
    // Prepends for O(1) accumulation; a fully Valid run would need a
    // final list.reverse to restore input order.
    Valid(xs), Valid(x) -> Valid([x, ..xs])
    Valid(_), Invalid(es) -> Invalid(es)
    Invalid(es), Valid(_) -> Invalid(es)
    Invalid(a), Invalid(b) -> Invalid(non_empty_list.append(a, b))
  }
}
```

For the simpler short-circuit case, use `fold.collect_result` or `fold.partition_result` on a `Stream(Result(a, e))` instead; `Validated` is for accumulating every error, not stopping at the first.

### Resource-backed stream

`source.resource` opens once on the first pull, calls `next` for each element, and runs `close` exactly once on every termination path: normal end, downstream early exit via `take`, fold short-circuit, or `sink.try_each` failure.

```gleam
import datastream.{Done, Next}
import datastream/fold
import datastream/source
import datastream/stream
import gleam/io

pub fn main() {
  // A toy resource: a counter that yields 1, 2, 3 then halts.
  // `open` returns the initial state; `next` advances it; `close`
  // would release a real handle (file, socket, cursor).
  // Named `counter` rather than `stream` so the local variable does
  // not shadow the imported `stream` module.
  let counter =
    source.resource(
      open: fn() {
        io.println("open")
        1
      },
      next: fn(n) {
        case n > 3 {
          True -> Done
          False -> Next(element: n, state: n + 1)
        }
      },
      close: fn(_state) { io.println("close") },
    )

  // Take only the first element. `close` still runs because of the
  // early-exit contract.
  counter
  |> stream.take(up_to: 1)
  |> fold.to_list
  |> io.debug
  // open
  // close
  // [1]
}
```

### Resource with fallible open

`source.try_resource` is the variant whose `open` and per-element `next` can fail. A failed `open` emits exactly one `Error(source.OpenError(e))` element and halts without calling `close`; per-element failures surface as `Error(source.NextError(e))` and do not halt the stream.

```gleam
import datastream.{Done, Next}
import datastream/fold
import datastream/source
import gleam/io

pub fn main() {
  let stream =
    source.try_resource(
      open: fn() -> Result(Int, String) { Error("not available") },
      next: fn(n) {
        case n > 3 {
          True -> Done
          False -> Next(element: Ok(n), state: n + 1)
        }
      },
      close: fn(_state) { Nil },
    )

  stream
  |> fold.to_list
  |> io.debug
  // [Error(OpenError("not available"))]
}
```

### Line-by-line processing over a resource

The common real-world pipeline (open a file or cursor, stream lines, filter, release the handle) composes `source.resource` with `text.lines` and an early-exit terminal. The close contract releases the handle on every termination path, so the example below releases its pre-built "handle" regardless of how the downstream ends.

In production, `open` would return a real file handle, `next` would call something like `file.read_line(handle)` (on Erlang) and wrap the result in `Next(line, handle)` or `Done`, and `close` would release the handle. The toy version below uses a list of chunks in place of a real handle so the example runs on both targets.

```gleam
import datastream.{Done, Next}
import datastream/fold
import datastream/source
import datastream/stream
import datastream/text
import gleam/io

pub fn main() {
  source.resource(
    open: fn() { ["INFO hello\nWARN slow\n", "INFO bye\n"] },
    next: fn(state) {
      case state {
        [] -> Done
        [head, ..tail] -> Next(element: head, state: tail)
      }
    },
    close: fn(_state) { Nil },
  )
  |> text.lines
  |> stream.take(up_to: 2)
  |> fold.to_list
  |> io.debug
  // ["INFO hello", "WARN slow"]
}
```

### Bounded parallel map (BEAM)

```gleam
import datastream/erlang/par
import datastream/fold
import datastream/source
import datastream/stream
import gleam/io

pub fn main() {
  source.iterate(from: 1, with: fn(x) { x + 1 })
  |> par.map_unordered(with: fn(x) { x * x })
  |> stream.take(up_to: 5)
  |> fold.to_list
  |> io.debug
  // [1, 4, 9, 16, 25]   (or any permutation; map_unordered emits as workers finish)
}
```

For deterministic order, use `par.map_ordered`, which preserves input order at the cost of a small reorder buffer. Tune concurrency with `par.map_unordered_with(over:, with:, max_workers:, max_buffer:)`.
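A hedged sketch of the tuned variant, assuming the labelled arguments quoted above with `over:` as the first parameter (so it is supplied by the pipe):

```gleam
import datastream/erlang/par
import datastream/fold
import datastream/source
import datastream/stream
import gleam/io

pub fn main() {
  source.iterate(from: 1, with: fn(x) { x + 1 })
  // `over:` comes from the pipe; cap worker count and result buffering.
  |> par.map_unordered_with(
    with: fn(x) { x * x },
    max_workers: 8,
    max_buffer: 16,
  )
  |> stream.take(up_to: 5)
  |> fold.to_list
  |> io.debug
  // Some permutation of [1, 4, 9, 16, 25].
}
```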

### Time-bucketed stream (BEAM)

```gleam
import datastream/chunk
import datastream/erlang/source as beam_source
import datastream/erlang/time
import datastream/fold
import datastream/stream
import gleam/io
import gleam/list

pub fn main() {
  // Tick every 20 ms; bucket arrivals into 60 ms windows; take the
  // first window's chunk.
  beam_source.ticks(every: 20)
  |> time.window_time(span: 60)
  |> stream.take(up_to: 1)
  |> fold.to_list
  |> list.flat_map(chunk.to_list)
  |> list.length
  |> io.debug
  // 3   (approximately — three ticks land in one 60 ms window)
}
```

## Module guide

## Semantics

## License

MIT — see LICENSE.

## Contributing

See CONTRIBUTING.md for development setup and pull request expectations. Bug reports and proposals via GitHub Issues.
