Selecto.Output.Transformers.Stream (Selecto v0.3.12)
Transforms query results to streaming format for memory-efficient processing of large datasets.
This module provides lazy evaluation of result transformations, allowing processing of large result sets without loading all data into memory at once.
Features
- Lazy evaluation via Elixir Streams
- Configurable batch sizes for optimization
- Composition with other transformers (maps, JSON, CSV)
- Memory-efficient processing for large datasets
- Backpressure support for downstream consumers
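The lazy-evaluation idea can be illustrated with plain Elixir streams (a standalone sketch using only the standard library, not Selecto's internals): rows are materialized only as the consumer demands them, so even a huge source stays cheap.

```elixir
# Hypothetical lazy row source: a million rows, none built up front.
rows = Stream.map(1..1_000_000, fn i -> [i, "name_#{i}"] end)

# Composing transformations stays lazy: nothing runs until consumption.
maps = Stream.map(rows, fn [id, name] -> %{id: id, name: name} end)

# Taking 3 rows touches only 3 elements of the million-row source.
Enum.take(maps, 3)
# [%{id: 1, name: "name_1"}, %{id: 2, name: "name_2"}, %{id: 3, name: "name_3"}]
```

Because `Enum.take/2` drives the stream, the consumer controls the pace of production, which is the basis of the backpressure behavior listed above.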
Examples
# Stream rows as maps
{:ok, stream} = transform(rows, columns, aliases, :maps)
Enum.each(stream, &process_row/1)
# Stream to JSON Lines format
{:ok, stream} = transform(rows, columns, aliases, {:json, format: :lines})
Enum.into(stream, file_stream)
# Stream CSV rows
{:ok, stream} = transform(rows, columns, aliases, :csv)
Enum.into(stream, File.stream!("output.csv"))
Summary
Functions
Transform rows to a stream with the specified inner format.
Create a stream that yields results in chunks, useful for pagination or batch processing.
Create a streaming transformation that processes rows one at a time.
Transform and write results directly to an IO device or file stream.
Functions
transform(rows, columns, aliases, inner_format, options \\ [])
@spec transform(list() | Enumerable.t(), list(), map(), atom() | tuple(), keyword()) :: {:ok, Enumerable.t()} | {:error, term()}
Transform rows to a stream with the specified inner format.
Parameters
- rows - List of row data or stream of rows
- columns - List of column names
- aliases - Map of column aliases
- inner_format - The format to transform each row/batch to
- options - Transformation options
Options
- :batch_size - Number of rows to process in each batch. Default: 1000
- :parallel - Whether to process batches in parallel. Default: false
- Other options are passed to the inner transformer
Inner Format Support
- :maps - Stream of maps
- {:maps, opts} - Stream of maps with options
- :json - Stream of JSON strings (JSON Lines format by default)
- {:json, opts} - Stream of JSON with options
- :csv - Stream of CSV lines
- {:csv, opts} - Stream of CSV with options
- :raw - Stream of raw row lists
Examples
# Basic streaming to maps
{:ok, stream} = transform(rows, columns, aliases, :maps)
# Streaming with custom batch size
{:ok, stream} = transform(rows, columns, aliases, :maps, batch_size: 500)
# Streaming to JSON Lines
{:ok, stream} = transform(rows, columns, aliases, {:json, format: :lines})
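A minimal sketch of how batched, lazy map-building might work, using only standard Elixir Stream functions (the build_maps helper is illustrative, not part of Selecto):

```elixir
columns = ["id", "name"]
rows = [[1, "Alice"], [2, "Bob"], [3, "Carol"]]

# Illustrative helper: zip column names with each row's values into a map.
build_maps = fn batch ->
  Enum.map(batch, fn row -> columns |> Enum.zip(row) |> Map.new() end)
end

stream =
  rows
  |> Stream.chunk_every(2)        # process :batch_size rows at a time
  |> Stream.flat_map(build_maps)  # flatten batches back into a row stream

Enum.to_list(stream)
# [%{"id" => 1, "name" => "Alice"}, %{"id" => 2, "name" => "Bob"},
#  %{"id" => 3, "name" => "Carol"}]
```

Chunking amortizes per-row overhead across each batch while the overall pipeline stays lazy, which is the trade-off the :batch_size option tunes.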
transform_chunked(rows, columns, aliases, inner_format, options \\ [])
@spec transform_chunked(Enumerable.t(), list(), map(), atom() | tuple(), keyword()) :: {:ok, Enumerable.t()} | {:error, term()}
Create a stream that yields results in chunks, useful for pagination or batch processing.
Examples
{:ok, chunked_stream} = transform_chunked(rows, columns, aliases, :maps, chunk_size: 100)
Enum.each(chunked_stream, fn chunk ->
# Process 100 maps at a time
process_batch(chunk)
end)
transform_single(rows, columns, aliases, inner_format, options \\ [])
@spec transform_single(Enumerable.t(), list(), map(), atom() | tuple(), keyword()) :: {:ok, Enumerable.t()} | {:error, term()}
Create a streaming transformation that processes rows one at a time.
This is more memory-efficient for simple transformations, but may be slower for complex transformations that benefit from batching.
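Per-row processing can be sketched with a single Stream.map over the rows (a standalone illustration of the technique, not Selecto's code):

```elixir
columns = ["id", "name"]
rows = [[1, "Alice"], [2, "Bob"]]

# Each row becomes a map as it flows through; only one row is in flight
# at a time, so memory use is minimal.
single = Stream.map(rows, fn row -> columns |> Enum.zip(row) |> Map.new() end)

Enum.to_list(single)
# [%{"id" => 1, "name" => "Alice"}, %{"id" => 2, "name" => "Bob"}]
```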
transform_to_io(rows, columns, aliases, inner_format, io_device, options \\ [])
@spec transform_to_io(Enumerable.t(), list(), map(), atom() | tuple(), IO.device(), keyword()) :: :ok | {:error, term()}
Transform and write results directly to an IO device or file stream.
This is useful for writing large result sets directly to files without buffering all data in memory.
Examples
# Write to file
File.open!("output.csv", [:write], fn file ->
transform_to_io(rows, columns, aliases, :csv, file)
end)
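The write-as-you-go idea can be approximated with standard IO functions; here is a hedged sketch that streams CSV lines into a StringIO device (the csv_line helper is illustrative and skips quoting/escaping):

```elixir
rows = [[1, "Alice"], [2, "Bob"]]

# Illustrative CSV encoder for simple fields (no quoting/escaping).
csv_line = fn row -> Enum.map_join(row, ",", &to_string/1) <> "\n" end

{:ok, device} = StringIO.open("")

rows
|> Stream.map(csv_line)
|> Enum.each(&IO.write(device, &1))  # each line is written as it is produced

{_input, output} = StringIO.contents(device)
output
# "1,Alice\n2,Bob\n"
```

Swapping the StringIO device for a file handle from File.open!/3 gives the file-writing pattern shown above without ever holding the full result set in memory.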