RustyCSV.Streaming (RustyCSV v0.3.10)


Streaming CSV parser for processing large files with bounded memory.

This module is the Elixir interface to the Rust-based streaming parser (Strategy D). It reads data in chunks and yields complete rows as they become available.

Memory Behavior

The streaming parser maintains a small buffer for partial rows. Memory usage is bounded by:

  • chunk_size - bytes per IO read operation
  • batch_size - rows held before yielding
  • Maximum single row size in your data

Usage

For most use cases, use the high-level parse_stream/2 function from your CSV module:

alias RustyCSV.RFC4180, as: CSV

"data.csv"
|> File.stream!()
|> CSV.parse_stream()
|> Enum.each(&process_row/1)

Direct Usage

For more control, you can use this module directly:

# Stream a file row by row
RustyCSV.Streaming.stream_file("data.csv")
|> Enum.each(&process_row/1)

# Stream with custom chunk size
RustyCSV.Streaming.stream_file("data.csv", chunk_size: 1024 * 1024)
|> Enum.to_list()

# Stream from an already-open device
File.open!("data.csv", [:read, :binary], fn device ->
  RustyCSV.Streaming.stream_device(device)
  |> Enum.each(&IO.inspect/1)
end)

Encoding Support

The streaming functions support character encoding conversion via the :encoding option. When a non-UTF8 encoding is specified, the stream is automatically converted to UTF-8 before parsing, with proper handling of multi-byte character boundaries across chunks.
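As a sketch, a Latin-1 encoded file could be streamed like this. The `:latin1` atom and `process_row/1` are assumptions for illustration; check the `RustyCSV.encoding()` type for the encoding values your version actually accepts:

```elixir
# Assumption: :latin1 is one of the values accepted by RustyCSV.encoding().
# The stream is converted to UTF-8 before parsing, with multi-byte
# boundaries handled across chunks. process_row/1 is a placeholder.
"legacy.csv"
|> File.stream!()
|> RustyCSV.Streaming.stream_enumerable(encoding: :latin1)
|> Enum.each(&process_row/1)
```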

Buffer Limit

The streaming parser enforces a maximum buffer size (default 256 MB) to prevent unbounded memory growth when parsing data without newlines or with very long rows. If a streaming_feed/2 call would push the buffer past this limit, it raises a :buffer_overflow exception.

To adjust the limit, pass the :max_buffer_size option (in bytes) to any streaming function:

RustyCSV.Streaming.stream_file("huge.csv", max_buffer_size: 512 * 1024 * 1024)

Or from a defined parser:

CSV.parse_stream(stream, max_buffer_size: 512 * 1024 * 1024)

Concurrency

Streaming parser references are safe to share across BEAM processes — the underlying Rust state is protected by a mutex. Concurrent access is serialized, so for maximum throughput use one parser per process.

If a NIF panics while holding the mutex, the lock is poisoned and subsequent calls raise :mutex_poisoned. The parser should be discarded in this case.
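The one-parser-per-process advice can be followed with `Task.async_stream`, giving each file its own independent stream. This is a sketch; the file list and `process_row/1` are placeholders:

```elixir
# One independent parser per process: each task opens its own stream,
# so concurrent calls never contend on the same mutex.
files = ["a.csv", "b.csv", "c.csv"]

files
|> Task.async_stream(fn path ->
  path
  |> RustyCSV.Streaming.stream_file()
  |> Enum.each(&process_row/1)
end, timeout: :infinity)
|> Stream.run()
```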

Scheduling

streaming_feed/2, streaming_next_rows/2, and streaming_finalize/1 run on dirty CPU schedulers to avoid blocking normal BEAM schedulers.

Implementation Notes

The streaming parser:

  • Handles quoted fields that span multiple chunks correctly
  • Preserves quote state across chunk boundaries
  • Handles multi-byte character boundaries for non-UTF8 encodings
  • Compacts internal buffer to prevent unbounded growth
  • Enforces a configurable maximum buffer size (default 256 MB)
  • Returns owned data (copies bytes) since input chunks are temporary

Summary

Types

A parsed row (list of field binaries)

Options for streaming functions.

Functions

Parse binary chunks and return all rows.

Stream from an already-open IO device.

Create a stream from an enumerable (like File.stream!/1).

Create a stream that reads a CSV file in chunks.

Types

row()

@type row() :: [binary()]

A parsed row (list of field binaries)

stream_options()

@type stream_options() :: [
  chunk_size: pos_integer(),
  batch_size: pos_integer(),
  separator: binary() | non_neg_integer() | [binary()],
  escape: binary() | non_neg_integer(),
  newlines: :default | [binary()],
  encoding: RustyCSV.encoding(),
  bom: binary(),
  trim_bom: boolean(),
  max_buffer_size: pos_integer()
]

Options for streaming functions.

The :separator option accepts a binary (e.g., <<?,>> or ",") or an integer byte (e.g., ?, or 9). When called from a module defined via RustyCSV.define/2, the separator is already normalized to a binary.

The :max_buffer_size option sets the maximum internal buffer size in bytes. Defaults to 268_435_456 (256 MB). If a streaming_feed/2 call would push the buffer past this limit, a :buffer_overflow exception is raised. Increase this if your data contains rows longer than 256 MB, or decrease it to fail faster on malformed input.
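A minimal sketch of failing fast on malformed input with a small limit and recovering from the overflow. The `ErlangError` rescue clause is an assumption about how the NIF surfaces `:buffer_overflow`; the exact exception struct may differ:

```elixir
# Assumption: the :buffer_overflow error raised by the NIF surfaces
# in Elixir as an ErlangError. Adjust the rescue clause if your
# version wraps it differently.
try do
  RustyCSV.Streaming.stream_file("suspect.csv", max_buffer_size: 1024 * 1024)
  |> Stream.run()
rescue
  e in ErlangError ->
    IO.puts("buffer overflow while parsing: #{inspect(e)}")
end
```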

Functions

parse_chunks(chunks, opts \\ [])

@spec parse_chunks(
  [binary()],
  keyword()
) :: [row()]

Parse binary chunks and return all rows.

This is mainly useful for testing the streaming parser with in-memory data. For actual streaming use cases, use stream_file/2 or stream_enumerable/2.

Options

  • :separator - Field separator. Accepts an integer byte (e.g., 9 for tab), a binary (e.g., "\t", "::"), or a list of binaries (e.g., [",", ";"]). Defaults to ",".
  • :escape - Escape/quote sequence. Accepts an integer byte (e.g., 34) or a binary (e.g., "\"", "$$"). Defaults to "\"" (byte 34).
  • :max_buffer_size - Maximum internal buffer in bytes. Defaults to 268_435_456 (256 MB). Raises :buffer_overflow if exceeded.

Examples

RustyCSV.Streaming.parse_chunks(["a,b\n1,", "2\n3,4\n"])
#=> [["a", "b"], ["1", "2"], ["3", "4"]]

# TSV parsing (integer or binary separator)
RustyCSV.Streaming.parse_chunks(["a\tb\n1\t2\n"], separator: 9)
RustyCSV.Streaming.parse_chunks(["a\tb\n1\t2\n"], separator: "\t")
#=> [["a", "b"], ["1", "2"]]

stream_device(device, opts \\ [])

@spec stream_device(IO.device(), stream_options()) :: Enumerable.t()

Stream from an already-open IO device.

Useful when you want more control over file opening/closing, or when reading from a socket or other IO device.

Note: This function does NOT close the device when done. The caller is responsible for closing it.

Options

  • :chunk_size - Bytes to read per IO operation. Defaults to 65536.

  • :batch_size - Maximum rows to yield per iteration. Defaults to 1000.

  • :max_buffer_size - Maximum internal buffer in bytes. Defaults to 268_435_456 (256 MB). Raises :buffer_overflow if exceeded.

Examples

File.open!("data.csv", [:read, :binary], fn device ->
  RustyCSV.Streaming.stream_device(device)
  |> Enum.each(&IO.inspect/1)
end)

stream_enumerable(enumerable, opts \\ [])

@spec stream_enumerable(Enumerable.t(), stream_options()) :: Enumerable.t()

Create a stream from an enumerable (like File.stream!/1).

This is used internally by parse_stream/2 to handle line-oriented or chunk-oriented input from any enumerable source.

Options

  • :chunk_size - Not used for enumerables (chunks come from the source).

  • :batch_size - Maximum rows to yield per iteration. Defaults to 1000.

  • :encoding - Character encoding of input. Defaults to :utf8.

  • :bom - BOM to strip if :trim_bom is true. Defaults to "".

  • :trim_bom - Whether to strip BOM from start. Defaults to false.

  • :max_buffer_size - Maximum internal buffer in bytes. Defaults to 268_435_456 (256 MB). Raises :buffer_overflow if exceeded.

Examples

# Parse from a list of chunks
["name,age\n", "john,27\n", "jane,30\n"]
|> RustyCSV.Streaming.stream_enumerable()
|> Enum.to_list()
#=> [["name", "age"], ["john", "27"], ["jane", "30"]]

# Parse from File.stream!
File.stream!("data.csv")
|> RustyCSV.Streaming.stream_enumerable()
|> Enum.each(&process/1)
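The documented `:bom` and `:trim_bom` options can be combined to strip a byte-order mark from the start of the input. A sketch, using a UTF-8 BOM on in-memory chunks:

```elixir
# Strip a UTF-8 BOM (EF BB BF) from the first chunk before parsing.
# Without trim_bom: true, the BOM bytes would remain attached to the
# first field of the first row.
[<<0xEF, 0xBB, 0xBF>> <> "name,age\n", "john,27\n"]
|> RustyCSV.Streaming.stream_enumerable(bom: <<0xEF, 0xBB, 0xBF>>, trim_bom: true)
|> Enum.to_list()
```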

stream_file(path, opts \\ [])

@spec stream_file(Path.t(), stream_options()) :: Enumerable.t()

Create a stream that reads a CSV file in chunks.

Opens the file, creates a streaming parser, and returns a Stream that yields rows as they are parsed. The file is automatically closed when the stream is consumed or halted.

Options

  • :chunk_size - Bytes to read per IO operation. Defaults to 65536 (64 KB). Larger chunks mean fewer IO operations but more memory per read.

  • :batch_size - Maximum rows to yield per stream iteration. Defaults to 1000. Larger batches are more efficient but delay processing of early rows.

  • :max_buffer_size - Maximum internal buffer in bytes. Defaults to 268_435_456 (256 MB). Raises :buffer_overflow if exceeded.

Returns

A Stream that yields rows. Each row is a list of field binaries.

Examples

# Process a file row by row
RustyCSV.Streaming.stream_file("data.csv")
|> Enum.each(fn row ->
  IO.inspect(row)
end)

# Take first 5 rows
RustyCSV.Streaming.stream_file("data.csv")
|> Enum.take(5)

# With custom options
RustyCSV.Streaming.stream_file("huge.csv",
  chunk_size: 1024 * 1024,  # 1 MB chunks
  batch_size: 5000
)
|> Stream.map(&process_row/1)
|> Stream.run()