Elixir v1.4.4 Stream

Module for creating and composing streams.

Streams are composable, lazy enumerables. Any enumerable that generates items one by one during enumeration is called a stream. For example, Elixir’s Range is a stream:

iex> range = 1..5
1..5
iex> Enum.map range, &(&1 * 2)
[2, 4, 6, 8, 10]

In the example above, as we mapped over the range, the elements being enumerated were created one by one, during enumeration. The Stream module allows us to map the range, without triggering its enumeration:

iex> range = 1..3
iex> stream = Stream.map(range, &(&1 * 2))
iex> Enum.map(stream, &(&1 + 1))
[3, 5, 7]

Notice we started with a range and then created a stream that is meant to multiply each item in the range by 2. At this point, no computation was done. Only when Enum.map/2 is called do we actually enumerate over each item in the range, multiplying it by 2 and adding 1. We say the functions in Stream are lazy and the functions in Enum are eager.

Due to their laziness, streams are useful when working with large (or even infinite) collections. When chaining many operations with Enum, intermediate lists are created, while Stream creates a recipe of computations that are executed at a later moment. Let’s see another example:

1..3
|> Enum.map(&IO.inspect(&1))
|> Enum.map(&(&1 * 2))
|> Enum.map(&IO.inspect(&1))
1
2
3
2
4
6
#=> [2, 4, 6]

Notice that we first printed each item in the list, then multiplied each element by 2 and finally printed each new value. In this example, the list was enumerated three times. Let’s see an example with streams:

stream = 1..3
|> Stream.map(&IO.inspect(&1))
|> Stream.map(&(&1 * 2))
|> Stream.map(&IO.inspect(&1))
Enum.to_list(stream)
1
2
2
4
3
6
#=> [2, 4, 6]

Although the end result is the same, the order in which the items were printed changed! With streams, we print the first item and then print its double. In this example, the list was enumerated just once!

That’s what we meant when we said earlier that streams are composable, lazy enumerables. Notice we could call Stream.map/2 multiple times, effectively composing the streams and keeping them lazy. The computations are only performed when you call a function from the Enum module.
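As a minimal sketch of this laziness on an infinite collection (the pipeline and the variable name result are illustrative, not part of the module), the following computes only as many elements as Enum.take/2 asks for:

```elixir
# Illustrative pipeline: squares of the first three even numbers,
# drawn from an infinite sequence built with Stream.iterate/2.
result =
  Stream.iterate(1, &(&1 + 1))            # 1, 2, 3, ... (never ends)
  |> Stream.filter(&(rem(&1, 2) == 0))    # lazily keep even numbers
  |> Stream.map(&(&1 * &1))               # lazily square them
  |> Enum.take(3)                         # eager: pulls only what it needs

IO.inspect(result)
#=> [4, 16, 36]
```

Replacing the Stream calls with their Enum counterparts would never return here, because Enum.filter/2 would try to enumerate the whole infinite sequence first.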

Creating Streams

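Many functions in Elixir’s standard library return streams. Some examples are IO.stream/2, which streams input lines one by one, and URI.query_decoder/1, which decodes a query string pair by pair.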

This module also provides many convenience functions for creating streams, like Stream.cycle/1, Stream.unfold/2, Stream.resource/3 and more.

Note that the functions in this module are guaranteed to return enumerables. Since enumerables can have different shapes (structs, anonymous functions, and so on), the functions in this module may return any of those shapes, and the shape returned may change at any time. For example, a function that today returns an anonymous function may return a struct in future releases.
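In practice this suggests consuming streams only through the Enum and Stream functions (that is, through the Enumerable protocol) rather than matching on their concrete shape. A small sketch, where the variable names are illustrative:

```elixir
stream = Stream.map(1..3, &(&1 * 2))

# Avoid checks such as is_function(stream) or matching on a struct:
# the concrete shape is an implementation detail and may change.
# Asking whether the value implements Enumerable is shape-independent.
enumerable? = Enumerable.impl_for(stream) != nil

doubled = Enum.to_list(stream)
IO.inspect({enumerable?, doubled})
#=> {true, [2, 4, 6]}
```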

Summary

Functions

chunk(enum, n)
Shortcut to chunk(enum, n, n)

chunk(enum, n, step, leftover \\ nil)
Streams the enumerable in chunks, containing n items each, where each new chunk starts step elements into the enumerable

chunk_by(enum, fun)
Chunks the enum by buffering elements for which fun returns the same value and only emits them when fun returns a new value or the enum finishes

concat(enumerables)
Creates a stream that enumerates each enumerable in an enumerable

concat(first, second)
Creates a stream that enumerates the first argument, followed by the second

cycle(enumerable)
Creates a stream that cycles through the given enumerable, infinitely

dedup(enum)
Creates a stream that only emits elements if they are different from the last emitted element

dedup_by(enum, fun)
Creates a stream that only emits elements if the result of calling fun on the element is different from the (stored) result of calling fun on the last emitted element

drop(enum, n)
Lazily drops the next n items from the enumerable

drop_every(enum, nth)
Creates a stream that drops every nth item from the enumerable

drop_while(enum, fun)
Lazily drops elements of the enumerable while the given function returns true

each(enum, fun)
Executes the given function for each item

filter(enum, fun)
Creates a stream that filters elements according to the given function on enumeration

filter_map(enum, filter, mapper)
Creates a stream that filters and then maps elements according to given functions

flat_map(enum, mapper)
Creates a stream that will apply the given function on enumeration and flatten the result, but only one level deep

interval(n)
Creates a stream that emits a value after the given period n in milliseconds

into(enum, collectable, transform \\ fn x -> x end)
Injects the stream values into the given collectable as a side-effect

iterate(start_value, next_fun)
Emits a sequence of values, starting with start_value. Successive values are generated by calling next_fun on the previous value

map(enum, fun)
Creates a stream that will apply the given function on enumeration

map_every(enum, nth, fun)
Creates a stream that will apply the given function on every nth item from the enumerable

reject(enum, fun)
Creates a stream that will reject elements according to the given function on enumeration

repeatedly(generator_fun)
Returns a stream generated by calling generator_fun repeatedly

resource(start_fun, next_fun, after_fun)
Emits a sequence of values for the given resource

run(stream)
Runs the given stream

scan(enum, fun)
Creates a stream that applies the given function to each element, emits the result and uses the same result as the accumulator for the next computation

scan(enum, acc, fun)
Creates a stream that applies the given function to each element, emits the result and uses the same result as the accumulator for the next computation. Uses the given acc as the starting value

take(enum, count)
Lazily takes the next count items from the enumerable and stops enumeration

take_every(enum, nth)
Creates a stream that takes every nth item from the enumerable

take_while(enum, fun)
Lazily takes elements of the enumerable while the given function returns true

timer(n)
Creates a stream that emits a single value after n milliseconds

transform(enum, acc, reducer)
Transforms an existing stream

transform(enum, start_fun, reducer, after_fun)
Transforms an existing stream with function-based start and finish

unfold(next_acc, next_fun)
Emits a sequence of values for the given accumulator

uniq(enum)
Creates a stream that only emits elements if they are unique

uniq_by(enum, fun)
Creates a stream that only emits elements if they are unique, by removing the elements for which function fun returned duplicate items

with_index(enum, offset \\ 0)
Creates a stream where each item in the enumerable will be wrapped in a tuple alongside its index

zip(enumerables)
Zips corresponding elements from a collection of enumerables into one stream of tuples

zip(left, right)
Zips two collections together, lazily

Types

default() :: any
element() :: any
index() :: non_neg_integer

Functions

chunk(enum, n)

Shortcut to chunk(enum, n, n).

chunk(enum, n, step, leftover \\ nil)
chunk(Enumerable.t, pos_integer, pos_integer, Enumerable.t | nil) :: Enumerable.t

Streams the enumerable in chunks, containing n items each, where each new chunk starts step elements into the enumerable.

step is optional and, if not passed, defaults to n, i.e. chunks do not overlap. If the final chunk does not have n elements to fill the chunk, elements are taken as necessary from leftover if it was passed. If leftover is passed and does not have enough elements to fill the chunk, then the chunk is returned anyway with fewer than n elements. If leftover is not passed at all or is nil, then the partial chunk is discarded from the result.

Examples

iex> Stream.chunk([1, 2, 3, 4, 5, 6], 2) |> Enum.to_list
[[1, 2], [3, 4], [5, 6]]

iex> Stream.chunk([1, 2, 3, 4, 5, 6], 3, 2) |> Enum.to_list
[[1, 2, 3], [3, 4, 5]]

iex> Stream.chunk([1, 2, 3, 4, 5, 6], 3, 2, [7]) |> Enum.to_list
[[1, 2, 3], [3, 4, 5], [5, 6, 7]]

iex> Stream.chunk([1, 2, 3, 4, 5, 6], 3, 3, []) |> Enum.to_list
[[1, 2, 3], [4, 5, 6]]

chunk_by(enum, fun)
chunk_by(Enumerable.t, (element -> any)) :: Enumerable.t

Chunks the enum by buffering elements for which fun returns the same value and only emits them when fun returns a new value or the enum finishes.

Examples

iex> stream = Stream.chunk_by([1, 2, 2, 3, 4, 4, 6, 7, 7], &(rem(&1, 2) == 1))
iex> Enum.to_list(stream)
[[1], [2, 2], [3], [4, 4, 6], [7, 7]]

concat(enumerables)

Creates a stream that enumerates each enumerable in an enumerable.

Examples

iex> stream = Stream.concat([1..3, 4..6, 7..9])
iex> Enum.to_list(stream)
[1, 2, 3, 4, 5, 6, 7, 8, 9]

concat(first, second)

Creates a stream that enumerates the first argument, followed by the second.

Examples

iex> stream = Stream.concat(1..3, 4..6)
iex> Enum.to_list(stream)
[1, 2, 3, 4, 5, 6]

iex> stream1 = Stream.cycle([1, 2, 3])
iex> stream2 = Stream.cycle([4, 5, 6])
iex> stream = Stream.concat(stream1, stream2)
iex> Enum.take(stream, 6)
[1, 2, 3, 1, 2, 3]

cycle(enumerable)

Creates a stream that cycles through the given enumerable, infinitely.

Examples

iex> stream = Stream.cycle([1, 2, 3])
iex> Enum.take(stream, 5)
[1, 2, 3, 1, 2]

dedup(enum)

Creates a stream that only emits elements if they are different from the last emitted element.

This function only ever needs to store the last emitted element.

Elements are compared using ===.

Examples

iex> Stream.dedup([1, 2, 3, 3, 2, 1]) |> Enum.to_list
[1, 2, 3, 2, 1]

dedup_by(enum, fun)
dedup_by(Enumerable.t, (element -> term)) :: Enumerable.t

Creates a stream that only emits elements if the result of calling fun on the element is different from the (stored) result of calling fun on the last emitted element.

Examples

iex> Stream.dedup_by([{1, :x}, {2, :y}, {2, :z}, {1, :x}], fn {x, _} -> x end) |> Enum.to_list
[{1, :x}, {2, :y}, {1, :x}]

drop(enum, n)
drop(Enumerable.t, non_neg_integer) :: Enumerable.t

Lazily drops the next n items from the enumerable.

If a negative n is given, it will drop the last n items from the collection. Note that the mechanism by which this is implemented will delay the emission of any item until n additional items have been emitted by the enum.

Examples

iex> stream = Stream.drop(1..10, 5)
iex> Enum.to_list(stream)
[6, 7, 8, 9, 10]

iex> stream = Stream.drop(1..10, -5)
iex> Enum.to_list(stream)
[1, 2, 3, 4, 5]

drop_every(enum, nth)
drop_every(Enumerable.t, non_neg_integer) :: Enumerable.t

Creates a stream that drops every nth item from the enumerable.

The first item is always dropped, unless nth is 0.

nth must be a non-negative integer.

Examples

iex> stream = Stream.drop_every(1..10, 2)
iex> Enum.to_list(stream)
[2, 4, 6, 8, 10]

iex> stream = Stream.drop_every(1..1000, 1)
iex> Enum.to_list(stream)
[]

iex> stream = Stream.drop_every([1, 2, 3, 4, 5], 0)
iex> Enum.to_list(stream)
[1, 2, 3, 4, 5]

drop_while(enum, fun)
drop_while(Enumerable.t, (element -> as_boolean(term))) :: Enumerable.t

Lazily drops elements of the enumerable while the given function returns true.

Examples

iex> stream = Stream.drop_while(1..10, &(&1 <= 5))
iex> Enum.to_list(stream)
[6, 7, 8, 9, 10]

each(enum, fun)

Executes the given function for each item.

Useful for adding side effects (like printing) to a stream.

Examples

iex> stream = Stream.each([1, 2, 3], fn(x) -> send self(), x end)
iex> Enum.to_list(stream)
iex> receive do: (x when is_integer(x) -> x)
1
iex> receive do: (x when is_integer(x) -> x)
2
iex> receive do: (x when is_integer(x) -> x)
3

filter(enum, fun)
filter(Enumerable.t, (element -> as_boolean(term))) :: Enumerable.t

Creates a stream that filters elements according to the given function on enumeration.

Examples

iex> stream = Stream.filter([1, 2, 3], fn(x) -> rem(x, 2) == 0 end)
iex> Enum.to_list(stream)
[2]

filter_map(enum, filter, mapper)
filter_map(Enumerable.t, (element -> as_boolean(term)), (element -> any)) :: Enumerable.t

Creates a stream that filters and then maps elements according to given functions.

Exists for symmetry with Enum.filter_map/3.

Examples

iex> stream = Stream.filter_map(1..6, fn(x) -> rem(x, 2) == 0 end, &(&1 * 2))
iex> Enum.to_list(stream)
[4, 8, 12]

flat_map(enum, mapper)

Creates a stream that will apply the given function on enumeration and flatten the result, but only one level deep.

Examples

iex> stream = Stream.flat_map([1, 2, 3], fn(x) -> [x, x * 2] end)
iex> Enum.to_list(stream)
[1, 2, 2, 4, 3, 6]

iex> stream = Stream.flat_map([1, 2, 3], fn(x) -> [[x]] end)
iex> Enum.to_list(stream)
[[1], [2], [3]]

interval(n)
interval(non_neg_integer) :: Enumerable.t

Creates a stream that emits a value after the given period n in milliseconds.

The values emitted are an increasing counter starting at 0. This operation will block the caller by the given interval every time a new item is streamed.

Do not use this function to generate a sequence of numbers. If blocking the caller process is not necessary, use Stream.iterate(0, & &1 + 1) instead.

Examples

iex> Stream.interval(10) |> Enum.take(10)
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

into(enum, collectable, transform \\ fn x -> x end)
into(Enumerable.t, Collectable.t, (term -> term)) :: Enumerable.t

Injects the stream values into the given collectable as a side-effect.

This function is often used with run/1 since any evaluation is delayed until the stream is executed. See run/1 for an example.
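A self-contained sketch of this pattern, assuming a scratch file in the system temporary directory (the file name is hypothetical). The optional transform function is applied to each value before it is injected into the collectable, while the stream itself keeps emitting the original values:

```elixir
path = Path.join(System.tmp_dir!(), "stream_into_example.txt")

stream =
  ["a", "b", "c"]
  |> Stream.into(File.stream!(path), fn x -> String.upcase(x) <> "\n" end)

# Building the stream writes nothing; enumerating it performs the writes.
emitted = Enum.to_list(stream)
written = File.read!(path)
File.rm!(path)

IO.inspect(emitted)  #=> ["a", "b", "c"]
IO.inspect(written)  #=> "A\nB\nC\n"
```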

iterate(start_value, next_fun)
iterate(element, (element -> element)) :: Enumerable.t

Emits a sequence of values, starting with start_value. Successive values are generated by calling next_fun on the previous value.

Examples

iex> Stream.iterate(0, &(&1+1)) |> Enum.take(5)
[0, 1, 2, 3, 4]

map(enum, fun)

Creates a stream that will apply the given function on enumeration.

Examples

iex> stream = Stream.map([1, 2, 3], fn(x) -> x * 2 end)
iex> Enum.to_list(stream)
[2, 4, 6]

map_every(enum, nth, fun)
map_every(Enumerable.t, non_neg_integer, (element -> any)) :: Enumerable.t

Creates a stream that will apply the given function on every nth item from the enumerable.

The first item is always passed to the given function.

nth must be a non-negative integer.

Examples

iex> stream = Stream.map_every(1..10, 2, fn(x) -> x * 2 end)
iex> Enum.to_list(stream)
[2, 2, 6, 4, 10, 6, 14, 8, 18, 10]

iex> stream = Stream.map_every([1, 2, 3, 4, 5], 1, fn(x) -> x * 2 end)
iex> Enum.to_list(stream)
[2, 4, 6, 8, 10]

iex> stream = Stream.map_every(1..5, 0, fn(x) -> x * 2 end)
iex> Enum.to_list(stream)
[1, 2, 3, 4, 5]

reject(enum, fun)
reject(Enumerable.t, (element -> as_boolean(term))) :: Enumerable.t

Creates a stream that will reject elements according to the given function on enumeration.

Examples

iex> stream = Stream.reject([1, 2, 3], fn(x) -> rem(x, 2) == 0 end)
iex> Enum.to_list(stream)
[1, 3]

repeatedly(generator_fun)
repeatedly((() -> element)) :: Enumerable.t

Returns a stream generated by calling generator_fun repeatedly.

Examples

# Although not necessary, let's seed the random algorithm
iex> :rand.seed(:exsplus, {1, 2, 3})
iex> Stream.repeatedly(&:rand.uniform/0) |> Enum.take(3)
[0.40502929729990744, 0.45336720247823126, 0.04094511692041057]

resource(start_fun, next_fun, after_fun)
resource((() -> acc), (acc -> {[element], acc} | {:halt, acc}), (acc -> term)) :: Enumerable.t

Emits a sequence of values for the given resource.

Similar to transform/3 but the initial accumulated value is computed lazily via start_fun and executes an after_fun at the end of enumeration (both in cases of success and failure).

Successive values are generated by calling next_fun with the previous accumulator (the initial value being the result returned by start_fun) and it must return a tuple containing a list of items to be emitted and the next accumulator. The enumeration finishes if it returns {:halt, acc}.

As the name says, this function is useful to stream values from resources.

Examples

Stream.resource(fn -> File.open!("sample") end,
                fn file ->
                  case IO.read(file, :line) do
                    data when is_binary(data) -> {[data], file}
                    _ -> {:halt, file}
                  end
                end,
                fn file -> File.close(file) end)

run(stream)

Runs the given stream.

This is useful when a stream needs to be run, for side effects, and there is no interest in its return result.

Examples

Open up a file, replace all # by % and stream to another file without loading the whole file in memory:

File.stream!("code")
|> Stream.map(&String.replace(&1, "#", "%"))
|> Stream.into(File.stream!("new"))
|> Stream.run

No computation will be done until we call one of the Enum functions or Stream.run/1.
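The deferred execution can be observed with a side-effecting stream. A sketch using messages sent to the current process (the {:seen, x} message shape is made up for this illustration):

```elixir
{:message_queue_len, before_build} = Process.info(self(), :message_queue_len)

stream = Stream.each([1, 2, 3], fn x -> send(self(), {:seen, x}) end)

# Building the stream sends nothing.
{:message_queue_len, before_run} = Process.info(self(), :message_queue_len)

# Stream.run/1 forces the enumeration and returns :ok.
:ok = Stream.run(stream)
{:message_queue_len, after_run} = Process.info(self(), :message_queue_len)

IO.inspect({before_run - before_build, after_run - before_run})
#=> {0, 3}
```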

scan(enum, fun)

Creates a stream that applies the given function to each element, emits the result and uses the same result as the accumulator for the next computation.

Examples

iex> stream = Stream.scan(1..5, &(&1 + &2))
iex> Enum.to_list(stream)
[1, 3, 6, 10, 15]

scan(enum, acc, fun)

Creates a stream that applies the given function to each element, emits the result and uses the same result as the accumulator for the next computation. Uses the given acc as the starting value.

Examples

iex> stream = Stream.scan(1..5, 0, &(&1 + &2))
iex> Enum.to_list(stream)
[1, 3, 6, 10, 15]

take(enum, count)

Lazily takes the next count items from the enumerable and stops enumeration.

If a negative count is given, the last count values will be taken. To do so, the collection is fully enumerated, keeping up to 2 * count elements in memory. Once the end of the collection is reached, the last count elements are emitted. Therefore, using a negative count on an infinite collection will never return.

Examples

iex> stream = Stream.take(1..100, 5)
iex> Enum.to_list(stream)
[1, 2, 3, 4, 5]

iex> stream = Stream.take(1..100, -5)
iex> Enum.to_list(stream)
[96, 97, 98, 99, 100]

iex> stream = Stream.cycle([1, 2, 3]) |> Stream.take(5)
iex> Enum.to_list(stream)
[1, 2, 3, 1, 2]

take_every(enum, nth)
take_every(Enumerable.t, non_neg_integer) :: Enumerable.t

Creates a stream that takes every nth item from the enumerable.

The first item is always included, unless nth is 0.

nth must be a non-negative integer.

Examples

iex> stream = Stream.take_every(1..10, 2)
iex> Enum.to_list(stream)
[1, 3, 5, 7, 9]

iex> stream = Stream.take_every([1, 2, 3, 4, 5], 1)
iex> Enum.to_list(stream)
[1, 2, 3, 4, 5]

iex> stream = Stream.take_every(1..1000, 0)
iex> Enum.to_list(stream)
[]

take_while(enum, fun)
take_while(Enumerable.t, (element -> as_boolean(term))) :: Enumerable.t

Lazily takes elements of the enumerable while the given function returns true.

Examples

iex> stream = Stream.take_while(1..100, &(&1 <= 5))
iex> Enum.to_list(stream)
[1, 2, 3, 4, 5]

timer(n)
timer(non_neg_integer) :: Enumerable.t

Creates a stream that emits a single value after n milliseconds.

The value emitted is 0. This operation will block the caller by the given time until the item is streamed.

Examples

iex> Stream.timer(10) |> Enum.to_list
[0]

transform(enum, acc, reducer)
transform(Enumerable.t, acc, fun) :: Enumerable.t when fun: (element, acc -> {Enumerable.t, acc} | {:halt, acc}), acc: any

Transforms an existing stream.

It expects an accumulator and a function that receives each stream item and an accumulator, and must return a tuple containing a new stream (often a list) with the new accumulator or a tuple with :halt as first element and the accumulator as second.

Note: this function is similar to Enum.flat_map_reduce/3 except the latter returns both the flat list and accumulator, while this one returns only the stream.
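To make the comparison concrete, the same reducer can be handed to both functions. A short sketch (the names reducer, eager and lazy are illustrative):

```elixir
# Emit each item until three items have been seen, then halt.
reducer = fn i, acc ->
  if acc < 3, do: {[i], acc + 1}, else: {:halt, acc}
end

# Eager: returns the flattened list together with the final accumulator.
eager = Enum.flat_map_reduce(1..100, 0, reducer)

# Lazy: returns a stream; the accumulator stays internal.
lazy = Stream.transform(1..100, 0, reducer) |> Enum.to_list()

IO.inspect(eager)  #=> {[1, 2, 3], 3}
IO.inspect(lazy)   #=> [1, 2, 3]
```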

Examples

Stream.transform/3 is useful as it can be used as the basis to implement many of the functions defined in this module. For example, we can implement Stream.take(enum, n) as follows:

iex> enum = 1..100
iex> n = 3
iex> stream = Stream.transform(enum, 0, fn i, acc ->
...>   if acc < n, do: {[i], acc + 1}, else: {:halt, acc}
...> end)
iex> Enum.to_list(stream)
[1, 2, 3]

transform(enum, start_fun, reducer, after_fun)
transform(Enumerable.t, (() -> acc), fun, (acc -> term)) :: Enumerable.t when fun: (element, acc -> {Enumerable.t, acc} | {:halt, acc}), acc: any

Transforms an existing stream with function-based start and finish.

The accumulator is only calculated when transformation starts. It also allows an after function to be given which is invoked when the stream halts or completes.

This function can be seen as a combination of Stream.resource/3 with Stream.transform/3.
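A minimal sketch of the four-argument form, assuming a running-sum reducer and an after function that reports the final accumulator as a message (the {:finished, sum} message shape and the variable names are hypothetical):

```elixir
stream =
  Stream.transform(
    1..4,
    # start_fun: builds the accumulator only when enumeration starts
    fn -> 0 end,
    # reducer: emits the running sum and carries it as the accumulator
    fn i, sum -> {[sum + i], sum + i} end,
    # after_fun: invoked with the last accumulator when the stream finishes
    fn sum -> send(self(), {:finished, sum}) end
  )

sums = Enum.to_list(stream)

total =
  receive do
    {:finished, sum} -> sum
  after
    0 -> :no_message
  end

IO.inspect({sums, total})
#=> {[1, 3, 6, 10], 10}
```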

unfold(next_acc, next_fun)
unfold(acc, (acc -> {element, acc} | nil)) :: Enumerable.t

Emits a sequence of values for the given accumulator.

Successive values are generated by calling next_fun with the previous accumulator and it must return a tuple with the current value and next accumulator. The enumeration finishes if it returns nil.

Examples

iex> Stream.unfold(5, fn 0 -> nil; n -> {n, n-1} end) |> Enum.to_list()
[5, 4, 3, 2, 1]

uniq(enum)

Creates a stream that only emits elements if they are unique.

Keep in mind that, in order to know if an element is unique or not, this function needs to store all unique values emitted by the stream. Therefore, if the stream is infinite, the number of items stored will grow infinitely, never being garbage collected.
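When only consecutive duplicates matter, Stream.dedup/1 avoids this cost, since it stores just the last emitted element. A sketch contrasting the two on the same input (variable names are illustrative):

```elixir
input = [1, 1, 2, 2, 1]

# dedup/1 compares each element only with the last emitted one.
deduped = input |> Stream.dedup() |> Enum.to_list()

# uniq/1 compares each element with every value emitted so far.
unique = input |> Stream.uniq() |> Enum.to_list()

IO.inspect(deduped)  #=> [1, 2, 1]
IO.inspect(unique)   #=> [1, 2]
```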

Examples

iex> Stream.uniq([1, 2, 3, 3, 2, 1]) |> Enum.to_list
[1, 2, 3]

uniq_by(enum, fun)

Creates a stream that only emits elements if they are unique, by removing the elements for which function fun returned duplicate items.

The function fun maps every element to a term which is used to determine if two elements are duplicates.

Keep in mind that, in order to know if an element is unique or not, this function needs to store all unique values emitted by the stream. Therefore, if the stream is infinite, the number of items stored will grow infinitely, never being garbage collected.

Examples

iex> Stream.uniq_by([{1, :x}, {2, :y}, {1, :z}], fn {x, _} -> x end) |> Enum.to_list
[{1, :x}, {2, :y}]

iex> Stream.uniq_by([a: {:tea, 2}, b: {:tea, 2}, c: {:coffee, 1}], fn {_, y} -> y end) |> Enum.to_list
[a: {:tea, 2}, c: {:coffee, 1}]

with_index(enum, offset \\ 0)
with_index(Enumerable.t, integer) :: Enumerable.t

Creates a stream where each item in the enumerable will be wrapped in a tuple alongside its index.

If an offset is given, we will index from the given offset instead of from zero.

Examples

iex> stream = Stream.with_index([1, 2, 3])
iex> Enum.to_list(stream)
[{1, 0}, {2, 1}, {3, 2}]

iex> stream = Stream.with_index([1, 2, 3], 3)
iex> Enum.to_list(stream)
[{1, 3}, {2, 4}, {3, 5}]

zip(enumerables)

Zips corresponding elements from a collection of enumerables into one stream of tuples.

The zipping finishes as soon as any enumerable completes.

Examples

iex> concat = Stream.concat(1..3, 4..6)
iex> cycle = Stream.cycle(["foo", "bar", "baz"])
iex> Stream.zip([concat, [:a, :b, :c], cycle]) |> Enum.to_list
[{1, :a, "foo"}, {2, :b, "bar"}, {3, :c, "baz"}]

zip(left, right)

Zips two collections together, lazily.

The zipping finishes as soon as any enumerable completes.

Examples

iex> concat = Stream.concat(1..3, 4..6)
iex> cycle  = Stream.cycle([:a, :b, :c])
iex> Stream.zip(concat, cycle) |> Enum.to_list
[{1, :a}, {2, :b}, {3, :c}, {4, :a}, {5, :b}, {6, :c}]