Forge.Runner.Streaming (Forge v0.1.1)


Streaming runner with backpressure and async stage support.

Implements lazy evaluation with Task.async_stream for memory-efficient processing of large datasets.
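This is a minimal sketch of the laziness described above. It is not Forge's source. It uses only Elixir's standard Task.async_stream/3 over an infinite source, and the squaring stage is a hypothetical stand-in for a real per-sample stage. No work happens until the consumer pulls results, so taking five results from an infinite source terminates.

```elixir
# Sketch only, not Forge's source: Task.async_stream/3 over an infinite
# source is lazy, so work happens only as the consumer pulls results.
samples =
  Stream.iterate(1, &(&1 + 1))
  # Hypothetical per-sample stage: square the sample id.
  |> Task.async_stream(fn id -> id * id end, max_concurrency: 4)
  # Results arrive as {:ok, value} tuples, in input order by default.
  |> Stream.map(fn {:ok, value} -> value end)

# Nothing has run yet; Enum.take/2 pulls five results and then halts the stream.
first_five = Enum.take(samples, 5)
IO.inspect(first_five)  # prints [1, 4, 9, 16, 25]
```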

Features

  • Lazy sample generation (pull model)
  • Backpressure: at most :concurrency tasks are in flight, and new work starts only as results are consumed
  • Async I/O-bound stages with concurrency control
  • Memory bounded to O(concurrency), not O(dataset_size)
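The following sketch illustrates the bounded-concurrency mechanism behind the last two features. It uses plain Task.async_stream/3 rather than Forge itself. An Agent records how many stage calls run at the same time, and the sleep simulates an I/O-bound stage. The stage function and the tracker are illustrative assumptions. The peak number of in-flight calls never exceeds max_concurrency, regardless of how many items the source yields.

```elixir
# Sketch only: observe that max_concurrency caps in-flight work.
{:ok, tracker} = Agent.start_link(fn -> {0, 0} end)  # {in_flight, peak}

enter = fn ->
  Agent.update(tracker, fn {cur, peak} -> {cur + 1, max(peak, cur + 1)} end)
end

leave = fn -> Agent.update(tracker, fn {cur, peak} -> {cur - 1, peak} end) end

concurrency = 3

results =
  1..50
  |> Task.async_stream(
    fn i ->
      enter.()
      # Simulated I/O-bound work (hypothetical stage).
      Process.sleep(10)
      leave.()
      i
    end,
    max_concurrency: concurrency
  )
  |> Enum.map(fn {:ok, i} -> i end)

{_in_flight, peak} = Agent.get(tracker, & &1)
IO.puts("peak in-flight tasks: #{peak}")
```

Because only a fixed number of tasks exist at any moment, per-task state stays proportional to the concurrency limit rather than to the 50 input items.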

Usage

# Returns a stream (lazy evaluation)
stream = Forge.Runner.Streaming.run(pipeline, concurrency: 10)

# Consume with backpressure; Enum.take/2 pulls only 1000 samples and already returns a list
samples = stream |> Enum.take(1000)

# Or process lazily (process_sample/1 stands for your own per-sample function)
stream
|> Stream.each(&process_sample/1)
|> Stream.run()

Functions

run(pipeline, opts \\ [])

Run pipeline and return a stream of processed samples.

Options

  • :concurrency - Max concurrent async tasks (default: System.schedulers_online())
  • :storage - Storage adapter module (default: nil, no persistence)
  • :storage_opts - Options passed to the storage adapter
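This hypothetical sketch shows how the documented options and defaults could be resolved and how :concurrency could feed an async stage. RunOptsSketch, resolve/1, and run_stage/3 are illustrative names, not part of Forge's API. The empty-list default for :storage_opts is an assumption because the documentation states none. No persistence is performed, matching the documented :storage default of nil.

```elixir
defmodule RunOptsSketch do
  # Hypothetical module: mirrors the documented options of run/2,
  # not Forge's actual implementation.

  def resolve(opts) when is_list(opts) do
    %{
      # Documented default: System.schedulers_online().
      concurrency: Keyword.get(opts, :concurrency, System.schedulers_online()),
      # Documented default: nil, meaning nothing is persisted.
      storage: Keyword.get(opts, :storage),
      # Assumption: no documented default, so an empty list is used here.
      storage_opts: Keyword.get(opts, :storage_opts, [])
    }
  end

  # Hypothetical helper: runs one async stage with the resolved limit.
  # Tasks are linked to the caller, so a crashing task crashes the caller;
  # only {:ok, result} tuples are expected here.
  def run_stage(enumerable, fun, opts \\ []) do
    %{concurrency: limit} = resolve(opts)

    enumerable
    |> Task.async_stream(fun, max_concurrency: limit)
    |> Stream.map(fn {:ok, result} -> result end)
  end
end

defaults = RunOptsSketch.resolve([])
IO.inspect(defaults.storage)  # prints nil

doubled =
  [1, 2, 3]
  |> RunOptsSketch.run_stage(&(&1 * 2), concurrency: 2)
  |> Enum.to_list()

IO.inspect(doubled)  # prints [2, 4, 6]
```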