Selecto.Output.Transformers.Stream (Selecto v0.3.12)

Transforms query results to streaming format for memory-efficient processing of large datasets.

This module provides lazy evaluation of result transformations, allowing processing of large result sets without loading all data into memory at once.

features

Features

  • Lazy evaluation via Elixir Streams
  • Configurable batch sizes for optimization
  • Composition with other transformers (maps, JSON, CSV)
  • Memory-efficient processing for large datasets
  • Backpressure support for downstream consumers

examples

Examples

# Stream rows as maps
{:ok, stream} = transform(rows, columns, aliases, :maps)
Enum.each(stream, &process_row/1)

# Stream to JSON Lines format
{:ok, stream} = transform(rows, columns, aliases, {:json, format: :lines})
Enum.into(stream, file_stream)

# Stream CSV rows
{:ok, stream} = transform(rows, columns, aliases, :csv)
Enum.into(stream, File.stream!("output.csv"))

Link to this section Summary

Functions

Transform rows to a stream with the specified inner format.

Create a stream that yields results in chunks, useful for pagination or batch processing.

Create a streaming transformation that processes rows one at a time.

Transform and write results directly to an IO device or file stream.

Link to this section Functions

Link to this function

transform(rows, columns, aliases, inner_format, options \\ [])

@spec transform(list() | Enumerable.t(), list(), map(), atom() | tuple(), keyword()) ::
  {:ok, Enumerable.t()} | {:error, term()}

Transform rows to a stream with the specified inner format.

parameters

Parameters

  • rows - List of row data or stream of rows
  • columns - List of column names
  • aliases - Map of column aliases
  • inner_format - The format to transform each row/batch to
  • options - Transformation options

options

Options

  • :batch_size - Number of rows to process in each batch. Default: 1000
  • :parallel - Whether to process batches in parallel. Default: false
  • Other options are passed to the inner transformer

inner-format-support

Inner Format Support

  • :maps - Stream of maps
  • {:maps, opts} - Stream of maps with options
  • :json - Stream of JSON strings (JSON Lines format by default)
  • {:json, opts} - Stream of JSON with options
  • :csv - Stream of CSV lines
  • {:csv, opts} - Stream of CSV with options
  • :raw - Stream of raw row lists

examples

Examples

# Basic streaming to maps
{:ok, stream} = transform(rows, columns, aliases, :maps)

# Streaming with custom batch size
{:ok, stream} = transform(rows, columns, aliases, :maps, batch_size: 500)

# Streaming to JSON Lines
{:ok, stream} = transform(rows, columns, aliases, {:json, format: :lines})
Link to this function

transform_chunked(rows, columns, aliases, inner_format, options \\ [])

@spec transform_chunked(Enumerable.t(), list(), map(), atom() | tuple(), keyword()) ::
  {:ok, Enumerable.t()} | {:error, term()}

Create a stream that yields results in chunks, useful for pagination or batch processing.

examples

Examples

{:ok, chunked_stream} = transform_chunked(rows, columns, aliases, :maps, chunk_size: 100)
Enum.each(chunked_stream, fn chunk ->
  # Process 100 maps at a time
  process_batch(chunk)
end)
Link to this function

transform_single(rows, columns, aliases, inner_format, options \\ [])

@spec transform_single(Enumerable.t(), list(), map(), atom() | tuple(), keyword()) ::
  {:ok, Enumerable.t()} | {:error, term()}

Create a streaming transformation that processes rows one at a time.

This is more memory efficient for simple transformations but may be slower for complex transformations that benefit from batching.

Link to this function

transform_to_io(rows, columns, aliases, inner_format, io_device, options \\ [])

@spec transform_to_io(
  Enumerable.t(),
  list(),
  map(),
  atom() | tuple(),
  IO.device(),
  keyword()
) ::
  :ok | {:error, term()}

Transform and write results directly to an IO device or file stream.

This is useful for writing large result sets directly to files without buffering all data in memory.

examples

Examples

# Write to file
File.open!("output.csv", [:write], fn file ->
  transform_to_io(rows, columns, aliases, :csv, file)
end)