# `RustyCSV.Streaming`
[🔗](https://github.com/jeffhuen/rustycsv/blob/v0.3.10/lib/rusty_csv/streaming.ex#L1)

Streaming CSV parser for processing large files with bounded memory.

This module provides a streaming interface to the Rust-based streaming
parser (Strategy D). It reads data in chunks and yields complete rows
as they become available.

## Memory Behavior

The streaming parser maintains a small buffer for partial rows. Memory
usage is bounded by:

  * `chunk_size` - bytes per IO read operation
  * `batch_size` - rows held before yielding
  * Maximum single row size in your data

## Usage

For most use cases, use the high-level `parse_stream/2` function from
your CSV module:

    alias RustyCSV.RFC4180, as: CSV

    "data.csv"
    |> File.stream!()
    |> CSV.parse_stream()
    |> Enum.each(&process_row/1)

## Direct Usage

For more control, you can use this module directly:

    # Stream a file row by row
    RustyCSV.Streaming.stream_file("data.csv")
    |> Enum.each(&process_row/1)

    # Stream with custom chunk size
    RustyCSV.Streaming.stream_file("data.csv", chunk_size: 1024 * 1024)
    |> Enum.to_list()

    # Stream from an already-open device
    File.open!("data.csv", [:read, :binary], fn device ->
      RustyCSV.Streaming.stream_device(device)
      |> Enum.each(&IO.inspect/1)
    end)

## Encoding Support

The streaming functions support character encoding conversion via the
`:encoding` option. When a non-UTF8 encoding is specified, the stream
is automatically converted to UTF-8 before parsing, with proper handling
of multi-byte character boundaries across chunks.

## Buffer Limit

The streaming parser enforces a maximum buffer size (default 256 MB) to
prevent unbounded memory growth when parsing data without newlines or with
very long rows. If a `streaming_feed/2` call would push the buffer past
this limit, it raises a `:buffer_overflow` exception.

To adjust the limit, pass the `:max_buffer_size` option (in bytes) to any
streaming function:

    RustyCSV.Streaming.stream_file("huge.csv", max_buffer_size: 512 * 1024 * 1024)

Or from a defined parser:

    CSV.parse_stream(stream, max_buffer_size: 512 * 1024 * 1024)

## Concurrency

Streaming parser references are safe to share across BEAM processes — the
underlying Rust state is protected by a mutex. Concurrent access is serialized,
so for maximum throughput use one parser per process.

If a NIF panics while holding the mutex, the lock is poisoned and subsequent
calls raise `:mutex_poisoned`. The parser should be discarded in this case.

## Scheduling

`streaming_feed/2`, `streaming_next_rows/2`, and `streaming_finalize/1` run
on dirty CPU schedulers to avoid blocking normal BEAM schedulers.

## Implementation Notes

The streaming parser:

  * Handles quoted fields that span multiple chunks correctly
  * Preserves quote state across chunk boundaries
  * Handles multi-byte character boundaries for non-UTF8 encodings
  * Compacts internal buffer to prevent unbounded growth
  * Enforces a configurable maximum buffer size (default 256 MB)
  * Returns owned data (copies bytes) since input chunks are temporary

# `row`
[🔗](https://github.com/jeffhuen/rustycsv/blob/v0.3.10/lib/rusty_csv/streaming.ex#L103)

```elixir
@type row() :: [binary()]
```

A parsed row (list of field binaries)

# `stream_options`
[🔗](https://github.com/jeffhuen/rustycsv/blob/v0.3.10/lib/rusty_csv/streaming.ex#L118)

```elixir
@type stream_options() :: [
  chunk_size: pos_integer(),
  batch_size: pos_integer(),
  separator: binary() | non_neg_integer() | [binary()],
  escape: binary() | non_neg_integer(),
  newlines: :default | [binary()],
  encoding: RustyCSV.encoding(),
  bom: binary(),
  trim_bom: boolean(),
  max_buffer_size: pos_integer()
]
```

Options for streaming functions.

The `:separator` option accepts a binary (e.g., `<<?,>>` or `","`) or an
integer byte (e.g., `?,` or `9`). When called from a module defined via
`RustyCSV.define/2`, the separator is already normalized to a binary.

The `:max_buffer_size` option sets the maximum internal buffer size in bytes.
Defaults to `268_435_456` (256 MB). If a `streaming_feed/2` call would push
the buffer past this limit, a `:buffer_overflow` exception is raised. Increase
this if your data contains rows longer than 256 MB, or decrease it to fail
faster on malformed input.

# `parse_chunks`
[🔗](https://github.com/jeffhuen/rustycsv/blob/v0.3.10/lib/rusty_csv/streaming.ex#L414)

```elixir
@spec parse_chunks(
  [binary()],
  keyword()
) :: [row()]
```

Parse binary chunks and return all rows.

This is mainly useful for testing the streaming parser with in-memory data.
For actual streaming use cases, use `stream_file/2` or `stream_enumerable/2`.

## Options

  * `:separator` - Field separator. Accepts an integer byte (e.g., `9` for tab),
    a binary (e.g., `"\t"`, `"::"`), or a list of binaries (e.g., `[",", ";"]`).
    Defaults to `","`.
  * `:escape` - Escape/quote sequence. Accepts an integer byte (e.g., `34`) or
    a binary (e.g., `"""`, `"$$"`). Defaults to `"` (34).
  * `:max_buffer_size` - Maximum internal buffer in bytes. Defaults to
    `268_435_456` (256 MB). Raises `:buffer_overflow` if exceeded.

## Examples

    RustyCSV.Streaming.parse_chunks(["a,b\n1,", "2\n3,4\n"])
    #=> [["a", "b"], ["1", "2"], ["3", "4"]]

    # TSV parsing (integer or binary separator)
    RustyCSV.Streaming.parse_chunks(["a\tb\n1\t2\n"], separator: 9)
    RustyCSV.Streaming.parse_chunks(["a\tb\n1\t2\n"], separator: "\t")
    #=> [["a", "b"], ["1", "2"]]

# `stream_device`
[🔗](https://github.com/jeffhuen/rustycsv/blob/v0.3.10/lib/rusty_csv/streaming.ex#L370)

```elixir
@spec stream_device(IO.device(), stream_options()) :: Enumerable.t()
```

Stream from an already-open IO device.

Useful when you want more control over file opening/closing, or when
reading from a socket or other IO device.

Note: This function does NOT close the device when done. The caller
is responsible for closing it.

## Options

  * `:chunk_size` - Bytes to read per IO operation. Defaults to `65536`.

  * `:batch_size` - Maximum rows to yield per iteration. Defaults to `1000`.

  * `:max_buffer_size` - Maximum internal buffer in bytes. Defaults to
    `268_435_456` (256 MB). Raises `:buffer_overflow` if exceeded.

## Examples

    File.open!("data.csv", [:read, :binary], fn device ->
      RustyCSV.Streaming.stream_device(device)
      |> Enum.each(&IO.inspect/1)
    end)

# `stream_enumerable`
[🔗](https://github.com/jeffhuen/rustycsv/blob/v0.3.10/lib/rusty_csv/streaming.ex#L236)

```elixir
@spec stream_enumerable(Enumerable.t(), stream_options()) :: Enumerable.t()
```

Create a stream from an enumerable (like `File.stream!/1`).

This is used internally by `parse_stream/2` to handle line-oriented or
chunk-oriented input from any enumerable source.

## Options

  * `:chunk_size` - Not used for enumerables (chunks come from source).

  * `:batch_size` - Maximum rows to yield per iteration. Defaults to `1000`.

  * `:encoding` - Character encoding of input. Defaults to `:utf8`.

  * `:bom` - BOM to strip if `:trim_bom` is true. Defaults to `""`.

  * `:trim_bom` - Whether to strip BOM from start. Defaults to `false`.

  * `:max_buffer_size` - Maximum internal buffer in bytes. Defaults to
    `268_435_456` (256 MB). Raises `:buffer_overflow` if exceeded.

## Examples

    # Parse from a list of chunks
    ["name,age\n", "john,27\n", "jane,30\n"]
    |> RustyCSV.Streaming.stream_enumerable()
    |> Enum.to_list()
    #=> [["name", "age"], ["john", "27"], ["jane", "30"]]

    # Parse from File.stream!
    File.stream!("data.csv")
    |> RustyCSV.Streaming.stream_enumerable()
    |> Enum.each(&process/1)

# `stream_file`
[🔗](https://github.com/jeffhuen/rustycsv/blob/v0.3.10/lib/rusty_csv/streaming.ex#L186)

```elixir
@spec stream_file(Path.t(), stream_options()) :: Enumerable.t()
```

Create a stream that reads a CSV file in chunks.

Opens the file, creates a streaming parser, and returns a `Stream` that
yields rows as they are parsed. The file is automatically closed when
the stream is consumed or halted.

## Options

  * `:chunk_size` - Bytes to read per IO operation. Defaults to `65536` (64 KB).
    Larger chunks mean fewer IO operations but more memory per read.

  * `:batch_size` - Maximum rows to yield per stream iteration. Defaults to `1000`.
    Larger batches are more efficient but delay processing of early rows.

  * `:max_buffer_size` - Maximum internal buffer in bytes. Defaults to
    `268_435_456` (256 MB). Raises `:buffer_overflow` if exceeded.

## Returns

A `Stream` that yields rows. Each row is a list of field binaries.

## Examples

    # Process a file row by row
    RustyCSV.Streaming.stream_file("data.csv")
    |> Enum.each(fn row ->
      IO.inspect(row)
    end)

    # Take first 5 rows
    RustyCSV.Streaming.stream_file("data.csv")
    |> Enum.take(5)

    # With custom options
    RustyCSV.Streaming.stream_file("huge.csv",
      chunk_size: 1024 * 1024,  # 1 MB chunks
      batch_size: 5000
    )
    |> Stream.map(&process_row/1)
    |> Stream.run()

---

*Consult [api-reference.md](api-reference.md) for complete listing*
