glimmer/stream

glimmer/stream introduces an abstraction over gleam/erlang/process, called a Stream. The goal of streams are to make parallelism free and easy by having it appear sequential. There are both functional and imperative (effectful) functions in this library. The imperative functions are most useful for getting a stream started: the options for filling up a fresh stream are basically just write which is imperative and from_list which is functional, and write will make more sense for many situations. Once a stream is going, and you just need to process it, then the functional functions (map, filter, reduce, collect, to_iterator) can be very ergonomic and sensible. Note: streams masquerade as lists, but they aren’t datastructures. Reading from them consumes the items, which the functional functions get around by saving results in a new stream or datastructure. If you want something like array-indexing then you’ll need to convert the stream into an actual datastructure, perhaps with collect, and likely lose any concurrency benefits.

Types

the types of messages used within Glimmer, for Streams.

pub opaque type PipeMessage(a)

A concurrent stream of values. This isn’t a datastructure per se, it’s all lazy.

pub type Stream(a) {
  Stream(process.Subject(PipeMessage(a)))
}

Constructors

  • Stream(process.Subject(PipeMessage(a)))

Functions

pub fn close(s: Stream(a)) -> Nil

Indicate that no more values will be sent in the stream. This is optional but useful if the stream is intended to model some finite datastructure, which many of these functions expect. This is an imperative, side-effectful procedure. See with for some ergonomics with close.

pub fn collect(s: Stream(a)) -> List(a)

Represent a stream as a list. This blocks until there’s an indication that the stream is over. (See close.) Also consider using to_iterator if you can.

pub fn each(stream: Stream(a), f: fn(a) -> Nil) -> Nil

Perform a side-effect for each element in the stream, consuming it. This effect can include writing to another stream, so the elements aren’t necessarily gone. For example:

use <- with(output_stream)
use i <- each(input_stream)
output_stream |> write(i * 2)
pub fn filter(input: Stream(a), p: fn(a) -> Bool) -> Stream(a)

Filter elements out of a stream concurrently (as opposed to, say, calling collect and then using list.filter). Internally there is imperative dark magic but this presents a pure functional interface (if p is pure). This makes it great for pipes. For example,

[1, 2, 3]
|> from_list
|> filter(fn(n) { n % 2 == 0 })
|> collect()
|> io.debug() // prints [1, 3]
pub fn from_list(l: List(a)) -> Stream(a)

Construct a stream from a list.

pub fn generator(g: fn(fn(a) -> Nil, fn() -> Nil) -> Nil) -> Stream(
  a,
)

Generate a stream. For example:

fn count_down(n, yield, stop) {
  case n == 0 {
    True -> stop()
    False -> {
      yield(n)
      count_down(n - 1, yield, stop)
    }
  }
}
...
generator(count_down(5, _, _))
|> collect()
|> io.debug() // prints [5, 4, 3, 2, 1]
pub fn map(input: Stream(a), f: fn(a) -> b) -> Stream(b)

Map a function over a stream concurrently (as opposed to, say, calling collect and then using list.map). Internally there is imperative dark magic but this presents a pure functional interface (if f is pure). This makes it great for pipes. For example,

[1, 2, 3]
|> from_list()
|> map(fn(n) { n + 1 })
|> map(fn(n) { n * 2 })
|> collect()
|> io.debug() // prints [4, 6, 8]
pub fn new() -> Stream(a)

Construct a stream.

pub fn next(s: Stream(a)) -> Result(a, Nil)

Get the next value from the stream. Wait if there isn’t one yet.

pub fn next_with_timeout(s: Stream(a), timeout: Int) -> Result(
  a,
  Nil,
)

Get the next value from the stream. Wait if it isn’t there yet, giving up if the timeout runs out.

pub fn reduce(input: Stream(a), start: b, f: fn(a, b) -> b) -> b

Reduce (or fold) a stream to a value. This is concurrent in the sense that reduction steps begin before the last value arrives, and may happen in parallel. However, the function won’t return until all values are received, of course. Internally there is imperative dark magic but this presents a pure functional interface (if f is pure). This makes it great for pipes. For example,

[1, 2, 3]
|> from_list
|> reduce(0, fn(a, b) { a + b })
|> io.debug() // prints 6
pub fn to_iterator(s: Stream(a)) -> Iterator(a)

Represent a stream as a Gleam iterator. This is a more faithful representation than a list, and indeed concurrency is preserved by the produced iterator!

pub fn try_each(stream: Stream(a), f: fn(a) -> Result(Nil, b)) -> Result(
  Nil,
  b,
)

Iterate through the elements in the stream until done or Error. Perform some computation each time. For example:

use i <- try_each(input_stream)
case i < 0 {
  True -> Error("found a negative!")
  False -> Ok(Nil)
}
pub fn with(stream: Stream(a), f: fn() -> b) -> b

Use a stream and then close it. This is intended for use syntax, for example:

use <- with(output_stream)
write(out, "hi")
pub fn write(s: Stream(a), value: a) -> Nil

Write a value to a stream. This is an imperative, side-effectful procedure.

Search Document