Skuld.Effects.Brook (skuld v0.2.3)
High-level streaming API built on channels with transparent chunking.
Brook provides combinators for building streaming pipelines with:
- Backpressure via bounded channels
- Automatic error propagation
- Optional concurrency for transformations
- Integration with Skuld effects (batching works!)
- Transparent chunking for efficiency
Transparent Chunking
Internally, streams operate on chunks of values rather than individual values.
This is transparent to users - you write operations like map and filter
that operate on individual values, and the library handles chunking automatically.
Chunking dramatically reduces synchronization overhead: batching values together cuts the number of fiber spawns and channel operations.
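Both the chunk size and the channel buffer are tunable per stage via the options documented below; a sketch (the values are illustrative, not recommendations):

```elixir
comp do
  # Larger chunks amortize channel overhead when per-item work is cheap;
  # smaller chunks reduce latency when each item is expensive to produce.
  source <- Brook.from_enum(1..10_000, chunk_size: 500, buffer: 4)
  Brook.to_list(source)
end
```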
Basic Usage
comp do
# Create a stream from an enumerable
source <- Brook.from_enum(1..100)
# Transform with optional concurrency
mapped <- Brook.map(source, fn x -> x * 2 end, concurrency: 4)
# Filter
filtered <- Brook.filter(mapped, fn x -> rem(x, 4) == 0 end)
# Collect results
Brook.to_list(filtered)
end
|> Channel.with_handler()
|> FiberPool.with_handler()
|> FiberPool.run()
Error Propagation
Errors automatically flow downstream through channels. If an error occurs while processing any value in a chunk, the stream is immediately errored:
comp do
source <- Brook.from_function(fn ->
case fetch_data() do
{:ok, items} -> {:items, items}
{:error, reason} -> {:error, reason}
end
end)
# If source errors, map sees {:error, reason} and propagates it
mapped <- Brook.map(source, &process/1)
# Final result is :ok or {:error, reason}
Brook.run(mapped, &sink/1)
end
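Because the final result of running a pipeline is :ok or {:error, reason}, the caller can branch on it after installing the handlers. A sketch, following the handler order from Basic Usage (produce/0, sink/1, and handle_failure/1 are placeholder names):

```elixir
result =
  comp do
    source <- Brook.from_function(&produce/0)
    Brook.run(source, &sink/1)
  end
  |> Channel.with_handler()
  |> FiberPool.with_handler()
  |> FiberPool.run()

case result do
  :ok -> :done
  {:error, reason} -> handle_failure(reason)
end
```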
Summary
Functions
each/2 - Execute a function for each item in the stream.
filter/3 - Filter items in the stream.
from_enum/2 - Create a stream from an enumerable.
from_function/2 - Create a stream from a producer function.
map/3 - Transform each item in the stream.
run/2 - Run a stream to completion, applying a consumer function to each item.
to_list/1 - Collect all items from a stream into a list.
Functions
@spec each(Skuld.Effects.Channel.Handle.t(), (term() -> any())) :: Skuld.Comp.Types.computation()
Execute a function for each item in the stream.
Returns :ok when the stream completes successfully, or {:error, reason}
if an error occurred.
Example
comp do
source <- Brook.from_enum(1..10)
Brook.each(source, fn x -> IO.puts("Got: #{x}") end)
end
@spec filter(Skuld.Effects.Channel.Handle.t(), (term() -> boolean()), keyword()) :: Skuld.Comp.Types.computation()
Filter items in the stream.
Only items for which the predicate returns true pass through. Filtering happens within chunks - chunks may shrink but are not rechunked.
Options
:buffer - Output channel capacity in chunks (default: 10)
Example
comp do
source <- Brook.from_enum(1..20)
evens <- Brook.filter(source, fn x -> rem(x, 2) == 0 end)
Brook.to_list(evens)
end
@spec from_enum( Enumerable.t(), keyword() ) :: Skuld.Comp.Types.computation()
Create a stream from an enumerable.
Spawns a producer fiber that puts chunks of items into the output channel, then closes the channel when exhausted.
Options
:buffer - Output channel capacity in chunks (default: 10)
:chunk_size - Number of items per chunk (default: 100)
Example
comp do
source <- Brook.from_enum(1..100)
Brook.to_list(source)
end
@spec from_function( (-> {:item, term()} | {:items, [term()]} | :done | {:error, term()}), keyword() ) :: Skuld.Comp.Types.computation()
Create a stream from a producer function.
The producer function is called repeatedly until it signals completion. It should return:
{:item, value} - emit a single item
{:items, [values]} - emit multiple items (will be chunked)
:done - close the channel normally
{:error, reason} - signal an error to consumers
Options
:buffer - Output channel capacity in chunks (default: 10)
:chunk_size - Number of items per chunk (default: 100)
Example
comp do
{:ok, counter} = Agent.start_link(fn -> 0 end)
source <- Brook.from_function(fn ->
n = Agent.get_and_update(counter, fn n -> {n, n + 1} end)
if n < 10, do: {:item, n}, else: :done
end)
Brook.to_list(source)
end
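The {:items, [values]} variant lets a producer emit a whole batch per call, which suits paginated sources. A sketch using an Agent holding pre-built pages (the page data and the empty-list sentinel are illustrative):

```elixir
comp do
  # Each call pops one page; the final empty page signals completion.
  {:ok, pages} = Agent.start_link(fn -> [[1, 2, 3], [4, 5], []] end)
  source <- Brook.from_function(fn ->
    case Agent.get_and_update(pages, fn [h | t] -> {h, t} end) do
      [] -> :done
      batch -> {:items, batch}
    end
  end)
  Brook.to_list(source)
end
```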
@spec map( Skuld.Effects.Channel.Handle.t(), (term() -> term() | Skuld.Comp.Types.computation()), keyword() ) :: Skuld.Comp.Types.computation()
Transform each item in the stream.
Spawns worker fiber(s) that read chunks from input, apply the transform function to each item in the chunk, and write result chunks to output.
The transform function can be a pure function or return a computation. If the transform errors on any item in a chunk, the stream is errored.
Options
:concurrency - Maximum concurrent transforms (default: 1). Due to cooperative scheduling, concurrency: 1 has a floor of 2 concurrent transforms. Values >= 2 behave as specified.
:buffer - Output channel capacity in chunks (default: 10)
Example
comp do
source <- Brook.from_enum(1..10)
doubled <- Brook.map(source, fn x -> x * 2 end)
Brook.to_list(doubled)
end
With Effects
Transform functions can use effects, enabling batching:
comp do
source <- Brook.from_enum(user_ids)
users <- Brook.map(source, fn id -> DB.fetch(User, id) end, concurrency: 10)
Brook.to_list(users)
end
|> DB.with_executors()
@spec run(Skuld.Effects.Channel.Handle.t(), (term() -> term() | Skuld.Comp.Types.computation())) :: Skuld.Comp.Types.computation()
Run a stream to completion, applying a consumer function to each item.
Similar to each/2, but the consumer function can return a computation.
Returns :ok on success or {:error, reason} on failure.
Example
comp do
source <- Brook.from_enum(records)
Brook.run(source, fn record -> DB.insert(record) end)
end
@spec to_list(Skuld.Effects.Channel.Handle.t()) :: Skuld.Comp.Types.computation()
Collect all items from a stream into a list.
Returns the list on success, or {:error, reason} on failure.
Example
comp do
source <- Brook.from_enum(1..10)
mapped <- Brook.map(source, fn x -> x * 2 end)
Brook.to_list(mapped)
end