Skuld.Effects.Brook (skuld v0.2.3)

High-level streaming API built on channels with transparent chunking.

Brook provides combinators for building streaming pipelines with:

  • Backpressure via bounded channels
  • Automatic error propagation
  • Optional concurrency for transformations
  • Integration with Skuld effects, so effectful transforms can be batched
  • Transparent chunking for efficiency

Transparent Chunking

Internally, streams operate on chunks of values rather than on individual values. This is transparent to users: you write operations such as map and filter against individual values, and the library handles chunking automatically.

Chunking reduces synchronization overhead by batching values together. Each fiber spawn and channel operation carries a whole chunk rather than a single value, so far fewer of them are needed.
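To make the idea concrete, here is a plain-Elixir sketch that models transparent chunking without Brook, channels, or fibers. The module name ChunkedMapSketch is hypothetical, and the default chunk size of 100 simply mirrors the documented :chunk_size default of from_enum/2. The caller writes a per-item function; the sketch applies it inside each chunk and reports how many chunk-level hand-offs were needed.

```elixir
defmodule ChunkedMapSketch do
  # Conceptual model only, not Brook's implementation: the per-item
  # function is applied inside each chunk, and the chunks are then
  # concatenated back into a flat list of results.
  # Returns {mapped_values, number_of_chunks}; the chunk count stands in
  # for how many hand-offs a chunked pipeline would perform.
  def map(enumerable, fun, chunk_size \\ 100) do
    chunks = Enum.chunk_every(enumerable, chunk_size)

    mapped =
      chunks
      |> Enum.map(fn chunk -> Enum.map(chunk, fun) end)
      |> Enum.concat()

    {mapped, length(chunks)}
  end
end
```

For example, ChunkedMapSketch.map(1..250, &(&1 * 2)) produces the same 250 values as a plain Enum.map, but in 3 chunks instead of 250 individual hand-offs.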

Basic Usage

comp do
  # Create a stream from an enumerable
  source <- Brook.from_enum(1..100)

  # Transform with optional concurrency
  mapped <- Brook.map(source, fn x -> x * 2 end, concurrency: 4)

  # Filter
  filtered <- Brook.filter(mapped, fn x -> rem(x, 4) == 0 end)

  # Collect results
  Brook.to_list(filtered)
end
|> Channel.with_handler()
|> FiberPool.with_handler()
|> FiberPool.run()

Error Propagation

Errors automatically flow downstream through channels. If an error occurs while processing any value in a chunk, the stream is immediately errored:

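comp do
  # fetch_data/0, process/1 and sink/1 are placeholders for your own code.
  # The producer is called repeatedly, so it must eventually return :done;
  # here an empty batch is assumed to mean "no more data".
  source <- Brook.from_function(fn ->
    case fetch_data() do
      {:ok, []} -> :done
      {:ok, items} -> {:items, items}
      {:error, reason} -> {:error, reason}
    end
  end)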

  # If source errors, map sees {:error, reason} and propagates it
  mapped <- Brook.map(source, &process/1)

  # Final result is :ok or {:error, reason}
  Brook.run(mapped, &sink/1)
end
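The "one bad item errors the stream" rule can be modeled in plain Elixir as a short-circuiting fold. This is a sketch, not Brook's code: the module name ChunkErrorSketch is hypothetical, and the per-item function is assumed to return {:ok, value} or {:error, reason} so that failures are explicit. The first error aborts its chunk, later chunks are never processed, and that error becomes the result.

```elixir
defmodule ChunkErrorSketch do
  # Apply `fun` to every item of a single chunk. The first
  # {:error, reason} halts the fold and discards the chunk's partial results.
  def process_chunk(chunk, fun) do
    chunk
    |> Enum.reduce_while({:ok, []}, fn item, {:ok, acc} ->
      case fun.(item) do
        {:ok, value} -> {:cont, {:ok, [value | acc]}}
        {:error, reason} -> {:halt, {:error, reason}}
      end
    end)
    |> case do
      {:ok, acc} -> {:ok, Enum.reverse(acc)}
      {:error, reason} -> {:error, reason}
    end
  end

  # Process chunks in order. Once a chunk errors, the remaining chunks
  # are skipped and the error is returned for the whole stream.
  def process_stream(chunks, fun) do
    result =
      Enum.reduce_while(chunks, {:ok, []}, fn chunk, {:ok, done} ->
        case process_chunk(chunk, fun) do
          {:ok, values} -> {:cont, {:ok, [values | done]}}
          {:error, reason} -> {:halt, {:error, reason}}
        end
      end)

    case result do
      {:ok, done} -> {:ok, done |> Enum.reverse() |> Enum.concat()}
      error -> error
    end
  end
end
```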

Summary

Functions

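  • each(input, consumer_fn) - Execute a function for each item in the stream.
  • filter(input, pred_fn, opts \\ []) - Filter items in the stream.
  • from_enum(enumerable, opts \\ []) - Create a stream from an enumerable.
  • from_function(producer_fn, opts \\ []) - Create a stream from a producer function.
  • map(input, transform_fn, opts \\ []) - Transform each item in the stream.
  • run(input, consumer_fn) - Run a stream to completion, applying a consumer function to each item.
  • to_list(input) - Collect all items from a stream into a list.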

Functions

each(input, consumer_fn)

@spec each(Skuld.Effects.Channel.Handle.t(), (term() -> any())) ::
  Skuld.Comp.Types.computation()

Execute a function for each item in the stream.

Returns :ok when the stream completes successfully, or {:error, reason} if an error occurred.
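As a rough model of this return contract, the sketch below consumes a list of channel events and calls the consumer on every item. The event shapes {:chunk, items}, {:error, reason} and :closed, and the module name EachSketch, are hypothetical stand-ins; Brook's actual channel protocol is not described on this page.

```elixir
defmodule EachSketch do
  # Hypothetical events read from an input channel:
  #   {:chunk, items}   - a batch of items to hand to the consumer
  #   {:error, reason}  - the upstream stream failed
  #   :closed           - the upstream stream finished normally
  # Returns :ok on normal completion or {:error, reason} on failure.
  def each(events, consumer_fn) do
    Enum.reduce_while(events, :ok, fn
      {:chunk, items}, :ok ->
        Enum.each(items, consumer_fn)
        {:cont, :ok}

      {:error, reason}, :ok ->
        {:halt, {:error, reason}}

      :closed, :ok ->
        {:halt, :ok}
    end)
  end
end
```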

Example

comp do
  source <- Brook.from_enum(1..10)
  Brook.each(source, fn x -> IO.puts("Got: #{x}") end)
end

filter(input, pred_fn, opts \\ [])

@spec filter(Skuld.Effects.Channel.Handle.t(), (term() -> boolean()), keyword()) ::
  Skuld.Comp.Types.computation()

Filter items in the stream.

Only items for which the predicate returns true pass through. Filtering is applied within each chunk, so chunks may shrink, but they are not recombined into full-size chunks.
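A plain-Elixir sketch of "filter within chunks, no re-chunking" follows. The module name FilterChunksSketch is hypothetical, and the page does not say whether Brook forwards or drops a chunk that becomes empty; this sketch simply keeps it.

```elixir
defmodule FilterChunksSketch do
  # Filter each chunk independently. Chunk boundaries are preserved, so a
  # chunk can shrink (even to []) but is never merged with its neighbours.
  def filter(chunks, pred_fn) do
    Enum.map(chunks, fn chunk -> Enum.filter(chunk, pred_fn) end)
  end
end
```

For example, filtering [[1, 2, 3], [4, 5, 6], [7]] for even numbers yields [[2], [4, 6], []] rather than a single re-packed chunk.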

Options

  • :buffer - Output channel capacity in chunks (default: 10)

Example

comp do
  source <- Brook.from_enum(1..20)
  evens <- Brook.filter(source, fn x -> rem(x, 2) == 0 end)
  Brook.to_list(evens)
end

from_enum(enumerable, opts \\ [])

@spec from_enum(
  Enumerable.t(),
  keyword()
) :: Skuld.Comp.Types.computation()

Create a stream from an enumerable.

Spawns a producer fiber that puts chunks of items into the output channel, then closes the channel when exhausted.

Options

  • :buffer - Output channel capacity in chunks (default: 10)
  • :chunk_size - Number of items per chunk (default: 100)
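The effect of :chunk_size can be pictured with Elixir's lazy Stream.chunk_every/2, which yields one chunk at a time, loosely like a producer that only emits the next chunk on demand. This is an illustrative sketch (the module name FromEnumSketch is hypothetical) and does not model the channel or the :buffer option.

```elixir
defmodule FromEnumSketch do
  # Lazily split an enumerable into chunks of at most :chunk_size items
  # (default 100, matching the documented option). Only the final chunk
  # may be shorter. Works on infinite enumerables because it is lazy.
  def chunks(enumerable, opts \\ []) do
    chunk_size = Keyword.get(opts, :chunk_size, 100)
    Stream.chunk_every(enumerable, chunk_size)
  end
end
```

With the default chunk size, 1..250 becomes chunks of 100, 100 and 50 items.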

Example

comp do
  source <- Brook.from_enum(1..100)
  Brook.to_list(source)
end

from_function(producer_fn, opts \\ [])

@spec from_function(
  (-> {:item, term()} | {:items, [term()]} | :done | {:error, term()}),
  keyword()
) :: Skuld.Comp.Types.computation()

Create a stream from a producer function.

The producer function is called repeatedly until it signals completion. It should return:

  • {:item, value} - emit a single item
  • {:items, [values]} - emit multiple items (will be chunked)
  • :done - close the channel normally
  • {:error, reason} - signal error to consumers
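The return-value protocol can be exercised without Brook using a small driver that calls the producer until it signals completion. This is a sketch: the module name FromFunctionSketch is hypothetical, and items are collected into a list here instead of being chunked into a channel.

```elixir
defmodule FromFunctionSketch do
  # Call producer_fn repeatedly, interpreting each return value according
  # to the documented protocol. Returns {:ok, items} after :done, or
  # {:error, reason} as soon as the producer reports an error.
  def drain(producer_fn), do: drain(producer_fn, [])

  defp drain(producer_fn, acc) do
    case producer_fn.() do
      {:item, value} -> drain(producer_fn, [value | acc])
      # Prepend the batch in reverse so the final reverse restores order.
      {:items, values} -> drain(producer_fn, Enum.reverse(values, acc))
      :done -> {:ok, Enum.reverse(acc)}
      {:error, reason} -> {:error, reason}
    end
  end
end
```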

Options

  • :buffer - Output channel capacity in chunks (default: 10)
  • :chunk_size - Number of items per chunk (default: 100)

Example

comp do
  {:ok, counter} = Agent.start_link(fn -> 0 end)

  source <- Brook.from_function(fn ->
    n = Agent.get_and_update(counter, fn n -> {n, n + 1} end)
    if n < 10, do: {:item, n}, else: :done
  end)

  Brook.to_list(source)
end

map(input, transform_fn, opts \\ [])

@spec map(
  Skuld.Effects.Channel.Handle.t(),
  (term() -> term() | Skuld.Comp.Types.computation()),
  keyword()
) :: Skuld.Comp.Types.computation()

Transform each item in the stream.

Spawns worker fiber(s) that read chunks from input, apply the transform function to each item in the chunk, and write result chunks to output.

The transform function can be a pure function or return a computation. If the transform errors on any item in a chunk, the stream is errored.

Options

  • :concurrency - Maximum number of concurrent transforms (default: 1). Because of cooperative scheduling, concurrency: 1 effectively behaves like concurrency: 2; values of 2 or more are honored as specified.
  • :buffer - Output channel capacity in chunks (default: 10)
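For intuition about a bounded :concurrency limit, Elixir's standard library offers an analogous primitive, Task.async_stream/3 with :max_concurrency. This is a swapped-in analogy, not how Brook works: Brook schedules cooperative fibers rather than BEAM processes, the concurrency: 1 floor described above is not modeled, and this page does not state whether Brook preserves item order when concurrency is above 1 (Task.async_stream does by default). The module name BoundedConcurrencySketch is hypothetical.

```elixir
defmodule BoundedConcurrencySketch do
  # Run `fun` over the items with at most `concurrency` tasks in flight.
  # Task.async_stream yields {:ok, result} per item, in input order by default.
  def map(enumerable, fun, concurrency) do
    enumerable
    |> Task.async_stream(fun, max_concurrency: concurrency)
    |> Enum.map(fn {:ok, value} -> value end)
  end
end
```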

Example

comp do
  source <- Brook.from_enum(1..10)
  doubled <- Brook.map(source, fn x -> x * 2 end)
  Brook.to_list(doubled)
end

With Effects

Transform functions can use effects, enabling batching:

comp do
  source <- Brook.from_enum(user_ids)
  users <- Brook.map(source, fn id -> DB.fetch(User, id) end, concurrency: 10)
  Brook.to_list(users)
end
|> DB.with_executors()

run(input, consumer_fn)

@spec run(
  Skuld.Effects.Channel.Handle.t(),
  (term() -> term() | Skuld.Comp.Types.computation())
) :: Skuld.Comp.Types.computation()

Run a stream to completion, applying a consumer function to each item.

Similar to each/2, but the consumer function can also return a computation (for example, an effectful database insert). Returns :ok on success or {:error, reason} on failure.

Example

comp do
  source <- Brook.from_enum(records)
  Brook.run(source, fn record -> DB.insert(record) end)
end

to_list(input)

@spec to_list(Skuld.Effects.Channel.Handle.t()) :: Skuld.Comp.Types.computation()

Collect all items from a stream into a list.

Returns the list on success, or {:error, reason} on failure.
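The sketch below models that return contract: on success the bare list is returned (not wrapped in {:ok, ...}), and any upstream error replaces the result. The event shapes {:chunk, items}, {:error, reason} and :closed, and the module name ToListSketch, are hypothetical; Brook's real channel protocol is not described on this page.

```elixir
defmodule ToListSketch do
  # Hypothetical channel events: {:chunk, items}, {:error, reason}, :closed.
  # Chunks are accumulated in reverse and flattened once at the end.
  def to_list(events) do
    result =
      Enum.reduce_while(events, {:ok, []}, fn
        {:chunk, items}, {:ok, chunks} -> {:cont, {:ok, [items | chunks]}}
        {:error, reason}, _acc -> {:halt, {:error, reason}}
        :closed, acc -> {:halt, acc}
      end)

    case result do
      {:ok, chunks} -> chunks |> Enum.reverse() |> Enum.concat()
      {:error, reason} -> {:error, reason}
    end
  end
end
```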

Example

comp do
  source <- Brook.from_enum(1..10)
  mapped <- Brook.map(source, fn x -> x * 2 end)
  Brook.to_list(mapped)
end