Data Pipelines

Brook provides streaming data pipelines with backpressure, concurrent transforms, and automatic I/O batching. It runs within a single BEAM process using cooperative fibers, making it lightweight and fast for I/O-bound workloads.

Basic pipeline

comp do
  source <- Brook.from_enum(1..1000)
  mapped <- Brook.map(source, &transform/1)
  filtered <- Brook.filter(mapped, &valid?/1)
  Brook.to_list(filtered)
end
|> Channel.with_handler()
|> FiberPool.with_handler()
|> Comp.run!()
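For comparison, here is the same map-then-filter shape written with Elixir's standard Stream module. It runs sequentially and needs no effect handlers. The transform/1 and valid?/1 bodies below are hypothetical stand-ins for the placeholders in the Brook example.

```elixir
defmodule BasicPipeline do
  # Hypothetical stand-ins for transform/1 and valid?/1 from the Brook example
  def transform(n), do: n * 2
  def valid?(n), do: rem(n, 3) == 0

  # Lazy, sequential equivalent of from_enum -> map -> filter -> to_list
  def run(range) do
    range
    |> Stream.map(&transform/1)
    |> Stream.filter(&valid?/1)
    |> Enum.to_list()
  end
end
```

Brook instead connects stages with bounded channel buffers and runs them on cooperative fibers. That structure is what allows the per-stage concurrency: and buffer: options described below.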

Concurrent transforms

Add concurrency to any stage with the concurrency: option:

comp do
  source <- Brook.from_enum(items)
  enriched <- Brook.map(source, &enrich/1, concurrency: 4)
  Brook.to_list(enriched)
end
|> Channel.with_handler()
|> FiberPool.with_handler()
|> Comp.run!()

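Brook moves items through a stage in chunks, sized by the chunk_size: option shown in later examples. With concurrency: 4, up to 4 chunks are transformed at once, and the output keeps the input order regardless of which chunk finishes first.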
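The ordering guarantee can be illustrated with the standard library's Task.async_stream/3. Like Brook, it caps the amount of in-flight work and returns results in input order by default. This sketch swaps in separate processes instead of Brook's fibers. The chunk size and the random sleep that stands in for I/O latency are assumptions for the demo.

```elixir
defmodule OrderedChunks do
  # Stand-in for an I/O-bound transform: sleeps 0-19 ms at random, then doubles
  def enrich(n) do
    Process.sleep(:rand.uniform(20) - 1)
    n * 2
  end

  # Split items into chunks, process at most `concurrency` chunks at once,
  # and reassemble the results in input order (ordered: true is the default).
  def map(items, fun, chunk_size, concurrency) do
    items
    |> Enum.chunk_every(chunk_size)
    |> Task.async_stream(fn chunk -> Enum.map(chunk, fun) end,
      max_concurrency: concurrency,
      ordered: true
    )
    |> Enum.flat_map(fn {:ok, results} -> results end)
  end
end
```

Chunks finish in random order because of the sleeps, but the flattened result always matches Enum.map(items, fun).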

Streaming from external sources

Use Brook.from_function for sources that produce data incrementally, such as a paginated API:

comp do
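  source <- Brook.from_function(fn ->
    # Called repeatedly; the pagination cursor is kept in the process dictionary
    case Process.get(:cursor) do
      # Assumption: returning :done ends the stream after the last page
      :done -> :done
      cursor ->
        case ExternalAPI.fetch_page(cursor) do
          {:ok, %{items: items, next: nil}} ->
            Process.put(:cursor, :done)
            {:items, items}
          {:ok, %{items: items, next: next}} ->
            Process.put(:cursor, next)
            {:items, items}
          {:error, e} -> {:error, e}
        end
    end
  end, chunk_size: 50)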

  Brook.each(source, fn item ->
    comp do
      _ <- RecordRepo.insert_record!(item)
      :ok
    end
  end)
end
|> Port.with_handler(%{RecordRepo => RecordRepo.Ecto})
|> Channel.with_handler()
|> FiberPool.with_handler()
|> Comp.run!()
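Outside Brook, the same cursor-driven pagination can be written with Elixir's Stream.resource/3, which makes termination explicit through {:halt, acc}. The FakeAPI module and its response shape are hypothetical, modelled on the ExternalAPI calls above.

```elixir
defmodule FakeAPI do
  # Hypothetical paged API: three pages of two items; next is nil on the last page
  @pages %{nil => {[1, 2], "p2"}, "p2" => {[3, 4], "p3"}, "p3" => {[5, 6], nil}}

  def fetch_page(cursor) do
    {items, next} = Map.fetch!(@pages, cursor)
    {:ok, %{items: items, next: next}}
  end
end

defmodule Paged do
  # Lazily emits every item across all pages, fetching one page per step
  def stream do
    Stream.resource(
      fn -> {:cursor, nil} end,
      fn
        :done ->
          {:halt, :done}

        {:cursor, cursor} ->
          case FakeAPI.fetch_page(cursor) do
            {:ok, %{items: items, next: nil}} -> {items, :done}
            {:ok, %{items: items, next: next}} -> {items, {:cursor, next}}
          end
      end,
      fn _acc -> :ok end
    )
  end
end
```

Enum.take(Paged.stream(), 3) fetches only the first two pages. This is the same incremental behaviour a from_function source provides.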

ETL pipeline with I/O batching

Combine Brook with Query contracts to batch I/O automatically across the items a stage is processing concurrently:

defmodule ETL.Queries do
  use Skuld.Query.Contract
  deffetch lookup_customer(ref :: String.t()) :: Customer.t() | nil
  deffetch lookup_product(sku :: String.t()) :: Product.t() | nil
end

defcomp enrich_order(raw_order) do
  customer <- ETL.Queries.lookup_customer(raw_order.customer_ref)
  product <- ETL.Queries.lookup_product(raw_order.product_sku)
  %{
    order: raw_order,
    customer: customer,
    product: product,
    total: raw_order.quantity * (product && product.price || 0)
  }
end

comp do
  # Stream raw orders
  source <- Brook.from_enum(raw_orders, chunk_size: 10)

  # Enrich with concurrent lookups - queries batch automatically
  enriched <- Brook.map(source, &enrich_order/1, concurrency: 5)

  # Filter invalid
  valid <- Brook.filter(enriched, fn o -> o.customer != nil end)

  # Persist
  Brook.each(valid, fn order ->
    comp do
      _ <- OrderRepo.insert_enriched_order!(order)
      :ok
    end
  end)
end
|> ETL.Queries.with_executor(ETL.Queries.EctoExecutor)
|> Port.with_handler(%{OrderRepo => OrderRepo.Ecto})
|> Channel.with_handler()
|> FiberPool.with_handler()
|> Comp.run!()

With chunk_size: 10 and concurrency: 5, up to 50 orders are in flight at once. All lookup_customer calls across those 50 orders batch into a single executor invocation, and all lookup_product calls batch into another.
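The following plain-Elixir sketch makes the batching concrete. It shows what one batched invocation amounts to: collect the customer_ref of every in-flight order, de-duplicate the refs, make one call, then give each order its own result. BatchSketch and fetch_customers/1 are hypothetical. This is not Skuld's executor interface, which this page does not show.

```elixir
defmodule BatchSketch do
  # Hypothetical batched lookup: one call for many refs, returning ref => customer.
  # It also messages the caller so the number of calls is observable in the demo.
  def fetch_customers(refs) do
    send(self(), {:batch_call, refs})
    known = %{"c1" => %{id: "c1", name: "Ada"}, "c2" => %{id: "c2", name: "Grace"}}
    Map.take(known, refs)
  end

  # Resolve customers for many in-flight orders with a single fetch;
  # unknown refs yield nil, like lookup_customer's Customer.t() | nil
  def enrich_all(orders) do
    refs = orders |> Enum.map(& &1.customer_ref) |> Enum.uniq()
    customers = fetch_customers(refs)
    Enum.map(orders, fn o -> %{order: o, customer: Map.get(customers, o.customer_ref)} end)
  end
end
```

Four orders that reference three distinct customers produce one call with three refs rather than four separate lookups.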

Backpressure control

Each stage uses a bounded channel buffer. Configure its size with the buffer: option:

Brook.map(source, &slow_transform/1, buffer: 5, concurrency: 3)

When the consumer can't keep up, the buffer fills and the producer blocks until space frees up. This keeps memory from growing without bound.

The default buffer size is 10. For CPU-bound transforms, larger buffers can smooth out variance in per-item processing time. For memory-heavy items, smaller buffers keep the footprint low.
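The blocking behaviour can be sketched with a small GenServer that parks callers instead of replying to them. This swaps in separate processes and GenServer.call for Brook's in-process fibers and channels. The BoundedBuffer module is hypothetical and only illustrates why a full buffer stops a producer.

```elixir
defmodule BoundedBuffer do
  # A bounded FIFO: put/2 blocks while the buffer is full and take/1 blocks
  # while it is empty. Capacity must be at least 1.
  use GenServer

  def start_link(capacity) when capacity >= 1,
    do: GenServer.start_link(__MODULE__, capacity)

  def put(buf, item), do: GenServer.call(buf, {:put, item}, :infinity)
  def take(buf), do: GenServer.call(buf, :take, :infinity)

  @impl true
  def init(capacity),
    do: {:ok, %{cap: capacity, items: :queue.new(), putters: :queue.new(), takers: :queue.new()}}

  @impl true
  def handle_call({:put, item}, from, state) do
    cond do
      # A consumer is already waiting: hand the item over directly
      not :queue.is_empty(state.takers) ->
        {{:value, taker}, takers} = :queue.out(state.takers)
        GenServer.reply(taker, item)
        {:reply, :ok, %{state | takers: takers}}

      # Room in the buffer: accept the item immediately
      :queue.len(state.items) < state.cap ->
        {:reply, :ok, %{state | items: :queue.in(item, state.items)}}

      # Buffer full: park the producer without replying (backpressure)
      true ->
        {:noreply, %{state | putters: :queue.in({from, item}, state.putters)}}
    end
  end

  def handle_call(:take, from, state) do
    case :queue.out(state.items) do
      {{:value, item}, items} ->
        # A slot just freed up: admit one parked producer, if any
        state =
          case :queue.out(state.putters) do
            {{:value, {putter, parked}}, putters} ->
              GenServer.reply(putter, :ok)
              %{state | items: :queue.in(parked, items), putters: putters}

            {:empty, _} ->
              %{state | items: items}
          end

        {:reply, item, state}

      {:empty, _} ->
        # Nothing buffered: park the consumer until a put arrives
        {:noreply, %{state | takers: :queue.in(from, state.takers)}}
    end
  end
end
```

With a capacity of 2, a third put does not return until a consumer takes an item, so the number of buffered items never exceeds the capacity. That is the property the buffer: option bounds for each stage.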

Side effects per item

Use Brook.each for fire-and-forget side effects:

Brook.each(stream, fn item ->
  comp do
    _ <- Writer.tell(%{processed: item.id})
    :ok
  end
end)

Use Brook.run when you need to consume items with a callback:

Brook.run(stream, fn item ->
  comp do
    _ <- ItemRepo.insert_item!(item)
    :ok
  end
end)

When to use Brook vs GenStage

  • Brook - I/O-bound pipelines, automatic query batching, order preservation, lightweight (no process overhead)
  • GenStage/Flow - CPU-bound parallel processing across cores, long-lived production pipelines, demand-driven backpressure between processes
  • Enum/Stream - Simple sequential processing, no concurrency needed

Brook shines when your pipeline spends most of its time waiting on I/O (database queries, API calls) and can benefit from batching those calls across concurrent items.