gs
Types
pub type OverflowStrategy {
Wait
Drop
Stop
Panic
}
Constructors
- Wait
- Drop
- Stop
- Panic
A Stream represents a lazy, pull-based sequence of elements. Each element is computed only when requested, making it memory efficient for large or infinite sequences.
Streams operate by “pulling” one element at a time:
[Source] --> [Element 1] --> [Element 2] --> [Element 3] --> None
^ ^ ^
| | |
Pull when Pull when Pull when
needed needed needed
Key characteristics:
- Pull-based: Elements are computed on demand
- Sequential: Only one element is processed at a time
- Lazy: Elements are not computed until requested
- Finite: Stream ends when None is returned
pub type Stream(a) {
Stream(pull: fn() -> Option(#(a, Stream(a))))
}
Constructors
- Stream(pull: fn() -> Option(#(a, Stream(a))))
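To make the pull contract concrete, here is a minimal hand-built sketch (it assumes Option's Some and None constructors from gleam/option are in scope, as the pull type requires):
import gleam/option.{None, Some}

// A hand-built two-element stream: yields 1, then 2, then ends.
fn one_two() -> Stream(Int) {
  Stream(pull: fn() {
    Some(#(1, Stream(pull: fn() {
      Some(#(2, Stream(pull: fn() { None })))
    })))
  })
}

one_two() |> to_list
// -> [1, 2]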
Functions
pub fn buffer(
stream: Stream(a),
size: Int,
mode: OverflowStrategy,
) -> Stream(a)
Creates a buffered stream that holds elements in a queue with specified overflow strategy.
Example
> from_range(1, 10)
|> buffer(
size: 3, // Buffer capacity
mode: Wait // Wait when buffer is full
)
|> tap(fn(x) { process.sleep(100) }) // Simulate slow processing
|> to_list()
// Returns [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] with buffering
Visual Representation
Fast producer, slow consumer with buffer:
+----------------Buffer (size 3)----------------+
| +---+ +---+ +---+ |
Producer | | 1 | | 2 | | 3 | (waiting...) | Consumer
--------->| +---+ +---+ +---+ |---------->
[4,5,6,..]| └─────────────────────┘ | [1,2,3,..]
+--------------------------------------------+
Strategy behaviors when buffer is full:
- Wait: Producer pauses until space available
- Drop: New elements are discarded
- Stop: Stream terminates
- Panic: Raises an error
When to Use
- When processing speeds differ between producer and consumer
- For implementing backpressure mechanisms
- When you need to smooth out processing spikes
- For rate-limiting or flow control
- When implementing producer-consumer patterns
- For managing memory usage with fast producers
- When implementing stream processing pipelines
Description
The buffer
function creates a buffered stream that can hold a specified
number of elements in a queue. It’s particularly useful when dealing with
producers and consumers operating at different speeds. The function takes:
- A stream to buffer
- A buffer size limit
- An overflow strategy specifying behavior when the buffer is full
Overflow Strategies:
- Wait: Producer waits until space is available (backpressure)
- Drop: New elements are discarded when buffer is full
- Stop: Stream terminates when buffer overflows
- Panic: Raises an error on buffer overflow
The buffering mechanism:
- Creates an asynchronous queue of specified size
- Processes elements through the queue in FIFO order
- Applies the specified overflow strategy when full
- Maintains element order
- Handles stream termination gracefully
This is particularly useful for:
- Managing producer-consumer speed differences
- Implementing backpressure
- Controlling memory usage
- Smoothing out processing irregularities
- Building robust streaming pipelines
pub fn chunks(stream: Stream(a), size: Int) -> Stream(List(a))
Groups elements from a stream into chunks of the specified size.
Example
> from_range(1, 10)
|> chunks(3)
|> to_list()
[[1, 2, 3], [4, 5, 6], [7, 8, 9], [10]]
Visual Representation
Input: [1]-->[2]-->[3]-->[4]-->[5]-->[6]-->[7]-->[8]-->[9]-->[10]-->|
| | | | | | | | |
└─────┴─────┘ └─────┴─────┘ └─────┴─────┘ └─┘
chunk 1 chunk 2 chunk 3 chunk 4
↓ ↓ ↓ ↓
Output: [[1,2,3]]-------->[[4,5,6]]-------->[[7,8,9]]----->[[10]]-->|
Where |
represents the end of the stream and partial chunks are included
When to Use
- When processing data in fixed-size batches
- For implementing pagination or windowing operations
- When buffering or batching stream elements for bulk processing
- For grouping elements for parallel processing
- When implementing network protocols with fixed-size frames
- For chunking large datasets into manageable pieces
Description
The chunks
function transforms a stream by grouping its elements into
fixed-size chunks. Each chunk is emitted as a list containing size
elements,
except possibly the last chunk which may contain fewer elements if the stream
length is not evenly divisible by the chunk size.
The function:
- Processes elements lazily, only forming chunks when requested
- Preserves the order of elements within and between chunks
- Returns empty stream if size <= 0
- Includes partial chunks at the end of the stream
- Never creates chunks larger than the specified size
- Creates a stream of lists, where each list is a chunk
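For example, chunks combines naturally with map when work should be done per batch; a small sketch that sums each batch with gleam/list's fold:
import gleam/list

// Sum each batch of three values.
from_range(1, 10)
|> chunks(3)
|> map(fn(batch) { list.fold(batch, 0, fn(acc, x) { acc + x }) })
|> to_list()
// -> [6, 15, 24, 10]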
pub fn concat(first: Stream(a), second: Stream(a)) -> Stream(a)
Concatenates two streams into a single stream, yielding all elements from the first stream followed by all elements from the second stream.
Example
> [1, 2, 3]
|> from_list
|> concat(from_list([4, 5, 6]))
|> to_list()
[1, 2, 3, 4, 5, 6]
Visual Representation
Stream 1: [1]-->[2]-->[3]-->|
Stream 2: [4]-->[5]-->[6]-->|
[1]-->[2]-->[3]-->[4]-->[5]-->[6]-->|
└── Stream 1 ──┘ └── Stream 2 ──┘
Where |
represents the end of the stream
When to Use
- When you need to combine two streams sequentially
- For appending data from different sources
- When implementing stream operations that need to chain multiple streams
- When building composite streams from smaller streams
- For implementing stream operations like flat_map or flatten
- When you want to process multiple streams in sequence
Description
The concat
function combines two streams by first yielding all elements from
the first stream until it’s exhausted, then yielding all elements from the
second stream. The concatenation is lazy - elements from the second stream
aren’t processed until all elements from the first stream have been consumed.
The function:
- Preserves the order of elements from both streams
- Processes streams lazily
- Returns an empty stream if both input streams are empty
- Handles infinite streams in the first position correctly (second stream never reached)
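For instance, a header element can be emitted ahead of a data stream (the values are purely illustrative):
// Prepend a header element before the data rows.
from_list(["header"])
|> concat(from_list(["row1", "row2"]))
|> to_list()
// -> ["header", "row1", "row2"]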
pub fn count(stream: Stream(a)) -> Stream(#(a, Int))
Counts elements as they pass through the stream, yielding tuples of elements and their count.
Example
> from_range(1, 3)
|> count()
|> to_list()
[#(1, 1), #(2, 2), #(3, 3)]
Visual Representation
Input: [1]---->[2]---->[3]-->|
| | |
count=1 count=2 count=3
| | |
v v v
Output: [(1,1)]->[(2,2)]->[(3,3)]-->|
Where:
- | represents the end of the stream
- Each element is paired with its count
- Count increases monotonically
When to Use
- When you need to track how many elements have been processed
- For implementing progress tracking in stream processing
- When you need element indices in a stream
- For debugging or monitoring stream flow
- When implementing stateful stream operations
- For adding sequence numbers to stream elements
Description
The count
function transforms a stream by pairing each element with a
running count of how many elements have been processed. It maintains an
internal counter that increments for each element, starting from 1.
Key characteristics:
- Preserves original elements
- Adds monotonic counting
- Processes elements lazily
- Count starts at 1
- Returns tuples of (element, count)
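As a sketch, the (element, count) tuples can be reshaped into (index, value) pairs with map:
// Pair each value with a 1-based index.
from_list(["a", "b", "c"])
|> count()
|> map(fn(pair) { #(pair.1, pair.0) })
|> to_list()
// -> [#(1, "a"), #(2, "b"), #(3, "c")]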
pub fn debug(stream: Stream(a)) -> Stream(a)
Logs each element of a stream for debugging.
Example:
from_pure(42) |> debug
pub fn drop(stream: Stream(a), n: Int) -> Stream(a)
Drops (skips) the first n
elements from a stream.
Example
> from_range(1, 10)
|> drop(5)
|> to_list()
[6, 7, 8, 9, 10]
Visual Representation
Input: [1]-->[2]-->[3]-->[4]-->[5]-->[6]-->[7]-->[8]-->|
× × × × × | | |
v v v
Output: ----------------------------->[6]-->[7]-->[8]-->|
Where:
- | represents the end of the stream
- × indicates dropped elements
- Elements after the drop point flow through unchanged
When to Use
- When you want to skip a known number of elements at the start of a stream
- For implementing pagination (skipping the first n pages)
- When processing data streams where header elements should be ignored
- For removing initialization or warm-up data
- When implementing offset-based operations
- To synchronize streams by dropping initial mismatched elements
Description
The drop
function creates a new stream that skips the first n
elements
from the input stream and yields all remaining elements. The function processes
elements lazily - only advancing through the dropped elements when the resulting
stream is pulled. This makes it memory efficient as dropped elements are never
materialized.
The function:
- Preserves the order of remaining elements
- Processes dropped elements lazily
- Returns an empty stream if n is greater than the stream length
- Returns the original stream if n is 0 or negative
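A pagination-style sketch combining drop and take (page size and offset chosen purely for illustration):
// "Page 2" with a page size of 3: skip one page, keep the next.
from_range(1, 10)
|> drop(3)
|> take(3)
|> to_list()
// -> [4, 5, 6]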
pub fn error_terminated(
stream: Stream(Result(a, b)),
) -> Stream(a)
Terminates a stream when encountering an Error value, yielding only Ok values until then.
Example
> [Ok(1), Ok(2), Error("oops"), Ok(3)]
|> from_list
|> error_terminated
|> to_list()
[1, 2] // Terminates at Error, ignoring Ok(3)
Visual Representation
Input: [Ok(1)]-->[Ok(2)]-->[Error("oops")]-->[Ok(3)]-->|
| | | ×
v v v
Output: [1]------>[2]---------->|
Where:
- | represents the end of the stream
- × represents values that are ignored after Error
- Values flow from left to right until Error
When to Use
- When processing streams that should stop on first error
- For implementing fail-fast error handling
- When working with fallible operations where errors should terminate processing
- For converting result-based sequences into regular streams
- When implementing error-based early termination
- For handling streams with error conditions as termination points
Description
The error_terminated
function transforms a stream of Result values into a
stream that yields only the unwrapped Ok values and terminates when it
encounters an Error. Any values after the first Error are ignored. This is
particularly useful when implementing fail-fast error handling or when working
with streams that should terminate on first error condition.
Key characteristics:
- Unwraps Ok values into regular values
- Terminates immediately on first Error
- Ignores all values after Error
- Processes elements lazily
- Preserves order of elements before Error
- Returns empty stream if first element is Error
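Combined with to_fold, this gives a small fail-fast aggregation sketch in which only the successes before the first Error contribute:
// Sum the Ok values up to the first failure.
[Ok(10), Ok(20), Error("boom"), Ok(30)]
|> from_list
|> error_terminated
|> to_fold(0, fn(acc, x) { acc + x })
// -> 30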
pub fn filter(
stream: Stream(a),
keeping pred: fn(a) -> Bool,
) -> Stream(a)
Filters elements from a stream based on a predicate function.
Example
> from_range(1, 10)
|> filter(fn(x) { x > 5 })
|> to_list()
[6, 7, 8, 9, 10]
Visual Representation
Input: [1]-->[2]-->[3]-->[4]-->[5]-->[6]-->[7]-->[8]-->|
| | | | | | | |
p(x) p(x) p(x) p(x) p(x) p(x) p(x) p(x)
× × × × × ✓ ✓ ✓
| | |
v v v
Output: ------------------------------>[6]-->[7]-->[8]-->|
Where:
- | represents the end of the stream
- p(x) is the predicate function
- × indicates the element was filtered out
- ✓ indicates the element passed the filter
When to Use
- When you need to remove unwanted elements from a stream
- For implementing search or query operations
- When validating or sanitizing data streams
- For extracting specific patterns or values
- When implementing business logic filters
- For data cleanup operations
Description
The filter
function creates a new stream that only includes elements from
the input stream that satisfy the given predicate function. Elements are
processed lazily - the predicate is only evaluated when elements are pulled
from the resulting stream.
The function:
- Preserves the order of elements that pass the filter
- Skips elements that don’t satisfy the predicate
- Processes elements lazily (on-demand)
- Returns an empty stream if no elements satisfy the predicate
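Filters also compose with other lazy operations; a brief sketch that keeps the even numbers and then doubles them:
// Keep even numbers, then double each one.
from_range(1, 10)
|> filter(fn(x) { x % 2 == 0 })
|> map(fn(x) { x * 2 })
|> to_list()
// -> [4, 8, 12, 16, 20]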
pub fn find(stream: Stream(a), pred: fn(a) -> Bool) -> Stream(a)
Finds the first element in a stream that satisfies a predicate and returns a stream containing only that element.
Example
> from_range(1, 10)
|> find(fn(x) { x > 5 })
|> to_list()
[6]
Visual Representation
Input: [1]-->[2]-->[3]-->[4]-->[5]-->[6]-->[7]-->[8]-->|
| | | | | | | |
p(x) p(x) p(x) p(x) p(x) p(x) p(x) p(x)
× × × × × ✓ - -
|
v
Output: ------------------------------>[6]-->|
Where:
- | represents the end of the stream
- p(x) is the predicate function
- × indicates the element did not match
- ✓ indicates the first matching element
- a dash (-) indicates elements not evaluated
When to Use
- When searching for the first occurrence of an element meeting specific criteria
- For implementing early termination in stream processing
- When you need only the first matching element from a potentially large stream
- For implementing search functionality
- When validating the existence of elements meeting certain conditions
- To optimize performance by stopping processing after finding a match
Description
The find
function creates a new stream that contains only the first element
from the input stream that satisfies the given predicate function. The function
processes elements lazily and stops as soon as a matching element is found,
making it efficient for large streams. If no element satisfies the predicate,
an empty stream is returned.
The function:
- Processes elements sequentially until a match is found
- Stops processing after finding the first match
- Returns a stream with at most one element
- Returns an empty stream if no elements match the predicate
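Because the result has at most one element, find pairs naturally with to_option; a brief sketch:
// Get the first match as an Option.
from_range(1, 10)
|> find(fn(x) { x > 5 })
|> to_option
// -> Some(6)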
pub fn flat_map(
over stream: Stream(a),
with f: fn(a) -> Stream(b),
) -> Stream(b)
Transforms each element into a stream and flattens all streams into a single stream.
Example
> from_range(1, 3)
|> flat_map(fn(x) { from_range(1, x) })
|> to_list()
[1, 1, 2, 1, 2, 3]
Visual Representation
Input: [1]-------->[2]-------->[3]------->|
| | |
v v v
[1]-->| [1,2]-->| [1,2,3]-->|
| | | | | | |
v v v v v v v
Output: [1]---->[1]--[2]-->[1]--[2]--[3]-->|
Where |
represents the end of the stream
When to Use
- When you need to generate multiple elements from each input element
- For handling nested or hierarchical data structures
- When implementing tree traversal or graph algorithms
- For expanding or exploding single elements into sequences
- When working with relationships where one item maps to many items
- For implementing more complex stream transformations
Description
The flat_map
function applies a transformation to each element of the input
stream, where the transformation produces a new stream for each element. These
resulting streams are then flattened into a single output stream. The function
processes elements lazily, only generating new streams as elements are requested
from the result stream.
Key characteristics:
- One-to-many transformation: Each input element can produce multiple output elements
- Preserves order: Elements from earlier streams appear before elements from later streams
- Lazy evaluation: Streams are only generated and flattened when needed
pub fn flatten(stream: Stream(Stream(a))) -> Stream(a)
Flattens a stream of streams into a single stream.
Example
> [[1, 2], [3, 4], [5, 6]]
|> from_list
|> map(from_list)
|> flatten
|> to_list
[1, 2, 3, 4, 5, 6]
Visual Representation
Input stream of streams:
+----------+ +----------+ +----------+
-->|[1,2] |--->|[3,4] |--->|[5,6] |-->|
+----------+ +----------+ +----------+
| | |
v v v
[1]-->[2]-->| [3]-->[4]-->| [5]-->[6]-->|
Flattened output:
+---+ +---+ +---+ +---+ +---+ +---+
-->| 1 |--->| 2 |--->| 3 |--->| 4 |--->| 5 |--->| 6 |-->|
+---+ +---+ +---+ +---+ +---+ +---+
Where |
represents the end of the stream
When to Use
- When working with nested stream structures that need to be linearized
- For processing hierarchical data as a single sequence
- When implementing monadic operations (e.g., flat_map)
- For combining multiple data sources into a single stream
- When dealing with streams of collections that need to be concatenated
- For implementing stream operations that produce multiple elements per input
Description
The flatten
function takes a stream of streams and flattens it into a single
stream by concatenating all inner streams in order. It processes streams lazily,
only pulling from the next inner stream when all elements from the current inner
stream have been consumed.
The function:
- Preserves the order of elements from both outer and inner streams
- Processes streams lazily (on-demand)
- Handles empty inner streams by skipping to the next one
- Returns an empty stream if all inner streams are empty
- Maintains the streaming nature of both levels
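As a sketch of the relationship mentioned above, flatten applied after map reproduces the flat_map example:
// flatten(map(s, f)) behaves like flat_map(s, f).
from_range(1, 3)
|> map(fn(x) { from_range(1, x) })
|> flatten
|> to_list()
// -> [1, 1, 2, 1, 2, 3]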
pub fn from_counter(start: Int) -> Stream(Int)
Given a counter integer, constructs an infinite stream of integers starting from the counter and incrementing by 1.
Example
> let numbers = from_counter(1)
> numbers |> take(3) |> to_list()
[1, 2, 3]
Visual Representation
from_counter(1)
+---+ +---+ +---+ +---+
-->| 1 |--->| 2 |--->| 3 |--->| 4 |-->...
+---+ +---+ +---+ +---+
Description
Creates an infinite stream that yields consecutive integers starting from the provided counter value. Each element in the stream is exactly one greater than the previous element. The stream never terminates and will continue producing values indefinitely.
When to Use
- When you need a source of sequential integers
- For generating unique identifiers
- For creating test data with sequential values
- When implementing pagination or indexing
Note: Since this creates an infinite stream, make sure to use it with
functions like take
, take_while
, or other stream terminators to avoid
infinite loops.
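A brief sketch of bounding the infinite counter with take_while (the bound is arbitrary):
// Consume the counter until a limit is reached.
from_counter(10)
|> take_while(fn(x) { x < 14 })
|> to_list()
// -> [10, 11, 12, 13]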
pub fn from_dict(dict: Dict(a, b)) -> Stream(#(a, b))
Creates a new stream from a dictionary.
Example
> dict.from_list([#("a", 1), #("b", 2), #("c", 3)])
|> from_dict
|> to_list
[#("a", 1), #("b", 2), #("c", 3)]
Visual Representation
dict.from_list([#("a", 1), #("b", 2), #("c", 3)]) |> from_dict
+--------+ +--------+ +--------+
-->|"a": 1 |--->|"b": 2 |--->|"c": 3 |-->|
+--------+ +--------+ +--------+
Where |
represents the end of the stream
When to Use
- When you need to process dictionary entries one at a time
- When converting a dictionary to a stream of key-value pairs
- When you want to lazily iterate over dictionary entries
- When implementing operations that need to work with dictionaries as streams
Description
The from_dict
function creates a stream that yields each key-value pair from
the input dictionary as a tuple. The stream preserves the dictionary’s entries
but processes them lazily, one at a time. Each element in the resulting stream
is a tuple containing a key and its associated value. The stream terminates
after yielding all dictionary entries. This is particularly useful when you
want to process large dictionaries efficiently or when you need to chain
dictionary operations in a streaming context.
pub fn from_empty() -> Stream(a)
Creates a new empty stream.
The stream will yield nothing and terminate immediately.
Example
let empty_stream = from_empty()
empty_stream |> to_list // -> []
Visualization
from_empty() -> |
Where |
represents the end of the stream
When to use
- When you need a stream that yields no elements
- As a base case for recursive stream operations
- When implementing stream operations that might need to return an empty result
- As a neutral element in stream concatenation operations
Description
The from_empty
function creates a stream that immediately terminates without
yielding any values. This is useful as a base case in recursive operations,
when implementing stream combinators, or when you need to represent the absence
of values in a streaming context. The empty stream is analogous to an empty
list but in a lazy evaluation context.
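A small sketch of the neutral-element property with concat:
// Concatenating with the empty stream leaves the other stream unchanged.
from_empty()
|> concat(from_list([1, 2]))
|> to_list()
// -> [1, 2]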
pub fn from_list(items: List(a)) -> Stream(a)
Creates a new stream from a list.
Example
> [1, 2, 3] |> from_list |> to_list
[1, 2, 3]
Visual Representation
[1, 2, 3] |> from_list
+---+ +---+ +---+
-->| 1 |--->| 2 |--->| 3 |-->|
+---+ +---+ +---+
Where |
represents the end of the stream
When to Use
- When converting an existing list to a stream
- When you want to process list elements lazily
- When implementing stream operations that build from lists
- When you need to transform eager evaluation (lists) into lazy evaluation (streams)
Description
The from_list
function creates a stream that yields each element from the input
list in order. The stream maintains the original order of elements and terminates
after yielding the last element. This is useful when you want to process list
elements one at a time.
pub fn from_option(option: Option(a)) -> Stream(a)
Creates a new stream from an option value.
Example
> Some(42) |> from_option |> to_list
[42]
> None |> from_option |> to_list
[]
Visual Representation
Some(42) |> from_option
+----+
-->| 42 |-->|
+----+
None |> from_option
-->|
Where |
represents the end of the stream
When to Use
- When converting an optional value into a stream
- When you want to handle presence/absence of a value in a streaming context
- As a building block for stream operations that may or may not have values
Description
The from_option
function creates a stream from an Option value.
pub fn from_pure(value: a) -> Stream(a)
Creates a new single-element stream from a given value.
Example
> 42 |> from_pure |> to_list
[42]
Visual Representation
42 |> from_pure
+----+
-->| 42 |-->|
+----+
Where |
represents the end of the stream
When to Use
- When you need to create a stream with exactly one element
- When converting a single value into a stream
- As a building block for more complex stream operations
- When implementing monadic operations that require lifting a value into a stream
Description
The from_pure
function creates a stream that yields exactly one value and then
terminates. It “lifts” a single value into the stream context. The resulting
stream will emit the given value once and then immediately terminate. This is
one of the fundamental stream constructors and is often used in combination
with other stream operations.
pub fn from_range(from start: Int, to end: Int) -> Stream(Int)
Creates a stream of integers from a start value (inclusive) to an end value (inclusive).
Example
> from_range(from: 1, to: 5) |> to_list()
[1, 2, 3, 4, 5]
Visual Representation
from_range(from: 1, to: 5)
+---+ +---+ +---+ +---+ +---+
-->| 1 |--->| 2 |--->| 3 |--->| 4 |--->| 5 |-->|
+---+ +---+ +---+ +---+ +---+
Where |
represents the end of the stream
When to Use
- When you need a finite sequence of consecutive integers
- For iteration over a known range of numbers
- For generating bounded sequences
- When implementing pagination with fixed bounds
- For creating test data with sequential values in a range
Description
The from_range
function creates a finite stream that yields consecutive integers
from the start value up to and including the end value. If the start value is
greater than the end value, an empty stream is returned. Each element in the
stream is exactly one greater than the previous element. The stream terminates
after yielding the end value.
pub fn from_range_exclusive(
from start: Int,
until end: Int,
) -> Stream(Int)
Creates a stream of integers from a start value (inclusive) to an end value (exclusive).
Example
> from_range_exclusive(from: 1, until: 5) |> to_list()
[1, 2, 3, 4]
Visual Representation
from_range_exclusive(from: 1, until: 5)
+---+ +---+ +---+ +---+
-->| 1 |--->| 2 |--->| 3 |--->| 4 |-->|
+---+ +---+ +---+ +---+
Where |
represents the end of the stream
When to Use
- When you need a finite sequence of consecutive integers excluding the end value
- For zero-based indexing scenarios (e.g., array indices)
- For loops where you want to exclude the upper bound
- When implementing slice operations
- For range operations that follow Python-style slicing conventions
Description
The from_range_exclusive
function creates a finite stream that yields consecutive
integers from the start value up to, but not including, the end value. If the
start value is greater than or equal to the end value, an empty stream is returned.
Each element in the stream is exactly one greater than the previous element.
The stream terminates after yielding the last value before the end value.
pub fn from_repeat(value: a) -> Stream(a)
Creates an infinite stream that repeatedly yields the same value.
Example
> 42 |> from_repeat |> take(3) |> to_list
[42, 42, 42]
Visual Representation
42 |> from_repeat
+----+ +----+ +----+ +----+
-->| 42 |--->| 42 |--->| 42 |--->| 42 |-->...
+----+ +----+ +----+ +----+
When to Use
- When you need a constant stream of the same value
- For testing stream operations with consistent input
- When implementing backpressure mechanisms with default values
- For creating placeholder or dummy data streams
- When you need an infinite source of identical values
Description
The from_repeat
function creates an infinite stream that yields the same
value indefinitely. Each time the stream is pulled, it produces the original
value and a new stream that will continue to produce the same value. This
creates an endless sequence of identical values, useful for testing, default
values, or as a building block for more complex stream operations. Since the
stream is infinite, it should typically be used with functions like take
or take_while
to limit the number of values produced.
pub fn from_repeat_eval(f: fn() -> a) -> Stream(a)
Creates an infinite stream that repeatedly evaluates a function to generate values.
Example
> fn() { utils.timestamp() }
|> from_repeat_eval
|> take(3)
|> to_list
// Emits [1705123456, 1705123457, 1705123458] // Different timestamps
Visual Representation
from_repeat_eval(fn() { random() })
+-----+ +-----+ +-----+ +-----+
-->| 42 |--->| 17 |--->| 33 |--->| 89 |-->...
+-----+ +-----+ +-----+ +-----+
↑ ↑ ↑ ↑
f() call f() call f() call f() call
Where each value is generated by a fresh call to the function
When to Use
- When you need a stream of dynamically generated values
- For creating streams of random numbers
- For testing with varying data
- When you need to encapsulate side-effects in a stream
- For creating streams that reflect changing system state
Description
The from_repeat_eval
function creates an infinite stream that generates values
by repeatedly calling the provided function. Unlike from_repeat
which produces
the same value repeatedly, this function evaluates the given function each time
a new value is needed, making it suitable for dynamic content generation.
Each pull of the stream results in a fresh call to the function, potentially
producing different values each time. The stream continues indefinitely and
should typically be used with functions like take
or take_while
to limit
the number of evaluations.
pub fn from_result(result: Result(a, b)) -> Stream(a)
Creates a new stream from a Result value.
Example
> Ok(42) |> from_result |> to_list
[42]
> Error("oops") |> from_result |> to_list
[]
Visual Representation
Ok(42) |> from_result
+----+
-->| 42 |-->|
+----+
Error("oops") |> from_result
-->|
Where |
represents the end of the stream
When to Use
- When converting a Result value into a stream
- When handling computations that may fail in a streaming context
- When filtering out error cases from a sequence of operations
- When transforming error-handling code into data processing pipelines
- As a building block for stream operations that work with Results
Description
The from_result
function creates a stream from a Result value. If the Result
is Ok
, it creates a single-element stream containing the value. If the Result
is Error
, it creates an empty stream. This is useful for handling operations
that may fail and converting them into a streaming context where errors are
simply filtered out. The error value is discarded, making this function
appropriate when you want to proceed with successful values only.
pub fn from_state_eval(
current: a,
f: fn(a) -> #(b, a),
) -> Stream(b)
Creates a stream by repeatedly applying a state transition function.
Example
// Generate Fibonacci numbers using state transitions
from_state_eval(
#(0, 1), // Initial state: (current, next)
fn(state) {
let #(current, next) = state
#(current, #(next, current + next)) // Return (value, new_state)
}
)
|> take(8)
|> to_list()
// Returns [0, 1, 1, 2, 3, 5, 8, 13]
Visual Representation
Initial State: (0,1)
+---+ +---+ +---+ +---+
-->| 0 |--->| 1 |--->| 1 |--->| 2 |-->...
+---+ +---+ +---+ +---+
↑ ↑ ↑ ↑
f(0,1) f(1,1) f(1,2) f(2,3)
↓ ↓ ↓ ↓
(1,1) (1,2) (2,3) (3,5)
state state state state
Where:
- Each element is produced by applying f to the current state
- State transitions drive the stream generation
- f returns both the current value and the next state
When to Use
- When generating sequences that depend on previous values
- For implementing stateful stream transformations
- When creating mathematical sequences (Fibonacci, etc.)
- For simulating state machines or systems with evolving state
- When implementing iterative algorithms that maintain state
- For creating streams with complex progression rules
- When you need to track auxiliary information between elements
Description
The from_state_eval
function creates a stream by repeatedly applying a
state transition function to an initial state. The function takes:
- An initial state value of type a
- A function f that takes the current state and returns a tuple of:
  - The value to emit in the stream
  - The next state to use
Each time the stream is pulled, the state transition function is applied to the current state to produce both the next value and the next state. This allows for sophisticated sequence generation where each element can depend on the history of previous elements through the maintained state.
The function is particularly useful for:
- Generating mathematical sequences
- Implementing stateful transformations
- Creating streams with complex progression rules
- Simulating systems with evolving state
- Building iterative algorithms that require state
The resulting stream continues indefinitely, driven by the state transitions, until terminated by other stream operations.
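A shorter sketch than the Fibonacci example, where the state is simply the next value to emit:
// Doubling sequence: emit the state, then double it for the next pull.
from_state_eval(1, fn(n) { #(n, n * 2) })
|> take(5)
|> to_list()
// -> [1, 2, 4, 8, 16]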
pub fn from_subject(subject: Subject(Option(a))) -> Stream(a)
Creates a stream from a process Subject that receives values of type Option(a).
Example
let subject = process.new_subject()
process.send(subject, Some(1))
process.send(subject, Some(2))
process.send(subject, None)
subject
|> from_subject
|> to_list
// -> [1, 2]
Visual Representation
Subject --> Stream
+---+ +---+
---->| 1 |--->| 2 |---> None
+---+ +---+
↑ ↑ ↑
Some(1) Some(2) None
Where messages flow from the Subject into the Stream until None is received
When to Use
- When converting process messages into a stream
- For handling asynchronous data sources
- When bridging between process-based and stream-based code
- For implementing pub/sub patterns with streams
- When you need to process messages sequentially
Description
The from_subject
function creates a stream that yields values received from
a process Subject. It expects messages of type Option(a)
where:
- Some(value) represents a value to be emitted by the stream
- None signals the end of the stream
The stream will continuously wait for messages until a None
is received,
at which point it terminates. This makes it useful for converting
asynchronous message-based communication into a sequential stream
of values.
pub fn from_subject_timeout(
subject: Subject(a),
timeout_ms: Int,
) -> Stream(a)
Creates a stream from a process Subject that times out after a specified duration.
Example
let subject = process.new_subject()
// In another process
process.send(subject, 1)
process.send(subject, 2)
// Wait more than timeout...
process.send(subject, 3)
subject
|> from_subject_timeout(1000)
|> to_list
// -> [1, 2] // 3 is not received due to timeout
Visual Representation
Subject --> Stream (with timeout)
+---+ +---+ +---+
---->| 1 |--->| 2 |----X----| 3 |
+---+ +---+ | +---+
↑ ↑ | ↑
0.1s 0.2s 1.0s 1.1s
timeout
Where ‘X’ represents the timeout point that terminates the stream
When to Use
- When processing messages with time constraints
- For implementing timeout-based protocols
- When handling potentially slow or unreliable message sources
- For graceful shutdown of stream processing
- When implementing time-bounded operations
- For preventing infinite waiting on message streams
Description
The from_subject_timeout
function creates a stream that yields values received
from a process Subject, but will terminate if no message is received within the
specified timeout period (timeout_ms
). Each message receipt resets the timeout
window. This is particularly useful for handling scenarios where message flow
might be interrupted or when you need to ensure timely processing of messages.
The stream will:
- Emit each received message as a stream element
- Wait up to timeout_ms milliseconds for each new message
- Terminate if no message is received within the timeout period
- Reset the timeout window after each successful message receipt
pub fn from_tick(delay_ms: Int) -> Stream(Int)
Creates a stream that emits a value every delay_ms
milliseconds.
Example
> 1000
|> from_tick
|> take(3)
|> to_list
// Emits [0, 0, 0] with 1 second delay between each value
Visual Representation
from_tick(1000)
+---+ +---+ +---+ +---+
-->| 0 |-1s>| 0 |-1s>| 0 |-1s>| 0 |-->...
+---+ +---+ +---+ +---+
Where -1s>
represents a 1 second delay
When to Use
- When implementing polling mechanisms
- For creating periodic events or heartbeats
- When building timer-based functionality
- For implementing rate-limiting or throttling
- When simulating time-based events in testing
Description
The from_tick
function creates an infinite stream that emits values at
regular intervals specified by delay_ms
. Each emission is accompanied by
a delay of delay_ms
milliseconds. The stream emits the number of
milliseconds that were actually delayed beyond the expected interval
(usually 0 unless system load causes delays).
The function will panic if delay_ms
is less than or equal to 0.
pub fn from_timestamp_eval() -> Stream(Int)
Creates an infinite stream of Unix timestamps that evaluates the current time on each pull.
Example
> from_timestamp_eval()
|> take(3)
|> to_list
// Emits [1705123456, 1705123457, 1705123458] // Different timestamps
Visual Representation
from_timestamp_eval()
+---------+ +---------+ +---------+ +---------+
-->| 1705123 |--->| 1705124 |--->| 1705125 |--->| 1705126 |-->...
+---------+ +---------+ +---------+ +---------+
↑ ↑ ↑ ↑
timestamp() timestamp() timestamp() timestamp()
Where each value is a fresh timestamp evaluation
When to Use
- When you need a stream of current timestamps
- For logging with temporal information
- When implementing time-based monitoring
- For creating time series data
- When measuring elapsed time between operations
Description
The from_timestamp_eval
function creates an infinite stream that generates
Unix timestamps. Unlike a static stream, this evaluates the current timestamp
each time a value is pulled, ensuring each emitted value reflects the actual
time at the moment of access. The stream continues indefinitely and should
typically be used with functions like take
or take_while
to limit the
number of evaluations.
pub fn intersperse(stream: Stream(a), separator: a) -> Stream(a)
Inserts a separator element between each element of a stream.
Example
> from_list([1, 2, 3])
|> intersperse(0)
|> to_list()
[1, 0, 2, 0, 3]
Visual Representation
Input: [1]----->[2]----->[3]-->|
Output: [1]-->[0]--->[2]--->[0]--->[3]-->|
^ ^ ^ ^ ^
| | | | |
value sep value sep value
Where:
- | represents the end of the stream
- sep represents the separator
- Elements flow from left to right
When to Use
- When formatting streams for display (e.g., adding commas between items)
- When implementing string join operations with streams
- For adding delimiters between stream elements
- When creating visual separations in streamed output
- For implementing protocol-specific element separation
- When generating formatted text from stream elements
Description
The intersperse
function creates a new stream that contains all elements
from the input stream with a separator value inserted between each pair of
elements. The separator is not added before the first element or after the
last element. This is particularly useful when formatting streams for
display or when implementing protocols that require specific delimiters
between elements.
Key characteristics:
- Preserves original element order
- Only inserts separators between elements
- No separator before first or after last element
- Processes elements lazily
- Returns empty stream if input is empty
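A join-like sketch over strings, folding the interspersed stream back into a single string with to_fold:
// Join with ", " as the separator.
from_list(["a", "b", "c"])
|> intersperse(", ")
|> to_fold("", fn(acc, s) { acc <> s })
// -> "a, b, c"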
pub fn map(
over stream: Stream(a),
with f: fn(a) -> b,
) -> Stream(b)
Maps a function over a stream.
Example
> from_range(1, 5)
|> map(fn(x) { x * 2 })
|> to_list()
[2, 4, 6, 8, 10]
Visual Representation
stream: [1]-->[2]-->[3]-->[4]-->[5]-->|
| | | | |
f(x) f(x) f(x) f(x) f(x)
| | | | |
v v v v v
result: [2]-->[4]-->[6]-->[8]-->[10]-->|
Where |
represents the end of the stream and f(x)
is the mapping function
When to Use
- When transforming each element of a stream without changing the stream structure
- For data conversion or formatting
- When applying calculations to each element
- For type conversion operations
- When preparing data for further processing
- As a building block in larger stream processing pipelines
Description
The map
function transforms a stream by applying a function to each element,
creating a new stream with the transformed values. The stream maintains its
original structure (length and order) while transforming the elements. The
mapping function is applied lazily - only when elements are pulled from the
resulting stream.
pub fn none_terminated(stream: Stream(Option(a))) -> Stream(a)
Creates a stream that terminates when encountering a None value.
Example
> [Some(1), Some(2), None, Some(3)]
|> from_list
|> none_terminated
|> to_list()
[1, 2] // Terminates at None, ignoring Some(3)
Visual Representation
Input: [Some(1)]-->[Some(2)]-->[None]-->[Some(3)]-->|
| | | ×
v v v
Output: [1]-------->[2]-------->|
Where:
- | represents the end of the stream
- × represents values that are ignored after None
- Values flow from left to right until None
When to Use
- When processing streams that use None as a termination signal
- For implementing protocols with end-of-stream markers
- When working with optional values where None indicates completion
- For converting option-based sequences into regular streams
- When implementing early termination based on None values
- For handling streams with natural termination points
Description
The none_terminated
function transforms a stream of Option values into a
stream that yields only the unwrapped Some values and terminates when it
encounters a None. Any values after the first None are ignored. This is
particularly useful when working with protocols or data sources that use
None as an end-of-stream marker.
Key characteristics:
- Unwraps Some values into regular values
- Terminates immediately on first None
- Ignores all values after None
- Processes elements lazily
- Preserves order of elements before None
- Returns empty stream if first element is None
pub fn println(stream: Stream(String)) -> Stream(String)
Prints each element of a stream to the console.
Example:
pure("Hello, world!") |> println
pub fn rate_limit_backoff(
stream: Stream(a),
initial_delay_ms: Int,
max_delay_ms: Int,
factor: Float,
) -> Stream(a)
Implements exponential backoff rate limiting for a stream by dynamically adjusting delays between elements based on processing history.
Example
> from_range(1, 10)
|> rate_limit_backoff(
initial_delay_ms: 100, // Start with 100ms delay
max_delay_ms: 1000, // Cap at 1 second delay
factor: 2.0 // Double delay on each retry
)
|> to_list()
// Returns [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] with increasing delays
Visual Representation
Input: [1]-->[2]-->[3]-->[4]-->[5]-->[6]-->...
| | | | | |
100ms 200ms 400ms 800ms 1s 1s (exponential delays)
| | | | | |
Output: [1]-->[2]-->[3]-->[4]-->[5]-->[6]-->...
When to Use
- When implementing retry logic with increasing delays
- For handling rate-limited APIs with automatic backoff
- When implementing fault-tolerant stream processing
- For graceful degradation under load
- When implementing adaptive rate limiting
- For implementing congestion control mechanisms
Description
The rate_limit_backoff
function creates a rate-limited stream that automatically
adjusts delays between elements using exponential backoff. Unlike linear rate
limiting, this approach increases delays exponentially up to a maximum value,
making it suitable for scenarios where adaptive rate limiting is needed.
Parameters:
- initial_delay_ms: Starting delay between elements
- max_delay_ms: Maximum delay cap
- factor: Multiplication factor for delay increase
The function ensures that:
- Delays start at initial_delay_ms
- Each delay is multiplied by factor
- Delays are capped at max_delay_ms
This is particularly useful for:
- API rate limit compliance
- Retry mechanisms
- Load balancing
- Error recovery
- System protection
pub fn rate_limit_linear(
stream: Stream(a),
rate: Int,
interval_ms: Int,
) -> Stream(a)
Implements linear rate limiting for a stream by controlling the number of elements processed per time interval.
Example
> from_range(1, 10)
|> rate_limit_linear(
rate: 2, // 2 elements
interval_ms: 1000 // per second
)
|> to_list()
// Returns [1, 2, 3, 4, 5, 6, 7, 8, 9, 10] with constant delays
// Processes 2 elements per second
Visual Representation
Input: [1]-->[2]-->[3]-->[4]-->[5]-->[6]-->...
| | | | | |
500ms 500ms 500ms 500ms 500ms 500ms (constant delays)
| | | | | |
Output: [1]-->[2]-->[3]-->[4]-->[5]-->[6]-->...
|<-1 s.-> | <- 1 s. -> |
(2 items) (2 items)
When to Use
- When implementing consistent rate limiting for API calls
- For controlling throughput to external services
- When processing needs to match a specific rate requirement
- For implementing SLA compliance
- When preventing system overload
- For simulating time-constrained processing
- When implementing quota-based systems
Description
The rate_limit_linear
function creates a rate-limited stream that processes
a specified number of elements within a given time interval. Unlike exponential
backoff, this maintains a constant rate of processing, making it suitable for
scenarios where consistent throughput is required.
Parameters:
- rate: Number of elements to process per interval
- interval_ms: Time interval in milliseconds
The function ensures that:
- Elements are processed at a constant rate
- Delays are evenly distributed across the interval
- Processing matches the specified throughput
- Rate limiting is precise and predictable
This is particularly useful for:
- API rate limiting compliance
- Resource utilization control
- Service level agreement implementation
- System load management
- Throughput optimization
- Quota enforcement
pub fn recover(
stream: Stream(Result(a, b)),
recover: fn(b) -> Stream(a),
) -> Stream(a)
Recovers from an error in a stream using the given function.
Alias for try_recover.
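Since this entry only records the alias, the following is a hedged sketch of intended usage, assuming the recovery function supplies a fallback stream that takes over after the first Error:
// Ok values pass through; on Error, continue with the fallback stream.
[Ok(1), Ok(2), Error("oops")]
|> from_list
|> recover(fn(_err) { from_list([99]) })
|> to_list()
// -> [1, 2, 99], assuming recovery continues with the fallback stream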
pub fn sleep(stream: Stream(a), delay_ms: Int) -> Stream(a)
Adds a delay between each element in a stream.
Example
> from_range(1, 3)
|> sleep(1000) // 1 second delay
|> to_list()
// Returns [1, 2, 3] with 1 second delay between each element
Visual Representation
Input: [1]-->[2]-->[3]-->|
Output: [1]--1s-->[2]--1s-->[3]-->|
| | |
↓ ↓ ↓
delay delay delay
Where:
- | represents the end of the stream
- --1s--> represents a 1 second delay
- Elements flow from left to right with delays
When to Use
- When implementing rate limiting
- For throttling high-speed streams
- When simulating time-dependent processes
- For creating animated or time-based sequences
- When implementing backpressure mechanisms
- For testing timing-dependent operations
- When synchronizing with external time-based systems
Description
The sleep
function creates a new stream that introduces a fixed delay
between each element of the input stream. Every time an element is pulled
from the stream, the function waits for the specified number of milliseconds
before yielding the next element. This is useful for controlling the rate
of stream processing or simulating time-dependent sequences.
Key characteristics:
- Maintains original element order
- Adds consistent delays between elements
- Processes elements lazily
- Delay occurs before each element
- Suitable for infinite streams
pub fn take(stream: Stream(a), n: Int) -> Stream(a)
Takes the first n
elements from a stream.
Example
> from_range(1, 10)
|> take(5)
|> to_list()
[1, 2, 3, 4, 5]
Visual Representation
Input: [1]-->[2]-->[3]-->[4]-->[5]-->[6]-->[7]-->[8]-->|
| | | | | × × ×
v v v v v
Output: [1]-->[2]-->[3]-->[4]-->[5]-->|
Where:
- | represents the end of the stream
- × indicates elements that are not taken
- Elements before the take limit flow through unchanged
When to Use
- When you need to limit the number of elements from a potentially infinite stream
- For implementing pagination (taking the first n items)
- When sampling or previewing data from a large stream
- For testing with a subset of stream elements
- When implementing batched processing
- To prevent infinite processing of endless streams
Description
The take
function creates a new stream containing only the first n
elements
from the input stream. Once n
elements have been yielded, the resulting stream
terminates, even if more elements are available in the input stream. This is
particularly useful when working with infinite streams or when you need to limit
the number of elements processed.
The function:
- Preserves the order of elements
- Processes elements lazily
- Returns less than n elements if the input stream is shorter
- Returns an empty stream if n is 0 or negative
pub fn take_while(
stream: Stream(a),
pred: fn(a) -> Bool,
) -> Stream(a)
Takes elements from a stream as long as they satisfy a predicate.
Example
> from_range(1, 10)
|> take_while(fn(x) { x <= 5 })
|> to_list()
[1, 2, 3, 4, 5]
Visual Representation
Input: [1]-->[2]-->[3]-->[4]-->[5]-->[6]-->[7]-->[8]-->|
| | | | | × × ×
p(x) p(x) p(x) p(x) p(x) p(x)
✓ ✓ ✓ ✓ ✓ ×
v v v v v
Output: [1]-->[2]-->[3]-->[4]-->[5]-->|
Where:
- | represents the end of the stream
- p(x) is the predicate function
- ✓ indicates the predicate returned True
- × indicates where the stream terminates (predicate returned False)
When to Use
- When you need to collect elements until a condition is no longer met
- For processing sequences until a terminating condition occurs
- When implementing range filters with dynamic end conditions
- For handling sorted data where you want elements until a threshold
- When processing time-series data until a specific event
- For implementing early termination based on element values
Description
The take_while
function creates a new stream that yields elements from the
input stream as long as they satisfy the given predicate function. Once an
element is encountered that fails the predicate, the stream terminates
immediately, ignoring any remaining elements. This is particularly useful
when working with sorted data or when you need to process elements until
a certain condition is met.
The function:
- Processes elements lazily
- Preserves the order of elements
- Stops immediately when the predicate returns False
- Returns an empty stream if the first element fails the predicate
- Never processes elements after the first failing predicate
pub fn tap(stream: Stream(a), f: fn(a) -> b) -> Stream(a)
Applies a function to each element of a stream for side effects.
Example:
from_repeat(1) |> tap(fn(x) { io.debug(x) })
pub fn to_fold(
stream: Stream(a),
initial: b,
f: fn(b, a) -> b,
) -> b
Reduces a stream to a single value by applying a function to each element.
Example
> from_range(1, 5)
|> to_fold(0, fn(acc, x) { acc + x })
// Returns 15 (sum of 1 to 5)
Visual Representation
Input: [1]---->[2]---->[3]---->[4]---->[5]-->|
| | | | |
f(0,1) f(1,2) f(3,3) f(6,4) f(10,5)
| | | | |
v v v v v
Acc: 0 -> 1 -> 3 -> 6 -> 10 -> 15
Where:
- | represents the end of the stream
- f(acc,x) shows each fold operation
- Values flow from left to right, accumulating results
When to Use
- When you need to combine all stream elements into a single value
- For calculating aggregates (sum, product, etc.)
- When implementing custom collection operations
- For transforming streams into non-stream data structures
- When you need to maintain state while processing a stream
- For implementing reduction operations over streams
Description
The to_fold
function processes a stream by applying a folding function to
each element, maintaining an accumulator value throughout the operation. It:
- Takes an initial accumulator value
- Processes each stream element in order
- Updates the accumulator using the provided function
- Returns the final accumulator value
The folding function receives two arguments:
- The current accumulator value
- The current stream element
This is a terminal operation that consumes the entire stream and produces
a single result. It’s commonly used as the foundation for other stream
termination operations like to_list
, to_set
, etc.
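As an illustration of that point, a to_list-style collector can be sketched on top of to_fold (this is not necessarily the library's actual implementation; it assumes gleam/list for the final reverse):
import gleam/list

// Collect by prepending, then reverse once at the end.
fn collect(stream: Stream(a)) -> List(a) {
  stream
  |> to_fold([], fn(acc, x) { [x, ..acc] })
  |> list.reverse
}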
pub fn to_list(stream: Stream(a)) -> List(a)
Collects all elements from a stream into a list.
Example
> from_range(1, 5)
|> to_list()
[1, 2, 3, 4, 5]
Visual Representation
Input Stream:
+---+ +---+ +---+ +---+ +---+
-->| 1 |--->| 2 |--->| 3 |--->| 4 |--->| 5 |-->|
+---+ +---+ +---+ +---+ +---+
| | | | |
v v v v v
[1] -> [1,2] -> [1,2,3] -> [1,2,3,4] -> [1,2,3,4,5]
Where:
- | represents the end of the stream
- Each element is accumulated into the growing list
When to Use
- When you need to collect all stream elements into memory
- For processing finite streams where total size is manageable
- When you need random access to stream elements
- For converting lazy evaluation (streams) to eager evaluation (lists)
- When implementing terminal operations that need all elements at once
- For testing or debugging stream operations
Description
The to_list
function is a terminal operation that consumes the entire stream
and collects all elements into a list. It processes elements sequentially,
maintaining their original order. This operation requires enough memory to
hold all stream elements simultaneously, so it should be used carefully with
large or infinite streams.
Key characteristics:
- Preserves element order
- Consumes the entire stream
- Requires memory proportional to stream length
- Creates a new list containing all elements
- Terminal operation (ends stream processing)
- Suitable for finite streams
pub fn to_nil(stream: Stream(a)) -> Nil
Processes a stream completely, discarding all values and returning Nil.
Example
> from_range(1, 5)
|> tap(io.debug)
|> to_nil
// Prints: 1, 2, 3, 4, 5
// Returns: Nil
Visual Representation
Input: [1]-->[2]-->[3]-->[4]-->[5]-->|
↓ ↓ ↓ ↓ ↓
process process process process process
↓ ↓ ↓ ↓ ↓
Output: Nil Nil Nil Nil Nil
Where:
- | represents the end of the stream
- Each element is processed but discarded
- Only side effects (if any) remain
When to Use
- When you only care about side effects (logging, printing, etc.)
- For consuming a stream without keeping results
- When implementing fire-and-forget operations
- For triggering stream processing without collecting results
- When memory usage is critical and results aren’t needed
- For testing stream processing without result validation
Description
The to_nil
function is a terminal operation that processes an entire stream
but discards all values, returning Nil. It’s particularly useful when you’re
interested in the side effects of processing a stream (like printing or
logging) but don’t need the actual values. This operation is memory efficient
as it doesn’t accumulate any results.
Key characteristics:
- Processes all elements sequentially
- Discards all values
- Returns Nil regardless of input
- Preserves side effects from stream processing
- Terminal operation (ends stream processing)
- Memory efficient (no accumulation)
pub fn to_nil_error_terminated(
stream: Stream(Result(a, b)),
) -> Nil
Processes a stream of Results until encountering an Error, discarding all values and returning Nil.
Example
> [Ok(1), Ok(2), Error("oops"), Ok(3)]
|> from_list
|> to_nil_error_terminated
// Processes 1, 2, stops at Error("oops"), ignores 3
// Returns: Nil
Visual Representation
Input: [Ok(1)]-->[Ok(2)]-->[Error]-->[Ok(3)]-->|
| | | ×
↓ ↓ ↓
Nil Nil Nil
|
Output: Nil
Where:
- | represents the end of the stream
- × represents values that are ignored after Error
- Each Ok value is processed but discarded
- Processing stops at Error
When to Use
- When processing Result streams for side effects until first error
- For consuming streams with Error as termination signal
- When implementing fail-fast operations with side effects
- For processing streams where errors indicate critical failures
- When you need to execute side effects but don’t need values
- For testing error handling without result validation
- When implementing fault-tolerant processing pipelines
Description
The to_nil_error_terminated
function is a terminal operation that processes
a stream of Results until it encounters an Error value. It combines the behavior
of error_terminated
and to_nil
, processing each Ok value but discarding
the results. The operation stops immediately when an Error is encountered,
ignoring any subsequent elements.
Key characteristics:
- Processes Ok values sequentially until Error
- Discards all values
- Stops processing at first Error
- Ignores elements after Error
- Returns Nil regardless of input
- Memory efficient (no accumulation)
- Preserves side effects from stream processing
- Terminal operation (ends stream processing)
pub fn to_nil_none_terminated(stream: Stream(Option(a))) -> Nil
Processes a stream of Options until encountering None, discarding all values and returning Nil.
Example
[Some(1), Some(2), None, Some(3)]
|> from_list
|> to_nil_none_terminated
// Processes 1, 2, stops at None, ignores 3
// Returns: Nil
Visual Representation
Input: [Some(1)]-->[Some(2)]-->[None]-->[Some(3)]-->|
| | | ×
↓ ↓ ↓
Nil Nil Nil
|
Output: Nil
Where:
- | represents the end of the stream
- × represents values that are ignored after None
- Each Some value is processed but discarded
- Processing stops at None
When to Use
- When processing Option streams for side effects until termination
- For consuming streams with None as termination signal
- When implementing fire-and-forget operations with early termination
- For processing streams where None indicates completion
- When you need to execute side effects but don’t need values
- For testing Option stream processing without result validation
Description
The to_nil_none_terminated
function is a terminal operation that processes
a stream of Options until it encounters a None value. It combines the behavior
of none_terminated
and to_nil
, processing each Some value but discarding
the results. The operation stops immediately when None is encountered, ignoring
any subsequent elements.
Key characteristics:
- Processes Some values sequentially until None
- Discards all values
- Stops processing at first None
- Ignores elements after None
- Returns Nil regardless of input
- Memory efficient (no accumulation)
- Preserves side effects from stream processing
- Terminal operation (ends stream processing)
pub fn to_option(stream: Stream(a)) -> Option(a)
Collects the first element of a stream into an option.
Example:
from_repeat(1) |> take(5) |> to_option // -> Some(1)
pub fn to_set(stream: Stream(a)) -> Set(a)
Collects all elements from a stream into a set.
Example
> from_list([1, 2, 2, 3, 3, 3])
|> to_set()
|> set.to_list()
[1, 2, 3] // Duplicates removed
Visual Representation
Input Stream:
+---+ +---+ +---+ +---+ +---+ +---+
-->| 1 |--->| 2 |--->| 2 |--->| 3 |--->| 3 |--->| 3 |-->|
+---+ +---+ +---+ +---+ +---+ +---+
| | | | | |
v v v v v v
{1} -> {1,2} -> {1,2} -> {1,2,3} -> {1,2,3} -> {1,2,3}
Where:
- | represents the end of the stream
- Each element is added to the set, duplicates are automatically removed
- {...} represents the growing set
When to Use
- When you need to collect unique elements from a stream
- For removing duplicates from a stream efficiently
- When order doesn’t matter but uniqueness does
- For implementing set operations on streams
- When building unique collections from potentially redundant data
- For counting unique elements in a stream
Description
The to_set
function is a terminal operation that consumes the entire stream
and collects all unique elements into a set. It automatically handles
deduplication as elements are added to the set. The resulting set provides
efficient membership testing and ensures each value appears only once.
Key characteristics:
- Removes duplicates automatically
- Unordered collection (set semantics)
- Consumes the entire stream
- Efficient membership testing
- Terminal operation (ends stream processing)
pub fn to_split(
stream: Stream(a),
pred: fn(a) -> Bool,
) -> #(Stream(a), Stream(a), Task(Nil))
Splits a stream into two streams based on a predicate function, returning a tuple containing both streams and a task that manages the split operation.
Example
> let #(evens, odds, task) =
> from_range(1, 6)
> |> to_split(fn(x) { x % 2 == 0 })
>
> let even_list = evens |> to_list() // [2, 4, 6]
> let odd_list = odds |> to_list() // [1, 3, 5]
> task.await(task)
Visual Representation
Input:  [1]-->[2]-->[3]-->[4]-->[5]-->[6]-->|
         |     |     |     |     |     |
        pred  pred  pred  pred  pred  pred
         |     |     |     |     |     |
         v     v     v     v     v     v
Evens:  ----->[2]--------->[4]--------->[6]-->|
Odds:   [1]--------->[3]--------->[5]-------->|
Where:
- | represents the end of the stream
- pred represents the predicate function evaluation
- Elements flow to either evens or odds based on the predicate
When to Use
- When you need to partition a stream into two separate streams
- For implementing parallel processing of different data categories
- When separating elements based on a condition for different handling
- For implementing filter-based routing of stream elements
- When you need to process different categories of data independently
- For implementing stream-based data classification
Description
The to_split function divides an input stream into two separate streams based
on a predicate function. It returns a tuple containing:
- A stream of elements for which the predicate returns true
- A stream of elements for which the predicate returns false
- A task that manages the splitting operation
The function operates by:
- Creating two subjects to manage the split streams
- Asynchronously processing the input stream
- Routing each element to the appropriate subject based on the predicate
- Terminating both output streams when input is exhausted
The splitting operation is performed lazily and asynchronously, making it efficient for large streams. Both resulting streams can be processed independently and concurrently. The task ensures proper cleanup and termination of both output streams.
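A sketch of consuming both halves concurrently, assuming gleam/otp/task is imported as task and gleam/int as int, and written with the single-argument task.await form used in the example above:
> let #(small, large, split_task) =
>   from_range(1, 100)
>   |> to_split(fn(x) { x <= 50 })
>
> let small_total = task.async(fn() { small |> to_list() |> int.sum })   // sums 1..50
> let large_total = task.async(fn() { large |> to_list() |> int.sum })   // sums 51..100
>
> task.await(split_task)
> task.await(small_total) + task.await(large_total)
// 5050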
pub fn to_subject(
stream: Stream(a),
) -> #(Subject(Option(a)), Task(Nil))
Converts a stream into a process Subject and returns it along with a task that manages the conversion.
Example
> let #(subject, task) =
> from_range(1, 3)
> |> to_subject()
>
> process.receive(subject, 100) // -> Ok(Some(1))
> process.receive(subject, 100) // -> Ok(Some(2))
> process.receive(subject, 100) // -> Ok(Some(3))
> process.receive(subject, 100) // -> Ok(None)
> task.await(task)
Visual Representation
Stream to Subject conversion:
           +---+    +---+    +---+
        -->| 1 |--->| 2 |--->| 3 |-->|        Stream
           +---+    +---+    +---+
             |        |        |
             v        v        v
Subject:  Some(1), Some(2), Some(3), None
             |        |        |      |
             v        v        v      v
          Receive  Receive  Receive  Receive  Process
Where:
- | represents the end of the stream
- Each element is sent to the subject as Some(value)
- Stream end is signaled with None
When to Use
- When converting from pull-based (Stream) to push-based (Subject) processing
- For bridging synchronous streams with asynchronous message passing
- When implementing pub/sub patterns with stream sources
- For distributing stream elements to multiple consumers
- When you need to buffer stream elements for later processing
- For implementing backpressure mechanisms
- When integrating streams with OTP process patterns
Description
The to_subject function transforms a stream into a process Subject, which
allows for asynchronous message-based communication. It returns a tuple
containing:
- A Subject that will receive stream elements as Option(a) messages
- A Task that manages the conversion process
The function:
- Creates a new Subject for receiving stream elements
- Asynchronously processes the stream, sending each element as Some(value)
- Signals stream completion by sending None
- Manages the conversion process in a separate task
- Allows for multiple consumers to receive stream elements
The returned task ensures proper cleanup and should be awaited when stream
processing is complete. Recipients can receive values from the subject using
process.receive or similar functions.
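A sketch of a receive loop that drains the subject into a list; receive_all is a hypothetical helper, and gleam/erlang/process and gleam/list are assumed to be imported:
fn receive_all(subject, acc) {
  case process.receive(subject, 100) {
    Ok(Some(value)) -> receive_all(subject, [value, ..acc])   // keep pulling values
    Ok(None) | Error(_) -> list.reverse(acc)                   // stream ended or timed out
  }
}

> let #(subject, task) = from_range(1, 3) |> to_subject()
> receive_all(subject, [])   // [1, 2, 3]
> task.await(task)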
pub fn try_recover(
stream: Stream(Result(a, b)),
recover: fn(b) -> Stream(a),
) -> Stream(a)
Transforms a stream of Results into a stream of values, using a recovery function for errors.
Example
from_list([Ok(1), Error("oops"), Ok(3)])
|> try_recover(fn(_) { from_pure(2) })
|> to_list()
// Returns [1, 2, 3]
Visual Representation
Input:  [Ok(1)]---->[Err("oops")]---->[Ok(3)]-->|
           |              |              |
           v              v              v
          [1]------>  f("oops")  ------>[3]-->|
                          |
                          v
                         [2]
Where:
- | represents the end of the stream
- f(err) is the recovery function
- Errors are replaced with values from the recovery stream
When to Use
- When handling fallible operations in streams
- For implementing retry logic in streaming operations
- When providing fallback values for failed operations
- For graceful error recovery in data processing pipelines
- When implementing fault-tolerant stream processing
- For transforming error cases into alternative valid values
Description
The try_recover function transforms a stream of Results into a stream of values by:
- Passing through successful values (Ok) unchanged
- Applying a recovery function to error values (Error)
- Flattening the recovery stream into the main stream
The recovery function takes the error value and returns a new stream, allowing for flexible error handling strategies including:
- Providing default values
- Retrying failed operations
- Computing alternative values
- Logging errors while continuing processing
The function processes elements lazily, only applying recovery when needed, making it efficient for large streams.
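A sketch of the "log the error while continuing" strategy, assuming gleam/io is imported:
> from_list([Ok(1), Error("timeout"), Ok(3)])
|> try_recover(fn(err) {
  io.println("recovered from: " <> err)   // side effect on the error path
  from_pure(0)                            // substitute a default value
})
|> to_list()
// Prints "recovered from: timeout" and returns [1, 0, 3]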
pub fn window(stream: Stream(a), size: Int) -> Stream(List(a))
Buffers a stream into overlapping windows of a specified size.
Example
> from_range(1, 5)
|> window(3)
|> to_list()
[[1, 2, 3], [2, 3, 4], [3, 4, 5]]
Visual Representation
Input:   [1]-->[2]-->[3]-->[4]-->[5]-->|

Window 1: [1, 2, 3]
Window 2:    [2, 3, 4]
Window 3:       [3, 4, 5]

Output: [[1,2,3], [2,3,4], [3,4,5]]
Where:
- | represents the end of the stream
- Each window overlaps with the next
- Windows slide by one element each time
When to Use
- For implementing sliding window operations
- When processing time series data with overlapping windows
- For calculating moving averages
- When implementing signal processing algorithms
- For pattern detection in sequential data
- When analyzing trends in streaming data
Description
The window function creates a new stream where elements are grouped into
overlapping windows of the specified size. Each window contains the specified
number of consecutive elements, and consecutive windows overlap by size - 1
elements. The function is useful for operations that need to consider multiple
consecutive elements together.
Key characteristics:
- Creates overlapping windows of fixed size
- Windows slide by one element each time
- Preserves element order within windows
- Returns empty stream if size <= 0
- Processes elements lazily
- Memory usage proportional to window size
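A sketch of the moving-average use case; it assumes this library exposes a map function and that gleam/int and gleam/list are imported:
> from_list([10, 20, 30, 40, 50])
|> window(3)
|> map(fn(w) { int.sum(w) / list.length(w) })   // integer average of each window
|> to_list()
// [20, 30, 40]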
pub fn zip(left: Stream(a), right: Stream(b)) -> Stream(#(a, b))
Zips two streams together into a stream of tuples.
Example
> from_range(1, 3)
|> zip(from_range(10, 12))
|> to_list()
[#(1, 10), #(2, 11), #(3, 12)]
Visual Representation
Stream 1:  [1]----->[2]----->[3]----->|
            |        |        |
            v        v        v
         #(1,10)  #(2,11)  #(3,12)
            ^        ^        ^
            |        |        |
Stream 2: [10]----->[11]---->[12]---->|
Where:
- | represents the end of the stream
- Vertical arrows show how elements are paired
- Each pair becomes a tuple in the output stream
When to Use
- When you need to combine elements from two streams pairwise
- For implementing parallel processing with synchronized streams
- When creating coordinate pairs from separate x and y streams
- For matching corresponding elements from different data sources
- When implementing stream operations that need element-wise combination
- For creating associations between related streams
Description
The zip function combines two streams by pairing their elements together into
tuples. It processes both streams in parallel, creating a new stream where each
element is a tuple containing corresponding elements from both input streams.
The resulting stream terminates when either input stream ends, making it safe
to use with streams of different lengths.
Key characteristics:
- Preserves ordering of elements from both streams
- Processes elements lazily (on-demand)
- Terminates when either stream ends
- Creates tuples of corresponding elements
- Maintains synchronization between streams
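As a usage sketch, zip can attach indices to values; because the result stops at the shorter stream, the index range only needs to be at least as long as the data:
> from_range(0, 1000)
|> zip(from_list(["a", "b", "c"]))
|> to_list()
// [#(0, "a"), #(1, "b"), #(2, "c")]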
pub fn zip_all(
left: Stream(a),
right: Stream(b),
) -> Stream(Option(#(Option(a), Option(b))))
Zips two streams together, continuing until both streams are exhausted and including
None
values for elements when one stream ends before the other.
Example
> from_range(1, 3)
|> zip_all(from_range(10, 11))
|> to_list()
[
Some(#(Some(1), Some(10))),
Some(#(Some(2), Some(11))),
Some(#(Some(3), None))
]
Visual Representation
Stream 1:  [1]---------->[2]---------->[3]-------->|
            |             |             |
            v             v             v
      #(S(1),S(10))  #(S(2),S(11))   #(S(3),N)
            ^             ^
            |             |
Stream 2: [10]--------->[11]---------->|
Where: S = Some, N = None
When to Use
- When you need to process two streams of different lengths together
- For implementing full outer joins between streams
- When handling missing or incomplete data between streams
- For synchronizing streams with potential gaps
- When you need to know exactly where and how streams diverge
- For implementing fault-tolerant stream processing
Description
The zip_all function combines two streams by pairing their elements, but unlike
regular zip, which stops at the shortest stream, it continues until both
streams are exhausted. When one stream ends before the other, the function
keeps producing pairs with None for the exhausted stream's side.
Key characteristics:
- Produces values until both streams are exhausted
- Wraps each element pair in Some to distinguish it from the end of the stream
- Uses None to indicate when a stream has ended
- Maintains perfect tracking of stream alignment
- Preserves all elements from both streams
- Processes elements lazily (on-demand)
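A sketch of padding the shorter stream with defaults; it assumes this library exposes a map function and that gleam/option is imported:
> from_list(["a", "b", "c"])
|> zip_all(from_list([1]))
|> map(fn(pair) {
  case pair {
    Some(#(left, right)) -> #(option.unwrap(left, "?"), option.unwrap(right, 0))
    None -> #("?", 0)   // not produced for these inputs; handled for completeness
  }
})
|> to_list()
// [#("a", 1), #("b", 0), #("c", 0)]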
pub fn zip_all_with(
left: Stream(a),
right: Stream(b),
f: fn(Option(a), Option(b)) -> c,
) -> Stream(c)
Zips two streams using a function to combine values, continuing until both streams are exhausted.
Example
> from_range(1, 3)
|> zip_all_with(
from_range(10, 11),
fn(x, y) {
case #(x, y) {
#(Some(a), Some(b)) -> a + b
#(Some(a), None) -> a
#(None, Some(b)) -> b
#(None, None) -> 0 // Never reached in this example
}
}
)
|> to_list()
// Returns [11, 13, 3]
Visual Representation
Stream 1:  [1]---------->[2]---------->[3]-------->|
            |             |             |
            v             v             v
      f(S(1),S(10))  f(S(2),S(11))   f(S(3),N)
            ↓             ↓             ↓
          [11]--------->[13]--------->[3]--------->|
            ^             ^
            |             |
Stream 2: [10]--------->[11]---------->|
Where: S = Some, N = None, f = combining function
When to Use
- When combining streams of different lengths with custom logic
- For implementing full outer joins with transformation
- When handling missing data with specific fallback logic
- For stream synchronization with custom element combination
- When implementing fault-tolerant stream processing with data transformation
- For creating derived streams that handle gaps in source streams
Description
The zip_all_with function combines two streams by applying a custom function
to pairs of elements, continuing until both streams are exhausted. Unlike
regular zip_with, which stops at the shortest stream, it continues processing
and provides None values for the exhausted stream's side.
Key characteristics:
- Processes until both streams are complete
- Provides None for elements after a stream ends
- Allows custom handling of present/missing values
- Maintains stream alignment with transformation
- Processes elements lazily (on-demand)
- Preserves streaming semantics while allowing value combination
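A shorter sketch of the same idea using defaults for gaps, assuming gleam/option is imported:
> from_list([1, 2, 3, 4])
|> zip_all_with(
  from_list([10, 20]),
  fn(a, b) { option.unwrap(a, 0) + option.unwrap(b, 0) }   // treat a missing side as 0
)
|> to_list()
// [11, 22, 3, 4]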
pub fn zip_with(
left: Stream(a),
right: Stream(b),
f: fn(a, b) -> c,
) -> Stream(c)
Zips two streams together using a function to combine their elements.
Example
> from_range(1, 3)
|> zip_with(
from_range(10, 12),
fn(x, y) { x + y }
)
|> to_list()
[11, 13, 15]
Visual Representation
Stream 1:  [1]----->[2]----->[3]----->|
            |        |        |
            v        v        v
          f(1,10)  f(2,11)  f(3,12)
            ^        ^        ^
            |        |        |
Stream 2: [10]----->[11]---->[12]---->|

Result:   [11]----->[13]---->[15]---->|
Where:
- | represents the end of the stream
- Vertical arrows show how elements are paired
- f(x,y) represents the combining function
When to Use
- When you need to combine elements from two streams using custom logic
- For implementing mathematical operations between parallel streams
- When transforming coordinate pairs with a specific formula
- For combining time series data with a custom aggregation function
- When implementing stream operations that need element-wise computation
- For creating derived streams based on multiple input streams
Description
The zip_with function combines two streams by applying a custom function to
corresponding pairs of elements. It processes both streams in parallel,
creating a new stream where each element is the result of applying the
provided function to elements from both input streams. The resulting stream
terminates when either input stream ends.
Key characteristics:
- Processes elements lazily (on-demand)
- Applies the combining function only when elements are pulled
- Terminates when either stream ends
- Maintains synchronization between streams
- Allows custom transformation of paired elements
- Preserves the streaming nature of inputs
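A sketch of combining labels with readings into display strings, assuming gleam/int is imported:
> from_list(["temperature", "humidity"])
|> zip_with(
  from_list([21, 60]),
  fn(name, value) { name <> ": " <> int.to_string(value) }
)
|> to_list()
// ["temperature: 21", "humidity: 60"]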