Streaming CSV parser for processing large files with bounded memory.
This module provides a streaming interface to the Rust-based parser (Strategy D). It reads input in chunks and yields complete rows as they become available.
Memory Behavior
The streaming parser maintains a small buffer for partial rows. Memory usage is bounded by:
- chunk_size - bytes per IO read operation
- batch_size - rows held before yielding
- maximum single row size in your data
Usage
For most use cases, use the high-level parse_stream/2 function from
your CSV module:
alias RustyCSV.RFC4180, as: CSV
"data.csv"
|> File.stream!()
|> CSV.parse_stream()
|> Enum.each(&process_row/1)

Direct Usage
For more control, you can use this module directly:
# Stream a file row by row
RustyCSV.Streaming.stream_file("data.csv")
|> Enum.each(&process_row/1)
# Stream with custom chunk size
RustyCSV.Streaming.stream_file("data.csv", chunk_size: 1024 * 1024)
|> Enum.to_list()
# Stream from an already-open device
File.open!("data.csv", [:read, :binary], fn device ->
RustyCSV.Streaming.stream_device(device)
|> Enum.each(&IO.inspect/1)
end)

Encoding Support
The streaming functions support character encoding conversion via the
:encoding option. When a non-UTF8 encoding is specified, the stream
is automatically converted to UTF-8 before parsing, with proper handling
of multi-byte character boundaries across chunks.
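As a sketch of the :encoding option described above (the file name is a placeholder, and :latin1 is assumed here to be one of the values covered by RustyCSV.encoding()):

```elixir
# Stream a Latin-1 encoded file. Each chunk is converted to UTF-8 before
# parsing, so the yielded fields are UTF-8 binaries even though the file
# on disk is not.
"legacy_export.csv"
|> RustyCSV.Streaming.stream_file(encoding: :latin1)
|> Enum.each(&IO.inspect/1)
```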
Buffer Limit
The streaming parser enforces a maximum buffer size (default 256 MB) to
prevent unbounded memory growth when parsing data without newlines or with
very long rows. If a streaming_feed/2 call would push the buffer past
this limit, it raises a :buffer_overflow exception.
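One way to fail fast on untrusted input is to lower the limit and handle the overflow at the call site. This sketch assumes the NIF's :buffer_overflow surfaces on the Elixir side as an ErlangError; the file name is a placeholder:

```elixir
# Guard a stream against pathological rows by capping the buffer at 16 MB
# and converting the overflow into an error tuple.
try do
  "untrusted.csv"
  |> RustyCSV.Streaming.stream_file(max_buffer_size: 16 * 1024 * 1024)
  |> Enum.count()
rescue
  e in ErlangError ->
    if e.original == :buffer_overflow do
      {:error, :row_too_large}
    else
      # Not the overflow we expected; let it propagate.
      reraise e, __STACKTRACE__
    end
end
```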
To adjust the limit, pass the :max_buffer_size option (in bytes) to any
streaming function:
RustyCSV.Streaming.stream_file("huge.csv", max_buffer_size: 512 * 1024 * 1024)

Or from a defined parser:
CSV.parse_stream(stream, max_buffer_size: 512 * 1024 * 1024)

Concurrency
Streaming parser references are safe to share across BEAM processes — the underlying Rust state is protected by a mutex. Concurrent access is serialized, so for maximum throughput use one parser per process.
If a NIF panics while holding the mutex, the lock is poisoned and subsequent
calls raise :mutex_poisoned. The parser should be discarded in this case.
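The one-parser-per-process advice above can be sketched with Task.async_stream/3, giving each file its own parser in its own process so no reference is ever contended (file names are placeholders):

```elixir
# Parse several files concurrently. Each task calls stream_file/2 itself,
# so every process owns a private parser and no mutex is shared.
["a.csv", "b.csv", "c.csv"]
|> Task.async_stream(
  fn path ->
    path
    |> RustyCSV.Streaming.stream_file()
    |> Enum.count()
  end,
  timeout: :infinity
)
|> Enum.map(fn {:ok, row_count} -> row_count end)
```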
Scheduling
streaming_feed/2, streaming_next_rows/2, and streaming_finalize/1 run
on dirty CPU schedulers to avoid blocking normal BEAM schedulers.
Implementation Notes
The streaming parser:
- Handles quoted fields that span multiple chunks correctly
- Preserves quote state across chunk boundaries
- Handles multi-byte character boundaries for non-UTF8 encodings
- Compacts internal buffer to prevent unbounded growth
- Enforces a configurable maximum buffer size (default 256 MB)
- Returns owned data (copies bytes) since input chunks are temporary
Summary
Functions
parse_chunks/2 - Parse binary chunks and return all rows.
stream_device/2 - Stream from an already-open IO device.
stream_enumerable/2 - Create a stream from an enumerable (like File.stream!/1).
stream_file/2 - Create a stream that reads a CSV file in chunks.
Types
@type row() :: [binary()]
A parsed row (list of field binaries)
@type stream_options() :: [
        chunk_size: pos_integer(),
        batch_size: pos_integer(),
        separator: binary() | non_neg_integer() | [binary()],
        escape: binary() | non_neg_integer(),
        newlines: :default | [binary()],
        encoding: RustyCSV.encoding(),
        bom: binary(),
        trim_bom: boolean(),
        max_buffer_size: pos_integer()
      ]
Options for streaming functions.
The :separator option accepts a binary (e.g., <<?,>> or ",") or an
integer byte (e.g., ?, or 9). When called from a module defined via
RustyCSV.define/2, the separator is already normalized to a binary.
The :max_buffer_size option sets the maximum internal buffer size in bytes.
Defaults to 268_435_456 (256 MB). If a streaming_feed/2 call would push
the buffer past this limit, a :buffer_overflow exception is raised. Increase
this if your data contains rows longer than 256 MB, or decrease it to fail
faster on malformed input.
Functions
Parse binary chunks and return all rows.
This is mainly useful for testing the streaming parser with in-memory data.
For actual streaming use cases, use stream_file/2 or stream_enumerable/2.
Options
- :separator - Field separator. Accepts an integer byte (e.g., 9 for tab), a binary (e.g., "\t", "::"), or a list of binaries (e.g., [",", ";"]). Defaults to ",".
- :escape - Escape/quote sequence. Accepts an integer byte (e.g., 34) or a binary (e.g., "\"", "$$"). Defaults to "\"" (34).
- :max_buffer_size - Maximum internal buffer in bytes. Defaults to 268_435_456 (256 MB). Raises :buffer_overflow if exceeded.
Examples
RustyCSV.Streaming.parse_chunks(["a,b\n1,", "2\n3,4\n"])
#=> [["a", "b"], ["1", "2"], ["3", "4"]]
# TSV parsing (integer or binary separator)
RustyCSV.Streaming.parse_chunks(["a\tb\n1\t2\n"], separator: 9)
RustyCSV.Streaming.parse_chunks(["a\tb\n1\t2\n"], separator: "\t")
#=> [["a", "b"], ["1", "2"]]
@spec stream_device(IO.device(), stream_options()) :: Enumerable.t()
Stream from an already-open IO device.
Useful when you want more control over file opening/closing, or when reading from a socket or other IO device.
Note: This function does NOT close the device when done. The caller is responsible for closing it.
Options
- :chunk_size - Bytes to read per IO operation. Defaults to 65536.
- :batch_size - Maximum rows to yield per iteration. Defaults to 1000.
- :max_buffer_size - Maximum internal buffer in bytes. Defaults to 268_435_456 (256 MB). Raises :buffer_overflow if exceeded.
Examples
File.open!("data.csv", [:read, :binary], fn device ->
RustyCSV.Streaming.stream_device(device)
|> Enum.each(&IO.inspect/1)
end)
@spec stream_enumerable(Enumerable.t(), stream_options()) :: Enumerable.t()
Create a stream from an enumerable (like File.stream!/1).
This is used internally by parse_stream/2 to handle line-oriented or
chunk-oriented input from any enumerable source.
Options
- :chunk_size - Not used for enumerables (chunks come from the source).
- :batch_size - Maximum rows to yield per iteration. Defaults to 1000.
- :encoding - Character encoding of the input. Defaults to :utf8.
- :bom - BOM to strip if :trim_bom is true. Defaults to "".
- :trim_bom - Whether to strip the BOM from the start of input. Defaults to false.
- :max_buffer_size - Maximum internal buffer in bytes. Defaults to 268_435_456 (256 MB). Raises :buffer_overflow if exceeded.
Examples
# Parse from a list of chunks
["name,age\n", "john,27\n", "jane,30\n"]
|> RustyCSV.Streaming.stream_enumerable()
|> Enum.to_list()
#=> [["name", "age"], ["john", "27"], ["jane", "30"]]
# Parse from File.stream!
File.stream!("data.csv")
|> RustyCSV.Streaming.stream_enumerable()
|> Enum.each(&process/1)
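The :bom and :trim_bom options documented above can be combined to strip a UTF-8 byte order mark from the first chunk; a sketch:

```elixir
# "\uFEFF" encodes to the UTF-8 BOM bytes <<0xEF, 0xBB, 0xBF>>.
# With trim_bom: true, those leading bytes are removed before parsing,
# so the first header field is "name" rather than "\uFEFFname".
["\uFEFFname,age\n", "john,27\n"]
|> RustyCSV.Streaming.stream_enumerable(bom: "\uFEFF", trim_bom: true)
|> Enum.to_list()
```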
@spec stream_file(Path.t(), stream_options()) :: Enumerable.t()
Create a stream that reads a CSV file in chunks.
Opens the file, creates a streaming parser, and returns a Stream that
yields rows as they are parsed. The file is automatically closed when
the stream is consumed or halted.
Options
- :chunk_size - Bytes to read per IO operation. Defaults to 65536 (64 KB). Larger chunks mean fewer IO operations but more memory per read.
- :batch_size - Maximum rows to yield per stream iteration. Defaults to 1000. Larger batches are more efficient but delay processing of early rows.
- :max_buffer_size - Maximum internal buffer in bytes. Defaults to 268_435_456 (256 MB). Raises :buffer_overflow if exceeded.
Returns
A Stream that yields rows. Each row is a list of field binaries.
Examples
# Process a file row by row
RustyCSV.Streaming.stream_file("data.csv")
|> Enum.each(fn row ->
IO.inspect(row)
end)
# Take first 5 rows
RustyCSV.Streaming.stream_file("data.csv")
|> Enum.take(5)
# With custom options
RustyCSV.Streaming.stream_file("huge.csv",
chunk_size: 1024 * 1024, # 1 MB chunks
batch_size: 5000
)
|> Stream.map(&process_row/1)
|> Stream.run()